More cleanup work of the timer API
[oonf.git] / src / core / olsr_stream_socket.c
1
2 /*
3  * The olsr.org Optimized Link-State Routing daemon(olsrd)
4  * Copyright (c) 2004-2011, the olsr.org team - see HISTORY file
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  *   notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  *   notice, this list of conditions and the following disclaimer in
15  *   the documentation and/or other materials provided with the
16  *   distribution.
17  * * Neither the name of olsr.org, olsrd nor the names of its
18  *   contributors may be used to endorse or promote products derived
19  *   from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * Visit http://www.olsr.org for more information.
35  *
36  * If you find this software useful feel free to make a donation
37  * to the project. For more information see the website or contact
38  * the copyright holders.
39  *
40  */
41
42 #include <ctype.h>
43 #include <fcntl.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <stdlib.h>
47 #include <unistd.h>
48 #include <errno.h>
49
50 #include "common/autobuf.h"
51 #include "common/avl.h"
52 #include "common/list.h"
53 #include "olsr_cfg.h"
54 #include "olsr_logging.h"
55 #include "olsr_memcookie.h"
56 #include "olsr_socket.h"
57 #include "olsr_timer.h"
58 #include "os_net.h"
59 #include "olsr.h"
60 #include "olsr_stream_socket.h"
61
62 static int _apply_managed_socket(struct olsr_stream_managed *managed,
63     struct olsr_stream_socket *stream, struct netaddr *bindto, uint16_t port);
64 static void _cb_parse_request(int fd, void *data, bool, bool);
65 static struct olsr_stream_session *_create_session(
66     struct olsr_stream_socket *stream_socket, int sock, struct netaddr *remote_addr);
67 static void _cb_parse_connection(int fd, void *data, bool r,bool w);
68
69 static void _cb_timeout_handler(void *);
70
71 /* list of olsr stream sockets */
72 struct list_entity olsr_stream_head;
73
74 /* server socket */
75 static struct olsr_memcookie_info connection_cookie = {
76   .name = "stream socket connection",
77   .size = sizeof(struct olsr_stream_session)
78 };
79
80 static struct olsr_timer_info connection_timeout = {
81   .name = "stream socket timout",
82   .callback = _cb_timeout_handler,
83 };
84
85 /* remember if initialized or not */
86 OLSR_SUBSYSTEM_STATE(_stream_state);
87
88 /**
89  * Initialize the stream socket handlers
90  */
91 void
92 olsr_stream_init(void) {
93   if (olsr_subsystem_init(&_stream_state))
94     return;
95
96   olsr_memcookie_add(&connection_cookie);
97   olsr_timer_add(&connection_timeout);
98   list_init_head(&olsr_stream_head);
99 }
100
101 /**
102  * Cleanup all resources allocated be stream socket handlers
103  */
104 void
105 olsr_stream_cleanup(void) {
106   struct olsr_stream_socket *comport;
107
108   if (olsr_subsystem_cleanup(&_stream_state))
109     return;
110
111   while (!list_is_empty(&olsr_stream_head)) {
112     comport = list_first_element(&olsr_stream_head, comport, node);
113
114     olsr_stream_remove(comport, true);
115   }
116
117   olsr_memcookie_remove(&connection_cookie);
118   olsr_timer_remove(&connection_timeout);
119 }
120
121 /**
122  * Flush all data in outgoing buffer of a stream socket
123  * @param con pointer to stream socket
124  */
125 void
126 olsr_stream_flush(struct olsr_stream_session *con) {
127   olsr_socket_set_write(&con->scheduler_entry, true);
128 }
129
130 /**
131  * Add a new stream socket to the scheduler
132  * @param stream_socket pointer to uninitialized stream socket struct
133  * @param local pointer to local ip/port of socket, port must be 0 if
134  *   this shall be an outgoing socket
135  * @return -1 if an error happened, 0 otherwise
136  */
137 int
138 olsr_stream_add(struct olsr_stream_socket *stream_socket,
139     union netaddr_socket *local) {
140   int s = -1;
141 #if !defined(REMOVE_LOG_WARN)
142   struct netaddr_str buf;
143 #endif
144
145   memset(stream_socket, 0, sizeof(*stream_socket));
146
147   /* server socket not necessary for outgoing connections */
148   if (netaddr_socket_get_port(local) != 0) {
149     /* Init socket */
150     s = os_net_getsocket(local, OS_SOCKET_TCP, 0, LOG_SOCKET_STREAM);
151     if (s < 0) {
152       goto add_stream_error;
153     }
154
155     /* show that we are willing to listen */
156     if (listen(s, 1) == -1) {
157       OLSR_WARN(LOG_SOCKET_STREAM, "tcp socket listen failed for %s: %s (%d)\n",
158           netaddr_socket_to_string(&buf, local), strerror(errno), errno);
159       goto add_stream_error;
160     }
161
162     stream_socket->scheduler_entry.fd = s;
163     stream_socket->scheduler_entry.process = _cb_parse_request;
164     stream_socket->scheduler_entry.data = stream_socket;
165     stream_socket->scheduler_entry.event_read = true;
166
167     olsr_socket_add(&stream_socket->scheduler_entry);
168   }
169   memcpy(&stream_socket->local_socket, local, sizeof(stream_socket->local_socket));
170
171   if (stream_socket->config.memcookie == NULL) {
172     stream_socket->config.memcookie = &connection_cookie;
173   }
174   if (stream_socket->config.allowed_sessions == 0) {
175     stream_socket->config.allowed_sessions = 10;
176   }
177   if (stream_socket->config.maximum_input_buffer == 0) {
178     stream_socket->config.maximum_input_buffer = 65536;
179   }
180
181   list_init_head(&stream_socket->session);
182   list_add_tail(&olsr_stream_head, &stream_socket->node);
183
184   return 0;
185
186 add_stream_error:
187   if (stream_socket->scheduler_entry.fd) {
188     olsr_socket_remove(&stream_socket->scheduler_entry);
189   }
190   if (s != -1) {
191     os_close(s);
192   }
193   return -1;
194 }
195
196 /**
197  * Remove a stream socket from the scheduler
198  * @param stream_socket pointer to socket
199  * @param force true if socket will be closed immediately,
200  *   false if scheduler should wait until outgoing buffers are empty
201  */
202 void
203 olsr_stream_remove(struct olsr_stream_socket *stream_socket, bool force) {
204   struct olsr_stream_session *session, *ptr;
205
206   if (stream_socket->busy && !force) {
207     stream_socket->remove = true;
208     return;
209   }
210
211   if (!list_is_node_added(&stream_socket->node)) {
212     return;
213   }
214
215   list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
216     if (force || (abuf_getlen(&session->out) == 0 && !session->busy)) {
217       /* close everything that doesn't need to send data anymore */
218       olsr_stream_close(session, force);
219     }
220   }
221
222   if (!list_is_empty(&stream_socket->session)) {
223     return;
224   }
225
226   list_remove(&stream_socket->node);
227
228   if (stream_socket->scheduler_entry.fd) {
229     /* only for server sockets */
230     os_close(stream_socket->scheduler_entry.fd);
231     olsr_socket_remove(&stream_socket->scheduler_entry);
232   }
233 }
234
235 /**
236  * Create an outgoing stream socket.
237  * @param stream socket pointer to stream socket
238  * @param remote pointer to address of remote TCP server
239  * @return pointer to stream session, NULL if an error happened.
240  */
241 struct olsr_stream_session *
242 olsr_stream_connect_to(struct olsr_stream_socket *stream_socket,
243     union netaddr_socket *remote) {
244   struct olsr_stream_session *session;
245   struct netaddr remote_addr;
246   bool wait_for_connect = false;
247   int s;
248 #if !defined REMOVE_LOG_WARN
249   struct netaddr_str buf;
250 #endif
251
252   s = os_net_getsocket(&stream_socket->local_socket,
253       OS_SOCKET_TCP, 0, LOG_SOCKET_STREAM);
254   if (s < 0) {
255     return NULL;
256   }
257
258   if (connect(s, &remote->std, sizeof(*remote))) {
259     if (errno != EINPROGRESS) {
260       OLSR_WARN(LOG_SOCKET_STREAM, "Cannot connect outgoing tcp connection to %s: %s (%d)",
261           netaddr_socket_to_string(&buf, remote), strerror(errno), errno);
262       goto connect_to_error;
263     }
264     wait_for_connect = true;
265   }
266
267   netaddr_from_socket(&remote_addr, remote);
268   session = _create_session(stream_socket, s, &remote_addr);
269   if (session) {
270     session->wait_for_connect = wait_for_connect;
271     return session;
272   }
273
274   /* fall through */
275 connect_to_error:
276   if (s) {
277     os_close(s);
278   }
279   return NULL;
280 }
281
282 /**
283  * Reset the session timeout of a TCP session
284  * @param con pointer to stream session
285  * @param timeout timeout in milliseconds
286  */
287 void
288 olsr_stream_set_timeout(struct olsr_stream_session *con, uint64_t timeout) {
289   olsr_timer_set(&con->timeout, timeout);
290 }
291
292 /**
293  * Close a TCP stream session
294  * @param session pointer to stream session
295  */
296 void
297 olsr_stream_close(struct olsr_stream_session *session, bool force) {
298   if (session->busy && !force) {
299     /* remove the session later */
300     session->removed = true;
301     return;
302   }
303
304   if (!list_is_node_added(&session->node)) {
305     return;
306   }
307
308   if (session->comport->config.cleanup) {
309     session->comport->config.cleanup(session);
310   }
311
312   olsr_timer_stop(&session->timeout);
313
314   session->comport->config.allowed_sessions++;
315   list_remove(&session->node);
316
317   os_close(session->scheduler_entry.fd);
318   olsr_socket_remove(&session->scheduler_entry);
319
320   abuf_free(&session->in);
321   abuf_free(&session->out);
322
323   olsr_memcookie_free(session->comport->config.memcookie, session);
324 }
325
326 /**
327  * Initialized a managed TCP stream
328  * @param managed pointer to uninitialized managed stream
329  */
330 void
331 olsr_stream_add_managed(struct olsr_stream_managed *managed) {
332   memset(managed, 0, sizeof(*managed));
333   managed->config.allowed_sessions = 10;
334   managed->config.maximum_input_buffer = 65536;
335   managed->config.session_timeout = 120000;
336 }
337
338 /**
339  * Apply a configuration to a stream. Will reset both ACLs
340  * and socket ports/bindings.
341  * @param managed pointer to managed stream
342  * @param config pointer to stream config
343  * @return -1 if an error happened, 0 otherwise.
344  */
345 int
346 olsr_stream_apply_managed(struct olsr_stream_managed *managed,
347     struct olsr_stream_managed_config *config) {
348   olsr_acl_copy(&managed->acl, &config->acl);
349
350   if (config_global.ipv4) {
351     if (_apply_managed_socket(managed,
352         &managed->socket_v4, &config->bindto_v4, config->port)) {
353       return -1;
354     }
355   }
356   else {
357     olsr_stream_remove(&managed->socket_v4, true);
358   }
359
360   if (config_global.ipv6) {
361     if (_apply_managed_socket(managed,
362         &managed->socket_v6, &config->bindto_v6, config->port)) {
363       return -1;
364     }
365   }
366   else {
367     olsr_stream_remove(&managed->socket_v6, true);
368   }
369   return 0;
370 }
371
372 /**
373  * Remove a managed TCP stream
374  * @param managed pointer to managed stream
375  * @param force true if socket will be closed immediately,
376  *   false if scheduler should wait until outgoing buffers are empty
377  */
378 void
379 olsr_stream_remove_managed(struct olsr_stream_managed *managed, bool forced) {
380   olsr_stream_remove(&managed->socket_v4, forced);
381   olsr_stream_remove(&managed->socket_v6, forced);
382
383   olsr_acl_remove(&managed->acl);
384 }
385
386 /**
387  * Apply new configuration to a managed stream socket
388  * @param managed pointer to managed stream
389  * @param stream pointer to TCP stream to configure
390  * @param bindto local address to bind socket to
391  * @param port local port number
392  * @return -1 if an error happened, 0 otherwise.
393  */
394 static int
395 _apply_managed_socket(struct olsr_stream_managed *managed,
396     struct olsr_stream_socket *stream,
397     struct netaddr *bindto, uint16_t port) {
398   union netaddr_socket sock;
399 #if !defined(REMOVE_LOG_WARN)
400   struct netaddr_str buf;
401 #endif
402
403   if (netaddr_socket_init(&sock, bindto, port)) {
404     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot create managed socket address: %s/%u",
405         netaddr_to_string(&buf, bindto), port);
406     return -1;
407   }
408
409   if (memcmp(&sock, &stream->local_socket, sizeof(sock)) == 0) {
410     /* nothing changed */
411     return 0;
412   }
413
414   olsr_stream_remove(stream, true);
415   if (olsr_stream_add(stream, &sock)) {
416     return -1;
417   }
418
419   /* copy configuration */
420   memcpy(&stream->config, &managed->config, sizeof(stream->config));
421   if (stream->config.memcookie == NULL) {
422     stream->config.memcookie = &connection_cookie;
423   }
424   return 0;
425 }
426
427 /**
428  * Handle incoming server socket event from socket scheduler.
429  * @param fd filedescriptor for event
430  * @param data custom user data
431  * @param event_read true if read-event is incoming
432  * @param event_write true if write-event is incoming
433  */
434 static void
435 _cb_parse_request(int fd, void *data, bool event_read,
436     bool event_write __attribute__((unused))) {
437   struct olsr_stream_socket *comport;
438   union netaddr_socket remote_socket;
439   struct netaddr remote_addr;
440   socklen_t addrlen;
441   int sock;
442 #if !defined(REMOVE_LOG_DEBUG)
443       struct netaddr_str buf1, buf2;
444 #endif
445
446   if (!event_read) {
447     return;
448   }
449
450   comport = data;
451
452   addrlen = sizeof(remote_socket);
453   sock = accept(fd, &remote_socket.std, &addrlen);
454   if (sock < 0) {
455     OLSR_WARN(LOG_SOCKET_STREAM, "accept() call returned error: %s (%d)", strerror(errno), errno);
456     return;
457   }
458
459   netaddr_from_socket(&remote_addr, &remote_socket);
460   if (comport->config.acl) {
461     if (!olsr_acl_check_accept(comport->config.acl, &remote_addr)) {
462       OLSR_DEBUG(LOG_SOCKET_STREAM, "Access from %s to socket %s blocked because of ACL",
463           netaddr_to_string(&buf1, &remote_addr),
464           netaddr_socket_to_string(&buf2, &comport->local_socket));
465       close(sock);
466       return;
467     }
468   }
469   _create_session(comport, sock, &remote_addr);
470 }
471
472 /**
473  * Configure a TCP session socket
474  * @param stream_socket pointer to stream socket
475  * @param sock pointer to socket filedescriptor
476  * @param remote_addr pointer to remote address
477  * @return pointer to new stream session, NULL if an error happened.
478  */
479 static struct olsr_stream_session *
480 _create_session(struct olsr_stream_socket *stream_socket,
481     int sock, struct netaddr *remote_addr) {
482   struct olsr_stream_session *session;
483 #if !defined REMOVE_LOG_DEBUG
484   struct netaddr_str buf;
485 #endif
486
487   /* put socket into non-blocking mode */
488   if (os_net_set_nonblocking(sock)) {
489     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot read comport socket status: %s (%d)",
490         strerror(errno), errno);
491     return NULL;
492   }
493
494   session = olsr_memcookie_malloc(stream_socket->config.memcookie);
495   if (session == NULL) {
496     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
497     goto parse_request_error;
498   }
499
500   if (abuf_init(&session->in)) {
501     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
502     goto parse_request_error;
503   }
504   if (abuf_init(&session->out)) {
505     OLSR_WARN(LOG_SOCKET_STREAM, "Cannot allocate memory for comport session");
506     goto parse_request_error;
507   }
508
509   session->scheduler_entry.fd = sock;
510   session->scheduler_entry.process = _cb_parse_connection;
511   session->scheduler_entry.data = session;
512   session->scheduler_entry.event_read = true;
513   session->scheduler_entry.event_write = true;
514   olsr_socket_add(&session->scheduler_entry);
515
516   session->send_first = stream_socket->config.send_first;
517   session->comport = stream_socket;
518
519   session->remote_address = *remote_addr;
520
521   if (stream_socket->config.allowed_sessions-- > 0) {
522     /* create active session */
523     session->state = STREAM_SESSION_ACTIVE;
524   } else {
525     /* too many sessions */
526     if (stream_socket->config.create_error) {
527       stream_socket->config.create_error(session, STREAM_SERVICE_UNAVAILABLE);
528     }
529     session->state = STREAM_SESSION_SEND_AND_QUIT;
530   }
531
532   session->timeout.cb_context = session;
533   session->timeout.info = &connection_timeout;
534   if (stream_socket->config.session_timeout) {
535     olsr_timer_start(&session->timeout, stream_socket->config.session_timeout);
536   }
537
538   if (stream_socket->config.init) {
539     if (stream_socket->config.init(session)) {
540       goto parse_request_error;
541     }
542   }
543
544   OLSR_DEBUG(LOG_SOCKET_STREAM, "Got connection through socket %d with %s.\n",
545       sock, netaddr_to_string(&buf, remote_addr));
546
547   list_add_tail(&stream_socket->session, &session->node);
548   return session;
549
550 parse_request_error:
551   abuf_free(&session->in);
552   abuf_free(&session->out);
553   olsr_memcookie_free(stream_socket->config.memcookie, session);
554
555   return NULL;
556 }
557
558 /**
559  * Handle TCP session timeout
560  * @param data custom data
561  */
562 static void
563 _cb_timeout_handler(void *data) {
564   struct olsr_stream_session *session = data;
565   olsr_stream_close(session, false);
566 }
567
568 /**
569  * Handle events for TCP session from network scheduler
570  * @param fd filedescriptor of TCP session
571  * @param data custom data
572  * @param event_read true if read-event is incoming
573  * @param event_write true if write-event is incoming
574  */
575 static void
576 _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
577   struct olsr_stream_session *session;
578   struct olsr_stream_socket *s_sock;
579   int len;
580   char buffer[1024];
581 #if !defined(REMOVE_LOG_WARN)
582   struct netaddr_str buf;
583 #endif
584
585   session = data;
586   s_sock = session->comport;
587
588   OLSR_DEBUG(LOG_SOCKET_STREAM, "Parsing connection of socket %d\n", fd);
589
590   /* mark session and s_sock as busy */
591   session->busy = true;
592   s_sock->busy = true;
593
594   if (session->wait_for_connect) {
595     if (event_write) {
596       int value;
597       socklen_t value_len;
598
599       value_len = sizeof(value);
600
601       if(getsockopt(fd, SOL_SOCKET, SO_ERROR, &value, &value_len)) {
602         OLSR_WARN(LOG_SOCKET_STREAM, "getsockopt failed: %s (%d)",
603             strerror(errno), errno);
604         session->state = STREAM_SESSION_CLEANUP;
605       }
606       else if (value != 0) {
607         OLSR_WARN(LOG_SOCKET_STREAM, "Connection to %s failed: %s (%d)",
608             netaddr_to_string(&buf, &session->remote_address), strerror(value), value);
609         session->state = STREAM_SESSION_CLEANUP;
610       }
611       else {
612         session->wait_for_connect = false;
613       }
614     }
615   }
616
617   if (session->wait_for_connect) {
618     session->busy = false;
619     s_sock->busy = false;
620     return;
621   }
622
623   /* read data if necessary */
624   if (session->state == STREAM_SESSION_ACTIVE && event_read) {
625     len = recv(fd, buffer, sizeof(buffer), 0);
626     if (len > 0) {
627       OLSR_DEBUG(LOG_SOCKET_STREAM, "  recv returned %d\n", len);
628       if (abuf_memcpy(&session->in, buffer, len)) {
629         /* out of memory */
630         OLSR_WARN(LOG_SOCKET_STREAM, "Out of memory for comport session input buffer");
631         session->state = STREAM_SESSION_CLEANUP;
632       } else if (abuf_getlen(&session->in) > s_sock->config.maximum_input_buffer) {
633         /* input buffer overflow */
634         if (s_sock->config.create_error) {
635           s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
636         }
637         session->state = STREAM_SESSION_SEND_AND_QUIT;
638       } else {
639         /* got new input block, reset timeout */
640         olsr_stream_set_timeout(session, s_sock->config.session_timeout);
641       }
642     } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
643         != EWOULDBLOCK) {
644       /* error during read */
645       OLSR_WARN(LOG_SOCKET_STREAM, "Error while reading from communication stream with %s: %s (%d)\n",
646           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
647       session->state = STREAM_SESSION_CLEANUP;
648     } else if (len == 0) {
649       /* external s_sock closed */
650       session->state = STREAM_SESSION_SEND_AND_QUIT;
651     }
652   }
653
654   if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL
655       && (abuf_getlen(&session->in) > 0 || session->send_first)) {
656     session->state = s_sock->config.receive_data(session);
657     session->send_first = false;
658   }
659
660   /* send data if necessary */
661   if (session->state != STREAM_SESSION_CLEANUP && abuf_getlen(&session->out) > 0) {
662     if (event_write) {
663       len = send(fd, abuf_getptr(&session->out), abuf_getlen(&session->out), 0);
664
665       if (len > 0) {
666         OLSR_DEBUG(LOG_SOCKET_STREAM, "  send returned %d\n", len);
667         abuf_pull(&session->out, len);
668         olsr_stream_set_timeout(session, s_sock->config.session_timeout);
669       } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
670           != EWOULDBLOCK) {
671         OLSR_WARN(LOG_SOCKET_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
672             netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
673         session->state = STREAM_SESSION_CLEANUP;
674       }
675     } else {
676       OLSR_DEBUG(LOG_SOCKET_STREAM, "  activating output in scheduler\n");
677       olsr_socket_set_write(&session->scheduler_entry, true);
678     }
679   }
680
681   if (abuf_getlen(&session->out) == 0) {
682     /* nothing to send anymore */
683     OLSR_DEBUG(LOG_SOCKET_STREAM, "  deactivating output in scheduler\n");
684     olsr_socket_set_write(&session->scheduler_entry, false);
685     if (session->state == STREAM_SESSION_SEND_AND_QUIT) {
686       session->state = STREAM_SESSION_CLEANUP;
687     }
688   }
689
690   session->busy = false;
691   s_sock->busy = false;
692
693   /* end of connection ? */
694   if (session->state == STREAM_SESSION_CLEANUP || session->removed) {
695     OLSR_DEBUG(LOG_SOCKET_STREAM, "  cleanup\n");
696
697     /* clean up connection by calling cleanup directly */
698     olsr_stream_close(session, session->state == STREAM_SESSION_CLEANUP);
699   }
700
701   /* lazy socket removal */
702   if (s_sock->remove) {
703     olsr_stream_remove(s_sock, false);
704   }
705   return;
706 }