bf6c83bb14af2786112ccd7d688acaef9e55b1c6
[oonf.git] / src / core / olsr_stream_socket.c
1
2 /*
3  * The olsr.org Optimized Link-State Routing daemon(olsrd)
4  * Copyright (c) 2004-2011, the olsr.org team - see HISTORY file
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  *   notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  *   notice, this list of conditions and the following disclaimer in
15  *   the documentation and/or other materials provided with the
16  *   distribution.
17  * * Neither the name of olsr.org, olsrd nor the names of its
18  *   contributors may be used to endorse or promote products derived
19  *   from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * Visit http://www.olsr.org for more information.
35  *
36  * If you find this software useful feel free to make a donation
37  * to the project. For more information see the website or contact
38  * the copyright holders.
39  *
40  */
41
42 #include <ctype.h>
43 #include <fcntl.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <stdlib.h>
47 #include <unistd.h>
48 #include <errno.h>
49
50 #include "common/autobuf.h"
51 #include "common/avl.h"
52 #include "common/list.h"
53 #include "olsr_cfg.h"
54 #include "olsr_logging.h"
55 #include "olsr_memcookie.h"
56 #include "olsr_socket.h"
57 #include "olsr_timer.h"
58 #include "os_net.h"
59 #include "olsr.h"
60 #include "olsr_stream_socket.h"
61
62 struct list_entity olsr_stream_head;
63
64 /* server socket */
65 static struct olsr_memcookie_info *connection_cookie;
66 static struct olsr_timer_info *connection_timeout;
67
68 /* remember if initialized or not */
69 OLSR_SUBSYSTEM_STATE(_stream_state);
70
71 static int _apply_managed_socket(struct olsr_stream_managed *managed,
72     struct olsr_stream_socket *stream, struct netaddr *bindto, uint16_t port);
73 static void _cb_parse_request(int fd, void *data, bool, bool);
74 static struct olsr_stream_session *_create_session(
75     struct olsr_stream_socket *stream_socket, int sock, struct netaddr *remote_addr);
76 static void _cb_parse_connection(int fd, void *data, bool r,bool w);
77
78 static void _cb_timeout_handler(void *);
79
80 /**
81  * Initialize the stream socket handlers
82  * @return -1 if an error happened, 0 otherwise.
83  */
84 int
85 olsr_stream_init(void) {
86   if (olsr_subsystem_is_initialized(&_stream_state))
87     return 0;
88
89   connection_cookie = olsr_memcookie_add("stream socket connections",
90       sizeof(struct olsr_stream_session));
91   if (connection_cookie == NULL) {
92     OLSR_WARN_OOM(LOG_SOCKET_STREAM);
93     return -1;
94   }
95
96   connection_timeout = olsr_timer_add("stream socket timout",
97       &_cb_timeout_handler, false);
98   if (connection_timeout == NULL) {
99     OLSR_WARN_OOM(LOG_SOCKET_STREAM);
100     olsr_memcookie_remove(connection_cookie);
101     return -1;
102   }
103
104   list_init_head(&olsr_stream_head);
105   olsr_subsystem_init(&_stream_state);
106   return 0;
107 }
108
109 /**
110  * Cleanup all resources allocated be stream socket handlers
111  */
112 void
113 olsr_stream_cleanup(void) {
114   struct olsr_stream_socket *comport;
115
116   if (olsr_subsystem_cleanup(&_stream_state))
117     return;
118
119   while (!list_is_empty(&olsr_stream_head)) {
120     comport = list_first_element(&olsr_stream_head, comport, node);
121
122     olsr_stream_remove(comport, true);
123   }
124
125   olsr_memcookie_remove(connection_cookie);
126   olsr_timer_remove(connection_timeout);
127 }
128
129 /**
130  * Flush all data in outgoing buffer of a stream socket
131  * @param con pointer to stream socket
132  */
133 void
134 olsr_stream_flush(struct olsr_stream_session *con) {
135   olsr_socket_set_write(&con->scheduler_entry, true);
136 }
137
138 /**
139  * Add a new stream socket to the scheduler
140  * @param stream_socket pointer to uninitialized stream socket struct
141  * @param local pointer to local ip/port of socket, port must be 0 if
142  *   this shall be an outgoing socket
143  * @return -1 if an error happened, 0 otherwise
144  */
145 int
146 olsr_stream_add(struct olsr_stream_socket *stream_socket,
147     union netaddr_socket *local) {
148   int s = -1;
149 #if !defined(REMOVE_LOG_WARN)
150   struct netaddr_str buf;
151 #endif
152
153   memset(stream_socket, 0, sizeof(*stream_socket));
154
155   /* server socket not necessary for outgoing connections */
156   if (netaddr_socket_get_port(local) != 0) {
157     /* Init socket */
158     s = os_net_getsocket(local, OS_SOCKET_TCP, 0, LOG_SOCKET_STREAM);
159     if (s < 0) {
160       goto add_stream_error;
161     }
162
163     /* show that we are willing to listen */
164     if (listen(s, 1) == -1) {
165       OLSR_WARN(LOG_SOCKET_STREAM, "tcp socket listen failed for %s: %s (%d)\n",
166           netaddr_socket_to_string(&buf, local), strerror(errno), errno);
167       goto add_stream_error;
168     }
169
170     stream_socket->scheduler_entry.fd = s;
171     stream_socket->scheduler_entry.process = _cb_parse_request;
172     stream_socket->scheduler_entry.data = stream_socket;
173     stream_socket->scheduler_entry.event_read = true;
174
175     olsr_socket_add(&stream_socket->scheduler_entry);
176   }
177   memcpy(&stream_socket->local_socket, local, sizeof(stream_socket->local_socket));
178
179   if (stream_socket->config.memcookie == NULL) {
180     stream_socket->config.memcookie = connection_cookie;
181   }
182   if (stream_socket->config.allowed_sessions == 0) {
183     stream_socket->config.allowed_sessions = 10;
184   }
185   if (stream_socket->config.maximum_input_buffer == 0) {
186     stream_socket->config.maximum_input_buffer = 65536;
187   }
188
189   list_init_head(&stream_socket->session);
190   list_add_tail(&olsr_stream_head, &stream_socket->node);
191
192   return 0;
193
194 add_stream_error:
195   if (stream_socket->scheduler_entry.fd) {
196     olsr_socket_remove(&stream_socket->scheduler_entry);
197   }
198   if (s != -1) {
199     os_close(s);
200   }
201   return -1;
202 }
203
204 /**
205  * Remove a stream socket from the scheduler
206  * @param stream_socket pointer to socket
207  * @param force true if socket will be closed immediately,
208  *   false if scheduler should wait until outgoing buffers are empty
209  */
210 void
211 olsr_stream_remove(struct olsr_stream_socket *stream_socket, bool force) {
212   struct olsr_stream_session *session, *ptr;
213
214   if (stream_socket->busy && !force) {
215     stream_socket->remove = true;
216     return;
217   }
218
219   if (!list_is_node_added(&stream_socket->node)) {
220     return;
221   }
222
223   list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
224     if (force || (abuf_getlen(&session->out) == 0 && !session->busy)) {
225       /* close everything that doesn't need to send data anymore */
226       olsr_stream_close(session, force);
227     }
228   }
229
230   if (!list_is_empty(&stream_socket->session)) {
231     return;
232   }
233
234   list_remove(&stream_socket->node);
235
236   if (stream_socket->scheduler_entry.fd) {
237     /* only for server sockets */
238     os_close(stream_socket->scheduler_entry.fd);
239     olsr_socket_remove(&stream_socket->scheduler_entry);
240   }
241 }
242
243 /**
244  * Create an outgoing stream socket.
245  * @param stream socket pointer to stream socket
246  * @param remote pointer to address of remote TCP server
247  * @return pointer to stream session, NULL if an error happened.
248  */
249 struct olsr_stream_session *
250 olsr_stream_connect_to(struct olsr_stream_socket *stream_socket,
251     union netaddr_socket *remote) {
252   struct olsr_stream_session *session;
253   struct netaddr remote_addr;
254   bool wait_for_connect = false;
255   int s;
256 #if !defined REMOVE_LOG_WARN
257   struct netaddr_str buf;
258 #endif
259
260   s = os_net_getsocket(&stream_socket->local_socket,
261       OS_SOCKET_TCP, 0, LOG_SOCKET_STREAM);
262   if (s < 0) {
263     return NULL;
264   }
265
266   if (connect(s, &remote->std, sizeof(*remote))) {
267     if (errno != EINPROGRESS) {
268       OLSR_WARN(LOG_SOCKET_STREAM, "Cannot connect outgoing tcp connection to %s: %s (%d)",
269           netaddr_socket_to_string(&buf, remote), strerror(errno), errno);
270       goto connect_to_error;
271     }
272     wait_for_connect = true;
273   }
274
275   netaddr_from_socket(&remote_addr, remote);
276   session = _create_session(stream_socket, s, &remote_addr);
277   if (session) {
278     session->wait_for_connect = wait_for_connect;
279     return session;
280   }
281
282   /* fall through */
283 connect_to_error:
284   if (s) {
285     os_close(s);
286   }
287   return NULL;
288 }
289
290 /**
291  * Reset the session timeout of a TCP session
292  * @param con pointer to stream session
293  * @param timeout timeout in milliseconds
294  */
295 void
296 olsr_stream_set_timeout(struct olsr_stream_session *con, uint32_t timeout) {
297   olsr_timer_set(&con->timeout, timeout, 0, con, connection_timeout);
298 }
299
300 /**
301  * Close a TCP stream session
302  * @param session pointer to stream session
303  */
304 void
305 olsr_stream_close(struct olsr_stream_session *session, bool force) {
306   if (session->busy && !force) {
307     /* remove the session later */
308     session->removed = true;
309     return;
310   }
311
312   if (!list_is_node_added(&session->node)) {
313     return;
314   }
315
316   if (session->comport->config.cleanup) {
317     session->comport->config.cleanup(session);
318   }
319
320   if (session->timeout) {
321     olsr_timer_stop(session->timeout);
322   }
323
324   session->comport->config.allowed_sessions++;
325   list_remove(&session->node);
326
327   os_close(session->scheduler_entry.fd);
328   olsr_socket_remove(&session->scheduler_entry);
329
330   abuf_free(&session->in);
331   abuf_free(&session->out);
332
333   olsr_memcookie_free(session->comport->config.memcookie, session);
334 }
335
336 /**
337  * Initialized a managed TCP stream
338  * @param managed pointer to uninitialized managed stream
339  */
340 void
341 olsr_stream_add_managed(struct olsr_stream_managed *managed) {
342   memset(managed, 0, sizeof(*managed));
343   managed->config.allowed_sessions = 10;
344   managed->config.maximum_input_buffer = 65536;
345   managed->config.session_timeout = 120000;
346 }
347
348 /**
349  * Apply a configuration to a stream. Will reset both ACLs
350  * and socket ports/bindings.
351  * @param managed pointer to managed stream
352  * @param config pointer to stream config
353  * @return -1 if an error happened, 0 otherwise.
354  */
355 int
356 olsr_stream_apply_managed(struct olsr_stream_managed *managed,
357     struct olsr_stream_managed_config *config) {
358   olsr_acl_copy(&managed->acl, &config->acl);
359
360   if (config_global.ipv4) {
361     if (_apply_managed_socket(managed,
362         &managed->socket_v4, &config->bindto_v4, config->port)) {
363       return -1;
364     }
365   }
366   else {
367     olsr_stream_remove(&managed->socket_v4, true);
368   }
369
370   if (config_global.ipv6) {
371     if (_apply_managed_socket(managed,
372         &managed->socket_v6, &config->bindto_v6, config->port)) {
373       return -1;
374     }
375   }
376   else {
377     olsr_stream_remove(&managed->socket_v6, true);
378   }
379   return 0;
380 }
381
382 /**
383  * Remove a managed TCP stream
384  * @param managed pointer to managed stream
385  * @param force true if socket will be closed immediately,
386  *   false if scheduler should wait until outgoing buffers are empty
387  */
388 void
389 olsr_stream_remove_managed(struct olsr_stream_managed *managed, bool forced) {
390   olsr_stream_remove(&managed->socket_v4, forced);
391   olsr_stream_remove(&managed->socket_v6, forced);
392
393   olsr_acl_remove(&managed->acl);
394 }
395
396 /**
397  * Apply new configuration to a managed stream socket
398  * @param managed pointer to managed stream
399  * @param stream pointer to TCP stream to configure
400  * @param bindto local address to bind socket to
401  * @param port local port number
402  * @return -1 if an error happened, 0 otherwise.
403  */
404 static int
405 _apply_managed_socket(struct olsr_stream_managed *managed,
406     struct olsr_stream_socket *stream,
407     struct netaddr *bindto, uint16_t port) {
408   union netaddr_socket sock;
409 #if !defined(REMOVE_LOG_WARN)
410   struct netaddr_str buf;
411 #endif
412
413   if (netaddr_socket_init(&sock, bindto, port)) {
414     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot create managed socket address: %s/%u",
415         netaddr_to_string(&buf, bindto), port);
416     return -1;
417   }
418
419   if (memcmp(&sock, &stream->local_socket, sizeof(sock)) == 0) {
420     /* nothing changed */
421     return 0;
422   }
423
424   olsr_stream_remove(stream, true);
425   if (olsr_stream_add(stream, &sock)) {
426     return -1;
427   }
428
429   /* copy configuration */
430   memcpy(&stream->config, &managed->config, sizeof(stream->config));
431   if (stream->config.memcookie == NULL) {
432     stream->config.memcookie = connection_cookie;
433   }
434   return 0;
435 }
436
437 /**
438  * Handle incoming server socket event from socket scheduler.
439  * @param fd filedescriptor for event
440  * @param data custom user data
441  * @param event_read true if read-event is incoming
442  * @param event_write true if write-event is incoming
443  */
444 static void
445 _cb_parse_request(int fd, void *data, bool event_read,
446     bool event_write __attribute__((unused))) {
447   struct olsr_stream_socket *comport;
448   union netaddr_socket remote_socket;
449   struct netaddr remote_addr;
450   socklen_t addrlen;
451   int sock;
452 #if !defined(REMOVE_LOG_DEBUG)
453       struct netaddr_str buf1, buf2;
454 #endif
455
456   if (!event_read) {
457     return;
458   }
459
460   comport = data;
461
462   addrlen = sizeof(remote_socket);
463   sock = accept(fd, &remote_socket.std, &addrlen);
464   if (sock < 0) {
465     OLSR_WARN(LOG_SOCKET_STREAM, "accept() call returned error: %s (%d)", strerror(errno), errno);
466     return;
467   }
468
469   netaddr_from_socket(&remote_addr, &remote_socket);
470   if (comport->config.acl) {
471     if (!olsr_acl_check_accept(comport->config.acl, &remote_addr)) {
472       OLSR_DEBUG(LOG_SOCKET_STREAM, "Access from %s to socket %s blocked because of ACL",
473           netaddr_to_string(&buf1, &remote_addr),
474           netaddr_socket_to_string(&buf2, &comport->local_socket));
475       close(sock);
476       return;
477     }
478   }
479   _create_session(comport, sock, &remote_addr);
480 }
481
482 /**
483  * Configure a TCP session socket
484  * @param stream_socket pointer to stream socket
485  * @param sock pointer to socket filedescriptor
486  * @param remote_addr pointer to remote address
487  * @return pointer to new stream session, NULL if an error happened.
488  */
489 static struct olsr_stream_session *
490 _create_session(struct olsr_stream_socket *stream_socket,
491     int sock, struct netaddr *remote_addr) {
492   struct olsr_stream_session *session;
493 #if !defined REMOVE_LOG_DEBUG
494   struct netaddr_str buf;
495 #endif
496
497   /* put socket into non-blocking mode */
498   if (os_net_set_nonblocking(sock)) {
499     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot read comport socket status: %s (%d)",
500         strerror(errno), errno);
501     return NULL;
502   }
503
504   session = olsr_memcookie_malloc(stream_socket->config.memcookie);
505   if (session == NULL) {
506     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
507     goto parse_request_error;
508   }
509
510   if (abuf_init(&session->in)) {
511     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
512     goto parse_request_error;
513   }
514   if (abuf_init(&session->out)) {
515     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
516     goto parse_request_error;
517   }
518
519   session->scheduler_entry.fd = sock;
520   session->scheduler_entry.process = _cb_parse_connection;
521   session->scheduler_entry.data = session;
522   session->scheduler_entry.event_read = true;
523   session->scheduler_entry.event_write = true;
524   olsr_socket_add(&session->scheduler_entry);
525
526   session->send_first = stream_socket->config.send_first;
527   session->comport = stream_socket;
528
529   session->remote_address = *remote_addr;
530
531   if (stream_socket->config.allowed_sessions-- > 0) {
532     /* create active session */
533     session->state = STREAM_SESSION_ACTIVE;
534   } else {
535     /* too many sessions */
536     if (stream_socket->config.create_error) {
537       stream_socket->config.create_error(session, STREAM_SERVICE_UNAVAILABLE);
538     }
539     session->state = STREAM_SESSION_SEND_AND_QUIT;
540   }
541
542   if (stream_socket->config.session_timeout) {
543     session->timeout = olsr_timer_start(
544         stream_socket->config.session_timeout, 0, session, connection_timeout);
545   }
546
547   if (stream_socket->config.init) {
548     if (stream_socket->config.init(session)) {
549       goto parse_request_error;
550     }
551   }
552
553   OLSR_DEBUG(LOG_SOCKET_STREAM, "Got connection through socket %d with %s.\n",
554       sock, netaddr_to_string(&buf, remote_addr));
555
556   list_add_tail(&stream_socket->session, &session->node);
557   return session;
558
559 parse_request_error:
560   abuf_free(&session->in);
561   abuf_free(&session->out);
562   olsr_memcookie_free(stream_socket->config.memcookie, session);
563
564   return NULL;
565 }
566
567 /**
568  * Handle TCP session timeout
569  * @param data custom data
570  */
571 static void
572 _cb_timeout_handler(void *data) {
573   struct olsr_stream_session *session = data;
574   olsr_stream_close(session, false);
575 }
576
577 /**
578  * Handle events for TCP session from network scheduler
579  * @param fd filedescriptor of TCP session
580  * @param data custom data
581  * @param event_read true if read-event is incoming
582  * @param event_write true if write-event is incoming
583  */
584 static void
585 _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
586   struct olsr_stream_session *session;
587   struct olsr_stream_socket *s_sock;
588   int len;
589   char buffer[1024];
590 #if !defined(REMOVE_LOG_WARN)
591   struct netaddr_str buf;
592 #endif
593
594   session = data;
595   s_sock = session->comport;
596
597   OLSR_DEBUG(LOG_SOCKET_STREAM, "Parsing connection of socket %d\n", fd);
598
599   /* mark session and s_sock as busy */
600   session->busy = true;
601   s_sock->busy = true;
602
603   if (session->wait_for_connect) {
604     if (event_write) {
605       int value;
606       socklen_t value_len;
607
608       value_len = sizeof(value);
609
610       if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &value, &value_len)) {
611         OLSR_WARN(LOG_SOCKET_STREAM, "getsockopt failed: %s (%d)",
612             strerror(errno), errno);
613         session->state = STREAM_SESSION_CLEANUP;
614       }
615       else if (value != 0) {
616         OLSR_WARN(LOG_SOCKET_STREAM, "Connection to %s failed: %s (%d)",
617             netaddr_to_string(&buf, &session->remote_address), strerror(value), value);
618         session->state = STREAM_SESSION_CLEANUP;
619       }
620       else {
621         session->wait_for_connect = false;
622       }
623     }
624   }
625
626   if (session->wait_for_connect) {
627     session->busy = false;
628     s_sock->busy = false;
629     return;
630   }
631
632   /* read data if necessary */
633   if (session->state == STREAM_SESSION_ACTIVE && event_read) {
634     len = recv(fd, buffer, sizeof(buffer), 0);
635     if (len > 0) {
636       OLSR_DEBUG(LOG_SOCKET_STREAM, "  recv returned %d\n", len);
637       if (abuf_memcpy(&session->in, buffer, len)) {
638         /* out of memory */
639         OLSR_WARN(LOG_SOCKET_STREAM, "Out of memory for comport session input buffer");
640         session->state = STREAM_SESSION_CLEANUP;
641       } else if (abuf_getlen(&session->in) > s_sock->config.maximum_input_buffer) {
642         /* input buffer overflow */
643         if (s_sock->config.create_error) {
644           s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
645         }
646         session->state = STREAM_SESSION_SEND_AND_QUIT;
647       } else {
648         /* got new input block, reset timeout */
649         olsr_stream_set_timeout(session, s_sock->config.session_timeout);
650       }
651     } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
652         != EWOULDBLOCK) {
653       /* error during read */
654       OLSR_WARN(LOG_SOCKET_STREAM, "Error while reading from communication stream with %s: %s (%d)\n",
655           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
656       session->state = STREAM_SESSION_CLEANUP;
657     } else if (len == 0) {
658       /* external s_sock closed */
659       session->state = STREAM_SESSION_SEND_AND_QUIT;
660     }
661   }
662
663   if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL
664       && (abuf_getlen(&session->in) > 0 || session->send_first)) {
665     session->state = s_sock->config.receive_data(session);
666     session->send_first = false;
667   }
668
669   /* send data if necessary */
670   if (session->state != STREAM_SESSION_CLEANUP && abuf_getlen(&session->out) > 0) {
671     if (event_write) {
672       len = send(fd, abuf_getptr(&session->out), abuf_getlen(&session->out), 0);
673
674       if (len > 0) {
675         OLSR_DEBUG(LOG_SOCKET_STREAM, "  send returned %d\n", len);
676         abuf_pull(&session->out, len);
677         olsr_stream_set_timeout(session, s_sock->config.session_timeout);
678       } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
679           != EWOULDBLOCK) {
680         OLSR_WARN(LOG_SOCKET_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
681             netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
682         session->state = STREAM_SESSION_CLEANUP;
683       }
684     } else {
685       OLSR_DEBUG(LOG_SOCKET_STREAM, "  activating output in scheduler\n");
686       olsr_socket_set_write(&session->scheduler_entry, true);
687     }
688   }
689
690   if (abuf_getlen(&session->out) == 0) {
691     /* nothing to send anymore */
692     OLSR_DEBUG(LOG_SOCKET_STREAM, "  deactivating output in scheduler\n");
693     olsr_socket_set_write(&session->scheduler_entry, false);
694     if (session->state == STREAM_SESSION_SEND_AND_QUIT) {
695       session->state = STREAM_SESSION_CLEANUP;
696     }
697   }
698
699   session->busy = false;
700   s_sock->busy = false;
701
702   /* end of connection ? */
703   if (session->state == STREAM_SESSION_CLEANUP || session->removed) {
704     OLSR_DEBUG(LOG_SOCKET_STREAM, "  cleanup\n");
705
706     /* clean up connection by calling cleanup directly */
707     olsr_stream_close(session, session->state == STREAM_SESSION_CLEANUP);
708   }
709
710   /* lazy socket removal */
711   if (s_sock->remove) {
712     olsr_stream_remove(s_sock, false);
713   }
714   return;
715 }