fc45eda56eb9544056dfc21a2de02770a5893e09
[oonf.git] / src / subsystems / oonf_stream_socket.c
1
2 /*
3  * The olsr.org Optimized Link-State Routing daemon version 2 (olsrd2)
4  * Copyright (c) 2004-2015, the olsr.org team - see HISTORY file
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  *   notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  *   notice, this list of conditions and the following disclaimer in
15  *   the documentation and/or other materials provided with the
16  *   distribution.
17  * * Neither the name of olsr.org, olsrd nor the names of its
18  *   contributors may be used to endorse or promote products derived
19  *   from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * Visit http://www.olsr.org for more information.
35  *
36  * If you find this software useful feel free to make a donation
37  * to the project. For more information see the website or contact
38  * the copyright holders.
39  *
40  */
41
42 /**
43  * @file
44  */
45
46 #include <errno.h>
47 #include <stdlib.h>
48 #include <string.h>
49
50 #include <oonf/libcommon/autobuf.h>
51 #include <oonf/libcommon/avl.h>
52 #include <oonf/libcommon/list.h>
53 #include <oonf/libcore/oonf_logging.h>
54 #include <oonf/libcore/oonf_subsystem.h>
55 #include <oonf/subsystems/oonf_class.h>
56 #include <oonf/subsystems/oonf_stream_socket.h>
57 #include <oonf/subsystems/oonf_timer.h>
58 #include <oonf/subsystems/os_fd.h>
59 #include <oonf/subsystems/os_interface.h>
60 #include <oonf/subsystems/os_system.h>
61
62 /* Definitions */
63 #define LOG_STREAM _oonf_stream_socket_subsystem.logging
64
65 /* prototypes */
66 static int _init(void);
67 static void _cleanup(void);
68
69 static void _stream_close(struct oonf_stream_session *session);
70 int _apply_managed(struct oonf_stream_managed *managed);
71 static int _apply_managed_socket(
72   int af_type, struct oonf_stream_managed *managed, struct oonf_stream_socket *stream, struct os_interface *os_if);
73 static void _cb_parse_request(struct oonf_socket_entry *);
74 static struct oonf_stream_session *_create_session(struct oonf_stream_socket *stream_socket, struct os_fd *sock,
75   const struct netaddr *remote_addr, const union netaddr_socket *remote_socket);
76 static void _cb_parse_connection(struct oonf_socket_entry *entry);
77
78 static void _cb_timeout_handler(struct oonf_timer_instance *);
79 static int _cb_interface_listener(struct os_interface_listener *listener);
80
81 /* list of olsr stream sockets */
82 static struct list_entity _stream_head;
83
84 /* server socket */
85 static struct oonf_class _connection_cookie = { .name = "stream socket connection",
86   .size = sizeof(struct oonf_stream_session) };
87
88 static struct oonf_timer_class _connection_timeout = {
89   .name = "stream socket timout",
90   .callback = _cb_timeout_handler,
91 };
92
93 /* subsystem definition */
94 static const char *_dependencies[] = {
95   OONF_CLASS_SUBSYSTEM,
96   OONF_SOCKET_SUBSYSTEM,
97   OONF_TIMER_SUBSYSTEM,
98   OONF_OS_FD_SUBSYSTEM,
99   OONF_OS_INTERFACE_SUBSYSTEM,
100   OONF_OS_SYSTEM_SUBSYSTEM,
101 };
102
103 static struct oonf_subsystem _oonf_stream_socket_subsystem = {
104   .name = OONF_STREAM_SUBSYSTEM,
105   .dependencies = _dependencies,
106   .dependencies_count = ARRAYSIZE(_dependencies),
107   .init = _init,
108   .cleanup = _cleanup,
109 };
110 DECLARE_OONF_PLUGIN(_oonf_stream_socket_subsystem);
111
112 /**
113  * Initialize the stream socket handlers
114  * @return always returns 0
115  */
116 static int
117 _init(void) {
118   oonf_class_add(&_connection_cookie);
119   oonf_timer_add(&_connection_timeout);
120   list_init_head(&_stream_head);
121   return 0;
122 }
123
124 /**
125  * Cleanup all resources allocated be stream socket handlers
126  */
127 static void
128 _cleanup(void) {
129   struct oonf_stream_socket *comport;
130
131   while (!list_is_empty(&_stream_head)) {
132     comport = list_first_element(&_stream_head, comport, _node);
133
134     oonf_stream_remove(comport, true);
135   }
136
137   oonf_class_remove(&_connection_cookie);
138   oonf_timer_remove(&_connection_timeout);
139 }
140
141 /**
142  * Flush all data in outgoing buffer of a stream socket
143  * @param con pointer to stream socket
144  */
145 void
146 oonf_stream_flush(struct oonf_stream_session *con) {
147   oonf_socket_set_write(&con->scheduler_entry, true);
148 }
149
150 /**
151  * Add a new stream socket to the scheduler
152  * @param stream_socket pointer to stream socket struct with
153  *   initialized config
154  * @param local pointer to local ip/port of socket, port must be 0 if
155  *   this shall be an outgoing socket
156  * @return -1 if an error happened, 0 otherwise
157  */
158 int
159 oonf_stream_add(struct oonf_stream_socket *stream_socket, const union netaddr_socket *local) {
160   struct netaddr_str buf;
161
162   /* server socket not necessary for outgoing connections */
163   if (netaddr_socket_get_port(local) != 0) {
164     /* Init socket */
165     if (os_fd_getsocket(&stream_socket->scheduler_entry.fd, local, true, 0, NULL, LOG_STREAM)) {
166       goto add_stream_error;
167     }
168
169     /* show that we are willing to listen */
170     if (os_fd_listen(&stream_socket->scheduler_entry.fd, 1) == -1) {
171       OONF_WARN(LOG_STREAM, "tcp socket listen failed for %s: %s (%d)\n", netaddr_socket_to_string(&buf, local),
172         strerror(errno), errno);
173       goto add_stream_error;
174     }
175     stream_socket->scheduler_entry.name = stream_socket->socket_name;
176     stream_socket->scheduler_entry.process = _cb_parse_request;
177
178     snprintf(stream_socket->socket_name, sizeof(stream_socket->socket_name), "tcp-server: %s",
179       netaddr_socket_to_string(&buf, local));
180     oonf_socket_add(&stream_socket->scheduler_entry);
181     oonf_socket_set_read(&stream_socket->scheduler_entry, true);
182   }
183   memcpy(&stream_socket->local_socket, local, sizeof(stream_socket->local_socket));
184
185   if (stream_socket->config.memcookie == NULL) {
186     stream_socket->config.memcookie = &_connection_cookie;
187   }
188   if (stream_socket->config.allowed_sessions == 0) {
189     stream_socket->config.allowed_sessions = 10;
190   }
191   if (stream_socket->config.maximum_input_buffer == 0) {
192     stream_socket->config.maximum_input_buffer = 65536;
193   }
194
195   list_init_head(&stream_socket->session);
196   list_add_tail(&_stream_head, &stream_socket->_node);
197
198   return 0;
199
200 add_stream_error:
201   oonf_socket_remove(&stream_socket->scheduler_entry);
202   os_fd_close(&stream_socket->scheduler_entry.fd);
203   return -1;
204 }
205
206 /**
207  * Remove a stream socket from the scheduler
208  * @param stream_socket pointer to socket
209  * @param force true if socket will be closed immediately,
210  *   false if scheduler should wait until outgoing buffers are empty
211  */
212 void
213 oonf_stream_remove(struct oonf_stream_socket *stream_socket, bool force) {
214   if (stream_socket->busy && !force) {
215     stream_socket->remove = true;
216     return;
217   }
218
219   if (!list_is_node_added(&stream_socket->_node)) {
220     return;
221   }
222
223   oonf_stream_close_all_sessions(stream_socket);
224   list_remove(&stream_socket->_node);
225
226   oonf_socket_remove(&stream_socket->scheduler_entry);
227   os_fd_close(&stream_socket->scheduler_entry.fd);
228
229   if (stream_socket->config.cleanup_socket) {
230     stream_socket->config.cleanup_socket(stream_socket);
231   }
232 }
233
234 /**
235  * Closes all client connections of a stream socket, does not close the local
236  * socket itself.
237  * @param stream_socket stream socket
238  */
239 void
240 oonf_stream_close_all_sessions(struct oonf_stream_socket *stream_socket) {
241   struct oonf_stream_session *session, *ptr;
242
243   if (!list_is_node_added(&stream_socket->_node)) {
244     return;
245   }
246
247   list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
248     if (abuf_getlen(&session->out) == 0 && !session->busy) {
249       /* close everything that doesn't need to send data anymore */
250       oonf_stream_close(session);
251     }
252   }
253   return;
254 }
255
256 /**
257  * Create an outgoing stream socket.
258  * @param stream_socket pointer to stream socket
259  * @param remote pointer to address of remote TCP server
260  * @return pointer to stream session, NULL if an error happened.
261  */
262 struct oonf_stream_session *
263 oonf_stream_connect_to(struct oonf_stream_socket *stream_socket, const union netaddr_socket *remote) {
264   struct oonf_stream_session *session;
265   struct os_fd sock;
266   struct netaddr remote_addr;
267   bool wait_for_connect = false;
268   struct netaddr_str nbuf1;
269 #ifdef OONF_LOG_DEBUG_INFO
270   struct netaddr_str nbuf2;
271 #endif
272
273   OONF_DEBUG(LOG_STREAM, "Connect TCP socket from %s to %s",
274     netaddr_socket_to_string(&nbuf1, &stream_socket->local_socket), netaddr_socket_to_string(&nbuf2, remote));
275
276   if (os_fd_getsocket(&sock, &stream_socket->local_socket, true, 0, NULL, LOG_STREAM)) {
277     return NULL;
278   }
279
280   if (os_fd_connect(&sock, remote)) {
281     if (errno == ECONNREFUSED) {
282       /* Don't produce a warning for an failed outgoing TCP connection */
283       OONF_INFO(LOG_STREAM, "TCP connection to %s refused: %s (%d)",
284         netaddr_socket_to_string(&nbuf1, remote), strerror(errno), errno);
285       goto connect_to_error;
286     }
287     else if (errno != EINPROGRESS) {
288       OONF_WARN(LOG_STREAM, "Cannot connect outgoing tcp connection to %s: %s (%d)",
289         netaddr_socket_to_string(&nbuf1, remote), strerror(errno), errno);
290       goto connect_to_error;
291     }
292     wait_for_connect = true;
293   }
294
295   netaddr_from_socket(&remote_addr, remote);
296   session = _create_session(stream_socket, &sock, &remote_addr, remote);
297   if (session) {
298     session->wait_for_connect = wait_for_connect;
299     return session;
300   }
301
302   /* fall through */
303 connect_to_error:
304   os_fd_close(&stream_socket->scheduler_entry.fd);
305   return NULL;
306 }
307
308 /**
309  * Reset the session timeout of a TCP session
310  * @param con pointer to stream session
311  * @param timeout timeout in milliseconds
312  */
313 void
314 oonf_stream_set_timeout(struct oonf_stream_session *con, uint64_t timeout) {
315   oonf_timer_set(&con->timeout, timeout);
316 }
317
318 /**
319  * Close a TCP stream session
320  * @param session pointer to stream session
321  */
322 void
323 oonf_stream_close(struct oonf_stream_session *session) {
324   if (session->busy) {
325     /* remove the session later */
326     session->removed = true;
327     return;
328   }
329   _stream_close(session);
330 }
331
332 /**
333  * Initialized a managed TCP stream
334  * @param managed pointer to initialized managed stream
335  */
336 void
337 oonf_stream_add_managed(struct oonf_stream_managed *managed) {
338   if (managed->config.allowed_sessions == 0) {
339     managed->config.allowed_sessions = 10;
340   }
341   if (managed->config.maximum_input_buffer == 0) {
342     managed->config.maximum_input_buffer = 65536;
343   }
344   if (managed->config.session_timeout == 0) {
345     managed->config.session_timeout = 120000;
346   }
347
348   managed->_if_listener.if_changed = _cb_interface_listener;
349   managed->_if_listener.name = managed->_managed_config.interface;
350 }
351
352 /**
353  * Apply a configuration to a stream. Will reset both ACLs
354  * and socket ports/bindings.
355  * @param managed pointer to managed stream
356  * @param config pointer to stream config
357  * @return -1 if an error happened, 0 otherwise.
358  */
359 int
360 oonf_stream_apply_managed(struct oonf_stream_managed *managed, struct oonf_stream_managed_config *config) {
361   bool if_changed;
362   int result;
363
364   if_changed = strcmp(config->interface, managed->_managed_config.interface) != 0 ||
365                !list_is_node_added(&managed->_if_listener._node);
366
367   oonf_stream_copy_managed_config(&managed->_managed_config, config);
368
369   if (managed->config.memcookie == NULL) {
370     managed->config.memcookie = &_connection_cookie;
371   }
372
373   /* set back pointers */
374   managed->socket_v4.managed = managed;
375   managed->socket_v6.managed = managed;
376
377   /* handle change in interface listener */
378   if (if_changed) {
379     /* interface changed, remove old listener if necessary */
380     os_interface_remove(&managed->_if_listener);
381
382     /* create new interface listener */
383     os_interface_add(&managed->_if_listener);
384   }
385
386   OONF_DEBUG(LOG_STREAM, "Apply changes for managed socket (if %s) with port %d",
387     config->interface == NULL || config->interface[0] == 0 ? "any" : config->interface, config->port);
388
389   result = _apply_managed(managed);
390   if (result) {
391     /* did not work, trigger interface handler to try later again */
392     os_interface_trigger_handler(&managed->_if_listener);
393   }
394   return result;
395 }
396
397 /**
398  * Remove a managed TCP stream
399  * @param managed pointer to managed stream
400  * @param force true if socket will be closed immediately,
401  *   false if scheduler should wait until outgoing buffers are empty
402  */
403 void
404 oonf_stream_remove_managed(struct oonf_stream_managed *managed, bool force) {
405   os_interface_remove(&managed->_if_listener);
406
407   oonf_stream_remove(&managed->socket_v4, force);
408   oonf_stream_remove(&managed->socket_v6, force);
409   os_interface_remove(&managed->_if_listener);
410   oonf_stream_free_managed_config(&managed->_managed_config);
411 }
412
413 /**
414  * Closes all connections of a managed socket, but not the socket itself
415  * @param managed managed stream socket
416  */
417 void
418 oonf_stream_close_all_managed_sessions(struct oonf_stream_managed *managed) {
419   oonf_stream_close_all_sessions(&managed->socket_v4);
420   oonf_stream_close_all_sessions(&managed->socket_v6);
421 }
422
423 /**
424  * Free dynamically allocated parts of managed stream configuration
425  * @param config packet configuration
426  */
427 void
428 oonf_stream_free_managed_config(struct oonf_stream_managed_config *config) {
429   netaddr_acl_remove(&config->acl);
430   netaddr_acl_remove(&config->bindto);
431 }
432
433 /**
434  * copies a stream managed configuration object
435  * @param dst Destination
436  * @param src Source
437  */
438 void
439 oonf_stream_copy_managed_config(struct oonf_stream_managed_config *dst, struct oonf_stream_managed_config *src) {
440   oonf_stream_free_managed_config(dst);
441
442   memcpy(dst, src, sizeof(*dst));
443
444   memset(&dst->acl, 0, sizeof(dst->acl));
445   netaddr_acl_copy(&dst->acl, &src->acl);
446
447   memset(&dst->bindto, 0, sizeof(dst->bindto));
448   netaddr_acl_copy(&dst->bindto, &src->bindto);
449 }
450
451 /**
452  * Close a TCP stream
453  * @param session tcp stream session
454  */
455 static void
456 _stream_close(struct oonf_stream_session *session) {
457   if (session->stream_socket->config.cleanup_session) {
458     session->stream_socket->config.cleanup_session(session);
459   }
460
461   oonf_timer_stop(&session->timeout);
462
463   session->stream_socket->session_counter--;
464   list_remove(&session->node);
465
466   oonf_socket_remove(&session->scheduler_entry);
467   os_fd_close(&session->scheduler_entry.fd);
468
469   abuf_free(&session->in);
470   abuf_free(&session->out);
471
472   oonf_class_free(session->stream_socket->config.memcookie, session);
473 }
474
475 /**
476  * Apply the stored settings of a managed socket
477  * @param managed pointer to managed stream
478  * @return -1 if an error happened, 0 otherwise
479  */
480 int
481 _apply_managed(struct oonf_stream_managed *managed) {
482   struct os_interface *bind_socket_to_if = NULL;
483
484   /* get interface */
485   if (!managed->_if_listener.data->flags.any) {
486     bind_socket_to_if = managed->_if_listener.data;
487   }
488
489   if (_apply_managed_socket(AF_INET, managed, &managed->socket_v4, bind_socket_to_if)) {
490     return -1;
491   }
492
493   if (os_system_is_ipv6_supported()) {
494     if (_apply_managed_socket(AF_INET6, managed, &managed->socket_v6, bind_socket_to_if)) {
495       return -1;
496     }
497   }
498   return 0;
499 }
500
501 /**
502  * Apply new configuration to a managed stream socket
503  * @param af_type address type to bind socket to
504  * @param managed pointer to managed stream
505  * @param stream pointer to TCP stream to configure
506  * @return -1 if an error happened, 0 otherwise.
507  */
508 static int
509 _apply_managed_socket(
510   int af_type, struct oonf_stream_managed *managed, struct oonf_stream_socket *stream, struct os_interface *data) {
511   struct netaddr_acl *bind_ip_acl;
512   const struct netaddr *bind_ip;
513   union netaddr_socket sock;
514   struct netaddr_str buf;
515
516   bind_ip_acl = &managed->_managed_config.bindto;
517
518   /* Get address the unicast socket should bind on */
519   if (data != NULL && !data->flags.up) {
520     bind_ip = NULL;
521   }
522   else if (data != NULL && netaddr_get_address_family(data->if_linklocal_v6) == af_type &&
523            netaddr_acl_check_accept(bind_ip_acl, data->if_linklocal_v6)) {
524     bind_ip = data->if_linklocal_v6;
525   }
526   else {
527     bind_ip = os_interface_get_bindaddress(af_type, bind_ip_acl, data);
528   }
529   if (!bind_ip) {
530     oonf_stream_remove(stream, true);
531     return 0;
532   }
533   if (netaddr_socket_init(&sock, bind_ip, managed->_managed_config.port, data == NULL ? 0 : data->index)) {
534     OONF_WARN(LOG_STREAM, "Cannot create managed socket address: %s/%u", netaddr_to_string(&buf, bind_ip),
535       managed->_managed_config.port);
536     return -1;
537   }
538
539   if (list_is_node_added(&stream->scheduler_entry._node)) {
540     if (memcmp(&sock, &stream->local_socket, sizeof(sock)) == 0) {
541       /* nothing changed, just copy configuration */
542       memcpy(&stream->config, &managed->config, sizeof(stream->config));
543       return 0;
544     }
545
546     oonf_stream_remove(stream, true);
547   }
548
549   /* copy configuration */
550   memcpy(&stream->config, &managed->config, sizeof(stream->config));
551   if (stream->config.memcookie == NULL) {
552     stream->config.memcookie = &_connection_cookie;
553   }
554
555   if (oonf_stream_add(stream, &sock)) {
556     return -1;
557   }
558
559   return 0;
560 }
561
562 /**
563  * Handle incoming server socket event from socket scheduler.
564  * @param entry socket entry for event parsing
565  */
566 static void
567 _cb_parse_request(struct oonf_socket_entry *entry) {
568   struct oonf_stream_socket *stream;
569   union netaddr_socket remote_socket;
570   struct netaddr remote_addr;
571   struct os_fd sock;
572 #ifdef OONF_LOG_DEBUG_INFO
573   struct netaddr_str buf1, buf2;
574 #endif
575
576   if (!oonf_socket_is_read(entry)) {
577     return;
578   }
579
580   stream = container_of(entry, typeof(*stream), scheduler_entry);
581
582   if (os_fd_accept(&sock, &entry->fd, &remote_socket)) {
583     OONF_WARN(LOG_STREAM, "accept() call returned error: %s (%d)", strerror(errno), errno);
584     return;
585   }
586
587   netaddr_from_socket(&remote_addr, &remote_socket);
588   if (stream->config.acl) {
589     if (!netaddr_acl_check_accept(stream->config.acl, &remote_addr)) {
590       OONF_DEBUG(LOG_STREAM, "Access from %s to socket %s blocked because of ACL",
591         netaddr_to_string(&buf1, &remote_addr), netaddr_socket_to_string(&buf2, &stream->local_socket));
592       os_fd_close(&sock);
593       return;
594     }
595   }
596   _create_session(stream, &sock, &remote_addr, &remote_socket);
597 }
598
599 /**
600  * Configure a TCP session socket
601  * @param stream_socket pointer to stream socket
602  * @param sock pointer to socket filedescriptor
603  * @param remote_addr pointer to remote address
604  * @return pointer to new stream session, NULL if an error happened.
605  */
606 static struct oonf_stream_session *
607 _create_session(struct oonf_stream_socket *stream_socket, struct os_fd *sock, const struct netaddr *remote_addr,
608   const union netaddr_socket *remote_socket) {
609   struct oonf_stream_session *session;
610   struct netaddr_str nbuf1, nbuf2;
611
612   /* put socket into non-blocking mode */
613   if (os_fd_set_nonblocking(sock)) {
614     OONF_WARN(LOG_STREAM, "Cannot set socket %d nonblocking: %s (%d)", os_fd_get_fd(sock), strerror(errno), errno);
615     return NULL;
616   }
617
618   session = oonf_class_malloc(stream_socket->config.memcookie);
619   if (session == NULL) {
620     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
621     return NULL;
622   }
623
624   if (abuf_init(&session->in)) {
625     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
626     goto parse_request_error;
627   }
628   if (abuf_init(&session->out)) {
629     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
630     goto parse_request_error;
631   }
632
633   os_fd_copy(&session->scheduler_entry.fd, sock);
634   session->scheduler_entry.name = session->socket_name;
635   session->scheduler_entry.process = _cb_parse_connection;
636   session->send_first = stream_socket->config.send_first;
637   session->stream_socket = stream_socket;
638
639   session->remote_address = *remote_addr;
640   session->remote_socket = *remote_socket;
641
642   /* generate socket name */
643   snprintf(session->socket_name, sizeof(session->socket_name), "tcp: %s,%s",
644     netaddr_socket_to_string(&nbuf1, &stream_socket->local_socket),
645     netaddr_socket_to_string(&nbuf2, &session->remote_socket));
646
647   if (stream_socket->session_counter < stream_socket->config.allowed_sessions) {
648     /* create active session */
649     session->state = STREAM_SESSION_ACTIVE;
650     stream_socket->session_counter++;
651   }
652   else {
653     /* too many sessions */
654     if (stream_socket->config.create_error) {
655       stream_socket->config.create_error(session, STREAM_SERVICE_UNAVAILABLE);
656     }
657     session->state = STREAM_SESSION_SEND_AND_QUIT;
658   }
659
660   session->timeout.class = &_connection_timeout;
661   if (stream_socket->config.session_timeout) {
662     oonf_timer_start(&session->timeout, stream_socket->config.session_timeout);
663   }
664
665   oonf_socket_add(&session->scheduler_entry);
666   oonf_socket_set_read(&session->scheduler_entry, true);
667   oonf_socket_set_write(&session->scheduler_entry, true);
668
669   if (stream_socket->config.init_session) {
670     if (stream_socket->config.init_session(session)) {
671       goto parse_request_error;
672     }
673   }
674
675   OONF_DEBUG(LOG_STREAM, "Got connection through socket %d with %s.\n", os_fd_get_fd(sock),
676     netaddr_to_string(&nbuf1, remote_addr));
677
678   list_add_tail(&stream_socket->session, &session->node);
679   return session;
680
681 parse_request_error:
682   abuf_free(&session->in);
683   abuf_free(&session->out);
684   oonf_class_free(stream_socket->config.memcookie, session);
685
686   return NULL;
687 }
688
689 /**
690  * Handle TCP session timeout
691  * @param ptr timer instance that fired
692  */
693 static void
694 _cb_timeout_handler(struct oonf_timer_instance *ptr) {
695   struct oonf_stream_session *session;
696
697   session = container_of(ptr, struct oonf_stream_session, timeout);
698   oonf_stream_close(session);
699 }
700
701 /**
702  * Handle events for TCP session from network scheduler
703  * @param entry socket entry to be parsed
704  */
705 static void
706 _cb_parse_connection(struct oonf_socket_entry *entry) {
707   struct oonf_stream_session *session;
708   struct oonf_stream_socket *s_sock;
709   int len;
710   char buffer[1024];
711   struct netaddr_str buf;
712
713   session = container_of(entry, typeof(*session), scheduler_entry);
714   s_sock = session->stream_socket;
715
716   OONF_DEBUG(LOG_STREAM, "Parsing connection of socket %d\n", os_fd_get_fd(&entry->fd));
717
718   /* mark session and s_sock as busy */
719   session->busy = true;
720   s_sock->busy = true;
721
722   if (session->wait_for_connect) {
723     if (oonf_socket_is_write(entry)) {
724       int value;
725
726       if (os_fd_get_socket_error(&entry->fd, &value)) {
727         OONF_WARN(LOG_STREAM, "getsockopt failed: %s (%d)", strerror(errno), errno);
728         session->state = STREAM_SESSION_CLEANUP;
729       }
730       else if (value == ECONNREFUSED) {
731         /* Don't produce a warning for an failed outgoing TCP connection */
732         OONF_INFO(LOG_STREAM, "TCP connection to %s refused: %s (%d)",
733             netaddr_socket_to_string(&buf, &session->remote_socket), strerror(value), value);
734         session->state = STREAM_SESSION_CLEANUP;
735       }
736       else if (value != 0) {
737         OONF_WARN(LOG_STREAM, "Connection to %s failed: %s (%d)",
738           netaddr_socket_to_string(&buf, &session->remote_socket), strerror(value), value);
739         session->state = STREAM_SESSION_CLEANUP;
740       }
741       else {
742         session->wait_for_connect = false;
743       }
744     }
745   }
746
747   if (session->wait_for_connect) {
748     session->busy = false;
749     s_sock->busy = false;
750     return;
751   }
752
753   /* read data if necessary */
754   if (session->state == STREAM_SESSION_ACTIVE && oonf_socket_is_read(entry)) {
755     len = os_fd_recvfrom(&entry->fd, buffer, sizeof(buffer), NULL, 0);
756     if (len > 0) {
757       OONF_DEBUG(LOG_STREAM, "  recv returned %d\n", len);
758       if (abuf_memcpy(&session->in, buffer, len)) {
759         /* out of memory */
760         OONF_WARN(LOG_STREAM, "Out of memory for comport session input buffer");
761         session->state = STREAM_SESSION_CLEANUP;
762       }
763       else if (abuf_getlen(&session->in) > s_sock->config.maximum_input_buffer) {
764         /* input buffer overflow */
765         if (s_sock->config.create_error) {
766           s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
767         }
768         session->state = STREAM_SESSION_SEND_AND_QUIT;
769       }
770       else {
771         /* got new input block, reset timeout */
772         oonf_stream_set_timeout(session, s_sock->config.session_timeout);
773       }
774     }
775     else if (len < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
776       /* error during read */
777       OONF_WARN(LOG_STREAM, "Error while reading from communication stream with %s: %s (%d)\n",
778         netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
779       session->state = STREAM_SESSION_CLEANUP;
780     }
781     else if (len == 0) {
782       /* external s_sock closed */
783       session->state = STREAM_SESSION_SEND_AND_QUIT;
784
785       /* still call callback once more */
786       session->state = s_sock->config.receive_data(session);
787
788       /* switch off read events */
789       oonf_socket_set_read(entry, false);
790     }
791   }
792
793   if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL &&
794       (abuf_getlen(&session->in) > 0 || session->send_first)) {
795     session->state = s_sock->config.receive_data(session);
796     session->send_first = false;
797   }
798
799   /* send data if necessary */
800   if (session->state != STREAM_SESSION_CLEANUP && abuf_getlen(&session->out) > 0) {
801     if (oonf_socket_is_write(entry)) {
802       len = os_fd_sendto(&entry->fd, abuf_getptr(&session->out), abuf_getlen(&session->out), NULL, false);
803
804       if (len > 0) {
805         OONF_DEBUG(LOG_STREAM, "  send returned %d\n", len);
806         abuf_pull(&session->out, len);
807         oonf_stream_set_timeout(session, s_sock->config.session_timeout);
808       }
809       else if (len < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
810         OONF_WARN(LOG_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
811           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
812         session->state = STREAM_SESSION_CLEANUP;
813       }
814     }
815     else {
816       OONF_DEBUG(LOG_STREAM, "  activating output in scheduler\n");
817       oonf_socket_set_write(&session->scheduler_entry, true);
818     }
819   }
820
821   /* send file if necessary */
822   if (session->state == STREAM_SESSION_SEND_AND_QUIT && abuf_getlen(&session->out) == 0 &&
823       os_fd_is_initialized(&session->copy_fd)) {
824     if (oonf_socket_is_write(entry)) {
825       len = os_fd_sendfile(
826         &entry->fd, &session->copy_fd, session->copy_bytes_sent, session->copy_total_size - session->copy_bytes_sent);
827       if (len <= 0) {
828         OONF_WARN(LOG_STREAM, "Error while copying file to output stream (%d/%d): %s (%d)", os_fd_get_fd(&entry->fd),
829           os_fd_get_fd(&session->copy_fd), strerror(errno), errno);
830         session->state = STREAM_SESSION_CLEANUP;
831       }
832       else {
833         session->copy_bytes_sent += len;
834       }
835     }
836   }
837
838   /* check for buffer underrun */
839   if (session->state == STREAM_SESSION_ACTIVE && abuf_getlen(&session->out) == 0 &&
840       s_sock->config.buffer_underrun != NULL) {
841     session->state = s_sock->config.buffer_underrun(session);
842   }
843
844   if (abuf_getlen(&session->out) == 0 && session->copy_bytes_sent == session->copy_total_size) {
845     /* nothing to send anymore */
846     OONF_DEBUG(LOG_STREAM, "  deactivating output in scheduler\n");
847     oonf_socket_set_write(&session->scheduler_entry, false);
848     if (session->state == STREAM_SESSION_SEND_AND_QUIT) {
849       session->state = STREAM_SESSION_CLEANUP;
850     }
851   }
852
853   session->busy = false;
854   s_sock->busy = false;
855
856   /* end of connection ? */
857   if (session->state == STREAM_SESSION_CLEANUP || session->removed) {
858     OONF_DEBUG(LOG_STREAM, "  cleanup\n");
859
860     /* clean up connection by calling cleanup directly */
861     _stream_close(session);
862
863     /* session object will not be valid anymore after this point */
864   }
865
866   /* lazy socket removal */
867   if (s_sock->remove) {
868     oonf_stream_remove(s_sock, false);
869   }
870   return;
871 }
872
873 /**
874  * Callbacks for events on the interface
875  * @param interf os interface listener that fired
876  * @return -1 if an error happened, 0 otherwise
877  */
878 static int
879 _cb_interface_listener(struct os_interface_listener *interf) {
880   struct oonf_stream_managed *managed;
881   int result;
882
883   /* calculate managed socket for this event */
884   managed = container_of(interf, struct oonf_stream_managed, _if_listener);
885
886   result = _apply_managed(managed);
887
888   OONF_DEBUG(LOG_STREAM, "Result from interface %s triggered socket reconfiguration: %d", interf->name, result);
889
890   return result;
891 }