dba9e8381cba9174af3338dd817fb740e47de1c3
[oonf.git] / src / core / olsr_stream_socket.c
1
2 /*
3  * The olsr.org Optimized Link-State Routing daemon(olsrd)
4  * Copyright (c) 2004-2011, the olsr.org team - see HISTORY file
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  *   notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  *   notice, this list of conditions and the following disclaimer in
15  *   the documentation and/or other materials provided with the
16  *   distribution.
17  * * Neither the name of olsr.org, olsrd nor the names of its
18  *   contributors may be used to endorse or promote products derived
19  *   from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * Visit http://www.olsr.org for more information.
35  *
36  * If you find this software useful feel free to make a donation
37  * to the project. For more information see the website or contact
38  * the copyright holders.
39  *
40  */
41
42 #include <ctype.h>
43 #include <fcntl.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <stdlib.h>
47 #include <unistd.h>
48 #include <errno.h>
49
50 #include "common/autobuf.h"
51 #include "common/avl.h"
52 #include "common/list.h"
53 #include "olsr_cfg.h"
54 #include "olsr_logging.h"
55 #include "olsr_memcookie.h"
56 #include "olsr_socket.h"
57 #include "olsr_timer.h"
58 #include "os_net.h"
59 #include "olsr.h"
60 #include "olsr_stream_socket.h"
61
62 static int _apply_managed_socket(struct olsr_stream_managed *managed,
63     struct olsr_stream_socket *stream, struct netaddr *bindto, uint16_t port);
64 static void _cb_parse_request(int fd, void *data, bool, bool);
65 static struct olsr_stream_session *_create_session(
66     struct olsr_stream_socket *stream_socket, int sock, struct netaddr *remote_addr);
67 static void _cb_parse_connection(int fd, void *data, bool r,bool w);
68
69 static void _cb_timeout_handler(void *);
70
71 /* list of olsr stream sockets */
72 struct list_entity olsr_stream_head;
73
74 /* server socket */
75 static struct olsr_memcookie_info connection_cookie = {
76   .name = "stream socket connection",
77   .size = sizeof(struct olsr_stream_session)
78 };
79
80 static struct olsr_timer_info connection_timeout = {
81   .name = "stream socket timout",
82   .callback = _cb_timeout_handler,
83 };
84
85 /* remember if initialized or not */
86 OLSR_SUBSYSTEM_STATE(_stream_state);
87
88 /**
89  * Initialize the stream socket handlers
90  */
91 void
92 olsr_stream_init(void) {
93   if (olsr_subsystem_init(&_stream_state))
94     return;
95
96   olsr_memcookie_add(&connection_cookie);
97   olsr_timer_add(&connection_timeout);
98   list_init_head(&olsr_stream_head);
99 }
100
101 /**
102  * Cleanup all resources allocated be stream socket handlers
103  */
104 void
105 olsr_stream_cleanup(void) {
106   struct olsr_stream_socket *comport;
107
108   if (olsr_subsystem_cleanup(&_stream_state))
109     return;
110
111   while (!list_is_empty(&olsr_stream_head)) {
112     comport = list_first_element(&olsr_stream_head, comport, node);
113
114     olsr_stream_remove(comport, true);
115   }
116
117   olsr_memcookie_remove(&connection_cookie);
118   olsr_timer_remove(&connection_timeout);
119 }
120
121 /**
122  * Flush all data in outgoing buffer of a stream socket
123  * @param con pointer to stream socket
124  */
125 void
126 olsr_stream_flush(struct olsr_stream_session *con) {
127   olsr_socket_set_write(&con->scheduler_entry, true);
128 }
129
130 /**
131  * Add a new stream socket to the scheduler
132  * @param stream_socket pointer to uninitialized stream socket struct
133  * @param local pointer to local ip/port of socket, port must be 0 if
134  *   this shall be an outgoing socket
135  * @return -1 if an error happened, 0 otherwise
136  */
137 int
138 olsr_stream_add(struct olsr_stream_socket *stream_socket,
139     union netaddr_socket *local) {
140   int s = -1;
141 #if !defined(REMOVE_LOG_WARN)
142   struct netaddr_str buf;
143 #endif
144
145   memset(stream_socket, 0, sizeof(*stream_socket));
146
147   /* server socket not necessary for outgoing connections */
148   if (netaddr_socket_get_port(local) != 0) {
149     /* Init socket */
150     s = os_net_getsocket(local, OS_SOCKET_TCP, 0, LOG_SOCKET_STREAM);
151     if (s < 0) {
152       goto add_stream_error;
153     }
154
155     /* show that we are willing to listen */
156     if (listen(s, 1) == -1) {
157       OLSR_WARN(LOG_SOCKET_STREAM, "tcp socket listen failed for %s: %s (%d)\n",
158           netaddr_socket_to_string(&buf, local), strerror(errno), errno);
159       goto add_stream_error;
160     }
161
162     stream_socket->scheduler_entry.fd = s;
163     stream_socket->scheduler_entry.process = _cb_parse_request;
164     stream_socket->scheduler_entry.data = stream_socket;
165     stream_socket->scheduler_entry.event_read = true;
166
167     olsr_socket_add(&stream_socket->scheduler_entry);
168   }
169   memcpy(&stream_socket->local_socket, local, sizeof(stream_socket->local_socket));
170
171   if (stream_socket->config.memcookie == NULL) {
172     stream_socket->config.memcookie = &connection_cookie;
173   }
174   if (stream_socket->config.allowed_sessions == 0) {
175     stream_socket->config.allowed_sessions = 10;
176   }
177   if (stream_socket->config.maximum_input_buffer == 0) {
178     stream_socket->config.maximum_input_buffer = 65536;
179   }
180
181   list_init_head(&stream_socket->session);
182   list_add_tail(&olsr_stream_head, &stream_socket->node);
183
184   return 0;
185
186 add_stream_error:
187   if (stream_socket->scheduler_entry.fd) {
188     olsr_socket_remove(&stream_socket->scheduler_entry);
189   }
190   if (s != -1) {
191     os_close(s);
192   }
193   return -1;
194 }
195
196 /**
197  * Remove a stream socket from the scheduler
198  * @param stream_socket pointer to socket
199  * @param force true if socket will be closed immediately,
200  *   false if scheduler should wait until outgoing buffers are empty
201  */
202 void
203 olsr_stream_remove(struct olsr_stream_socket *stream_socket, bool force) {
204   struct olsr_stream_session *session, *ptr;
205
206   if (stream_socket->busy && !force) {
207     stream_socket->remove = true;
208     return;
209   }
210
211   if (!list_is_node_added(&stream_socket->node)) {
212     return;
213   }
214
215   list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
216     if (force || (abuf_getlen(&session->out) == 0 && !session->busy)) {
217       /* close everything that doesn't need to send data anymore */
218       olsr_stream_close(session, force);
219     }
220   }
221
222   if (!list_is_empty(&stream_socket->session)) {
223     return;
224   }
225
226   list_remove(&stream_socket->node);
227
228   if (stream_socket->scheduler_entry.fd) {
229     /* only for server sockets */
230     os_close(stream_socket->scheduler_entry.fd);
231     olsr_socket_remove(&stream_socket->scheduler_entry);
232   }
233 }
234
235 /**
236  * Create an outgoing stream socket.
237  * @param stream socket pointer to stream socket
238  * @param remote pointer to address of remote TCP server
239  * @return pointer to stream session, NULL if an error happened.
240  */
241 struct olsr_stream_session *
242 olsr_stream_connect_to(struct olsr_stream_socket *stream_socket,
243     union netaddr_socket *remote) {
244   struct olsr_stream_session *session;
245   struct netaddr remote_addr;
246   bool wait_for_connect = false;
247   int s;
248 #if !defined REMOVE_LOG_WARN
249   struct netaddr_str buf;
250 #endif
251
252   s = os_net_getsocket(&stream_socket->local_socket,
253       OS_SOCKET_TCP, 0, LOG_SOCKET_STREAM);
254   if (s < 0) {
255     return NULL;
256   }
257
258   if (connect(s, &remote->std, sizeof(*remote))) {
259     if (errno != EINPROGRESS) {
260       OLSR_WARN(LOG_SOCKET_STREAM, "Cannot connect outgoing tcp connection to %s: %s (%d)",
261           netaddr_socket_to_string(&buf, remote), strerror(errno), errno);
262       goto connect_to_error;
263     }
264     wait_for_connect = true;
265   }
266
267   netaddr_from_socket(&remote_addr, remote);
268   session = _create_session(stream_socket, s, &remote_addr);
269   if (session) {
270     session->wait_for_connect = wait_for_connect;
271     return session;
272   }
273
274   /* fall through */
275 connect_to_error:
276   if (s) {
277     os_close(s);
278   }
279   return NULL;
280 }
281
282 /**
283  * Reset the session timeout of a TCP session
284  * @param con pointer to stream session
285  * @param timeout timeout in milliseconds
286  */
287 void
288 olsr_stream_set_timeout(struct olsr_stream_session *con, uint64_t timeout) {
289   olsr_timer_set(&con->timeout, timeout);
290 }
291
292 /**
293  * Close a TCP stream session
294  * @param session pointer to stream session
295  */
296 void
297 olsr_stream_close(struct olsr_stream_session *session, bool force) {
298   if (session->busy && !force) {
299     /* remove the session later */
300     session->removed = true;
301     return;
302   }
303
304   if (!list_is_node_added(&session->node)) {
305     return;
306   }
307
308   if (session->comport->config.cleanup) {
309     session->comport->config.cleanup(session);
310   }
311
312   if (session->timeout.timer_clock) {
313     olsr_timer_stop(&session->timeout);
314   }
315
316   session->comport->config.allowed_sessions++;
317   list_remove(&session->node);
318
319   os_close(session->scheduler_entry.fd);
320   olsr_socket_remove(&session->scheduler_entry);
321
322   abuf_free(&session->in);
323   abuf_free(&session->out);
324
325   olsr_memcookie_free(session->comport->config.memcookie, session);
326 }
327
328 /**
329  * Initialized a managed TCP stream
330  * @param managed pointer to uninitialized managed stream
331  */
332 void
333 olsr_stream_add_managed(struct olsr_stream_managed *managed) {
334   memset(managed, 0, sizeof(*managed));
335   managed->config.allowed_sessions = 10;
336   managed->config.maximum_input_buffer = 65536;
337   managed->config.session_timeout = 120000;
338 }
339
340 /**
341  * Apply a configuration to a stream. Will reset both ACLs
342  * and socket ports/bindings.
343  * @param managed pointer to managed stream
344  * @param config pointer to stream config
345  * @return -1 if an error happened, 0 otherwise.
346  */
347 int
348 olsr_stream_apply_managed(struct olsr_stream_managed *managed,
349     struct olsr_stream_managed_config *config) {
350   olsr_acl_copy(&managed->acl, &config->acl);
351
352   if (config_global.ipv4) {
353     if (_apply_managed_socket(managed,
354         &managed->socket_v4, &config->bindto_v4, config->port)) {
355       return -1;
356     }
357   }
358   else {
359     olsr_stream_remove(&managed->socket_v4, true);
360   }
361
362   if (config_global.ipv6) {
363     if (_apply_managed_socket(managed,
364         &managed->socket_v6, &config->bindto_v6, config->port)) {
365       return -1;
366     }
367   }
368   else {
369     olsr_stream_remove(&managed->socket_v6, true);
370   }
371   return 0;
372 }
373
374 /**
375  * Remove a managed TCP stream
376  * @param managed pointer to managed stream
377  * @param force true if socket will be closed immediately,
378  *   false if scheduler should wait until outgoing buffers are empty
379  */
380 void
381 olsr_stream_remove_managed(struct olsr_stream_managed *managed, bool forced) {
382   olsr_stream_remove(&managed->socket_v4, forced);
383   olsr_stream_remove(&managed->socket_v6, forced);
384
385   olsr_acl_remove(&managed->acl);
386 }
387
388 /**
389  * Apply new configuration to a managed stream socket
390  * @param managed pointer to managed stream
391  * @param stream pointer to TCP stream to configure
392  * @param bindto local address to bind socket to
393  * @param port local port number
394  * @return -1 if an error happened, 0 otherwise.
395  */
396 static int
397 _apply_managed_socket(struct olsr_stream_managed *managed,
398     struct olsr_stream_socket *stream,
399     struct netaddr *bindto, uint16_t port) {
400   union netaddr_socket sock;
401 #if !defined(REMOVE_LOG_WARN)
402   struct netaddr_str buf;
403 #endif
404
405   if (netaddr_socket_init(&sock, bindto, port)) {
406     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot create managed socket address: %s/%u",
407         netaddr_to_string(&buf, bindto), port);
408     return -1;
409   }
410
411   if (memcmp(&sock, &stream->local_socket, sizeof(sock)) == 0) {
412     /* nothing changed */
413     return 0;
414   }
415
416   olsr_stream_remove(stream, true);
417   if (olsr_stream_add(stream, &sock)) {
418     return -1;
419   }
420
421   /* copy configuration */
422   memcpy(&stream->config, &managed->config, sizeof(stream->config));
423   if (stream->config.memcookie == NULL) {
424     stream->config.memcookie = &connection_cookie;
425   }
426   return 0;
427 }
428
429 /**
430  * Handle incoming server socket event from socket scheduler.
431  * @param fd filedescriptor for event
432  * @param data custom user data
433  * @param event_read true if read-event is incoming
434  * @param event_write true if write-event is incoming
435  */
436 static void
437 _cb_parse_request(int fd, void *data, bool event_read,
438     bool event_write __attribute__((unused))) {
439   struct olsr_stream_socket *comport;
440   union netaddr_socket remote_socket;
441   struct netaddr remote_addr;
442   socklen_t addrlen;
443   int sock;
444 #if !defined(REMOVE_LOG_DEBUG)
445       struct netaddr_str buf1, buf2;
446 #endif
447
448   if (!event_read) {
449     return;
450   }
451
452   comport = data;
453
454   addrlen = sizeof(remote_socket);
455   sock = accept(fd, &remote_socket.std, &addrlen);
456   if (sock < 0) {
457     OLSR_WARN(LOG_SOCKET_STREAM, "accept() call returned error: %s (%d)", strerror(errno), errno);
458     return;
459   }
460
461   netaddr_from_socket(&remote_addr, &remote_socket);
462   if (comport->config.acl) {
463     if (!olsr_acl_check_accept(comport->config.acl, &remote_addr)) {
464       OLSR_DEBUG(LOG_SOCKET_STREAM, "Access from %s to socket %s blocked because of ACL",
465           netaddr_to_string(&buf1, &remote_addr),
466           netaddr_socket_to_string(&buf2, &comport->local_socket));
467       close(sock);
468       return;
469     }
470   }
471   _create_session(comport, sock, &remote_addr);
472 }
473
474 /**
475  * Configure a TCP session socket
476  * @param stream_socket pointer to stream socket
477  * @param sock pointer to socket filedescriptor
478  * @param remote_addr pointer to remote address
479  * @return pointer to new stream session, NULL if an error happened.
480  */
481 static struct olsr_stream_session *
482 _create_session(struct olsr_stream_socket *stream_socket,
483     int sock, struct netaddr *remote_addr) {
484   struct olsr_stream_session *session;
485 #if !defined REMOVE_LOG_DEBUG
486   struct netaddr_str buf;
487 #endif
488
489   /* put socket into non-blocking mode */
490   if (os_net_set_nonblocking(sock)) {
491     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot read comport socket status: %s (%d)",
492         strerror(errno), errno);
493     return NULL;
494   }
495
496   session = olsr_memcookie_malloc(stream_socket->config.memcookie);
497   if (session == NULL) {
498     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
499     goto parse_request_error;
500   }
501
502   if (abuf_init(&session->in)) {
503     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
504     goto parse_request_error;
505   }
506   if (abuf_init(&session->out)) {
507     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
508     goto parse_request_error;
509   }
510
511   session->scheduler_entry.fd = sock;
512   session->scheduler_entry.process = _cb_parse_connection;
513   session->scheduler_entry.data = session;
514   session->scheduler_entry.event_read = true;
515   session->scheduler_entry.event_write = true;
516   olsr_socket_add(&session->scheduler_entry);
517
518   session->send_first = stream_socket->config.send_first;
519   session->comport = stream_socket;
520
521   session->remote_address = *remote_addr;
522
523   if (stream_socket->config.allowed_sessions-- > 0) {
524     /* create active session */
525     session->state = STREAM_SESSION_ACTIVE;
526   } else {
527     /* too many sessions */
528     if (stream_socket->config.create_error) {
529       stream_socket->config.create_error(session, STREAM_SERVICE_UNAVAILABLE);
530     }
531     session->state = STREAM_SESSION_SEND_AND_QUIT;
532   }
533
534   session->timeout.timer_cb_context = session;
535   session->timeout.timer_info = &connection_timeout;
536   if (stream_socket->config.session_timeout) {
537     olsr_timer_start(&session->timeout, stream_socket->config.session_timeout);
538   }
539
540   if (stream_socket->config.init) {
541     if (stream_socket->config.init(session)) {
542       goto parse_request_error;
543     }
544   }
545
546   OLSR_DEBUG(LOG_SOCKET_STREAM, "Got connection through socket %d with %s.\n",
547       sock, netaddr_to_string(&buf, remote_addr));
548
549   list_add_tail(&stream_socket->session, &session->node);
550   return session;
551
552 parse_request_error:
553   abuf_free(&session->in);
554   abuf_free(&session->out);
555   olsr_memcookie_free(stream_socket->config.memcookie, session);
556
557   return NULL;
558 }
559
560 /**
561  * Handle TCP session timeout
562  * @param data custom data
563  */
564 static void
565 _cb_timeout_handler(void *data) {
566   struct olsr_stream_session *session = data;
567   olsr_stream_close(session, false);
568 }
569
570 /**
571  * Handle events for TCP session from network scheduler
572  * @param fd filedescriptor of TCP session
573  * @param data custom data
574  * @param event_read true if read-event is incoming
575  * @param event_write true if write-event is incoming
576  */
577 static void
578 _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
579   struct olsr_stream_session *session;
580   struct olsr_stream_socket *s_sock;
581   int len;
582   char buffer[1024];
583 #if !defined(REMOVE_LOG_WARN)
584   struct netaddr_str buf;
585 #endif
586
587   session = data;
588   s_sock = session->comport;
589
590   OLSR_DEBUG(LOG_SOCKET_STREAM, "Parsing connection of socket %d\n", fd);
591
592   /* mark session and s_sock as busy */
593   session->busy = true;
594   s_sock->busy = true;
595
596   if (session->wait_for_connect) {
597     if (event_write) {
598       int value;
599       socklen_t value_len;
600
601       value_len = sizeof(value);
602
603       if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &value, &value_len)) {
604         OLSR_WARN(LOG_SOCKET_STREAM, "getsockopt failed: %s (%d)",
605             strerror(errno), errno);
606         session->state = STREAM_SESSION_CLEANUP;
607       }
608       else if (value != 0) {
609         OLSR_WARN(LOG_SOCKET_STREAM, "Connection to %s failed: %s (%d)",
610             netaddr_to_string(&buf, &session->remote_address), strerror(value), value);
611         session->state = STREAM_SESSION_CLEANUP;
612       }
613       else {
614         session->wait_for_connect = false;
615       }
616     }
617   }
618
619   if (session->wait_for_connect) {
620     session->busy = false;
621     s_sock->busy = false;
622     return;
623   }
624
625   /* read data if necessary */
626   if (session->state == STREAM_SESSION_ACTIVE && event_read) {
627     len = recv(fd, buffer, sizeof(buffer), 0);
628     if (len > 0) {
629       OLSR_DEBUG(LOG_SOCKET_STREAM, "  recv returned %d\n", len);
630       if (abuf_memcpy(&session->in, buffer, len)) {
631         /* out of memory */
632         OLSR_WARN(LOG_SOCKET_STREAM, "Out of memory for comport session input buffer");
633         session->state = STREAM_SESSION_CLEANUP;
634       } else if (abuf_getlen(&session->in) > s_sock->config.maximum_input_buffer) {
635         /* input buffer overflow */
636         if (s_sock->config.create_error) {
637           s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
638         }
639         session->state = STREAM_SESSION_SEND_AND_QUIT;
640       } else {
641         /* got new input block, reset timeout */
642         olsr_stream_set_timeout(session, s_sock->config.session_timeout);
643       }
644     } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
645         != EWOULDBLOCK) {
646       /* error during read */
647       OLSR_WARN(LOG_SOCKET_STREAM, "Error while reading from communication stream with %s: %s (%d)\n",
648           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
649       session->state = STREAM_SESSION_CLEANUP;
650     } else if (len == 0) {
651       /* external s_sock closed */
652       session->state = STREAM_SESSION_SEND_AND_QUIT;
653     }
654   }
655
656   if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL
657       && (abuf_getlen(&session->in) > 0 || session->send_first)) {
658     session->state = s_sock->config.receive_data(session);
659     session->send_first = false;
660   }
661
662   /* send data if necessary */
663   if (session->state != STREAM_SESSION_CLEANUP && abuf_getlen(&session->out) > 0) {
664     if (event_write) {
665       len = send(fd, abuf_getptr(&session->out), abuf_getlen(&session->out), 0);
666
667       if (len > 0) {
668         OLSR_DEBUG(LOG_SOCKET_STREAM, "  send returned %d\n", len);
669         abuf_pull(&session->out, len);
670         olsr_stream_set_timeout(session, s_sock->config.session_timeout);
671       } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
672           != EWOULDBLOCK) {
673         OLSR_WARN(LOG_SOCKET_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
674             netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
675         session->state = STREAM_SESSION_CLEANUP;
676       }
677     } else {
678       OLSR_DEBUG(LOG_SOCKET_STREAM, "  activating output in scheduler\n");
679       olsr_socket_set_write(&session->scheduler_entry, true);
680     }
681   }
682
683   if (abuf_getlen(&session->out) == 0) {
684     /* nothing to send anymore */
685     OLSR_DEBUG(LOG_SOCKET_STREAM, "  deactivating output in scheduler\n");
686     olsr_socket_set_write(&session->scheduler_entry, false);
687     if (session->state == STREAM_SESSION_SEND_AND_QUIT) {
688       session->state = STREAM_SESSION_CLEANUP;
689     }
690   }
691
692   session->busy = false;
693   s_sock->busy = false;
694
695   /* end of connection ? */
696   if (session->state == STREAM_SESSION_CLEANUP || session->removed) {
697     OLSR_DEBUG(LOG_SOCKET_STREAM, "  cleanup\n");
698
699     /* clean up connection by calling cleanup directly */
700     olsr_stream_close(session, session->state == STREAM_SESSION_CLEANUP);
701   }
702
703   /* lazy socket removal */
704   if (s_sock->remove) {
705     olsr_stream_remove(s_sock, false);
706   }
707   return;
708 }