Fix dependencies and graph files
[oonf.git] / src-plugins / subsystems / oonf_stream_socket.c
1
2 /*
3  * The olsr.org Optimized Link-State Routing daemon version 2 (olsrd2)
4  * Copyright (c) 2004-2013, the olsr.org team - see HISTORY file
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  *   notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  *   notice, this list of conditions and the following disclaimer in
15  *   the documentation and/or other materials provided with the
16  *   distribution.
17  * * Neither the name of olsr.org, olsrd nor the names of its
18  *   contributors may be used to endorse or promote products derived
19  *   from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * Visit http://www.olsr.org for more information.
35  *
36  * If you find this software useful feel free to make a donation
37  * to the project. For more information see the website or contact
38  * the copyright holders.
39  *
40  */
41
42 #include <string.h>
43 #include <stdlib.h>
44 #include <errno.h>
45
46 #include "common/autobuf.h"
47 #include "common/avl.h"
48 #include "common/list.h"
49 #include "core/oonf_logging.h"
50 #include "core/oonf_subsystem.h"
51 #include "subsystems/oonf_class.h"
52 #include "subsystems/oonf_interface.h"
53 #include "subsystems/oonf_timer.h"
54 #include "subsystems/oonf_stream_socket.h"
55 #include "subsystems/os_system.h"
56 #include "subsystems/os_socket.h"
57
58 /* Definitions */
59 #define LOG_STREAM _oonf_stream_socket_subsystem.logging
60
61 /* prototypes */
62 static int _init(void);
63 static void _cleanup(void);
64
65 static void _stream_close(struct oonf_stream_session *session);
66 int _apply_managed(struct oonf_stream_managed *managed);
67 static int _apply_managed_socket(int af_type, struct oonf_stream_managed *managed,
68     struct oonf_stream_socket *stream, struct os_interface_data *data);
69 static void _cb_parse_request(int fd, void *data, bool, bool);
70 static struct oonf_stream_session *_create_session(
71     struct oonf_stream_socket *stream_socket, int sock,
72     const struct netaddr *remote_addr,
73     const union netaddr_socket *remote_socket);
74 static void _cb_parse_connection(int fd, void *data, bool r,bool w);
75
76 static void _cb_timeout_handler(void *);
77 static void _cb_interface_listener(struct oonf_interface_listener *l);
78
79 /* list of olsr stream sockets */
80 static struct list_entity _stream_head;
81
82 /* server socket */
83 static struct oonf_class _connection_cookie = {
84   .name = "stream socket connection",
85   .size = sizeof(struct oonf_stream_session)
86 };
87
88 static struct oonf_timer_class _connection_timeout = {
89   .name = "stream socket timout",
90   .callback = _cb_timeout_handler,
91 };
92
93 /* subsystem definition */
94 static const char *_dependencies[] = {
95   OONF_CLASS_SUBSYSTEM,
96   OONF_INTERFACE_SUBSYSTEM,
97   OONF_SOCKET_SUBSYSTEM,
98   OONF_TIMER_SUBSYSTEM,
99   OONF_OS_SYSTEM_SUBSYSTEM,
100   OONF_OS_SOCKET_SUBSYSTEM,
101 };
102
103 static struct oonf_subsystem _oonf_stream_socket_subsystem = {
104   .name = OONF_STREAM_SUBSYSTEM,
105   .dependencies = _dependencies,
106   .dependencies_count = ARRAYSIZE(_dependencies),
107   .init = _init,
108   .cleanup = _cleanup,
109 };
110 DECLARE_OONF_PLUGIN(_oonf_stream_socket_subsystem);
111
112 /**
113  * Initialize the stream socket handlers
114  * @return always returns 0
115  */
116 static int
117 _init(void) {
118   oonf_class_add(&_connection_cookie);
119   oonf_timer_add(&_connection_timeout);
120   list_init_head(&_stream_head);
121   return 0;
122 }
123
124 /**
125  * Cleanup all resources allocated be stream socket handlers
126  */
127 static void
128 _cleanup(void) {
129   struct oonf_stream_socket *comport;
130
131   while (!list_is_empty(&_stream_head)) {
132     comport = list_first_element(&_stream_head, comport, _node);
133
134     oonf_stream_remove(comport, true);
135   }
136
137   oonf_class_remove(&_connection_cookie);
138   oonf_timer_remove(&_connection_timeout);
139 }
140
141 /**
142  * Flush all data in outgoing buffer of a stream socket
143  * @param con pointer to stream socket
144  */
145 void
146 oonf_stream_flush(struct oonf_stream_session *con) {
147   oonf_socket_set_write(&con->scheduler_entry, true);
148 }
149
150 /**
151  * Add a new stream socket to the scheduler
152  * @param stream_socket pointer to stream socket struct with
153  *   initialized config
154  * @param local pointer to local ip/port of socket, port must be 0 if
155  *   this shall be an outgoing socket
156  * @return -1 if an error happened, 0 otherwise
157  */
158 int
159 oonf_stream_add(struct oonf_stream_socket *stream_socket,
160     const union netaddr_socket *local) {
161   int s = -1;
162   struct netaddr_str buf;
163
164   /* server socket not necessary for outgoing connections */
165   if (netaddr_socket_get_port(local) != 0) {
166     /* Init socket */
167     s = os_socket_getsocket(local, true, 0, NULL, LOG_STREAM);
168     if (s < 0) {
169       goto add_stream_error;
170     }
171
172     /* show that we are willing to listen */
173     if (os_socket_listen(s, 1) == -1) {
174       OONF_WARN(LOG_STREAM, "tcp socket listen failed for %s: %s (%d)\n",
175           netaddr_socket_to_string(&buf, local), strerror(errno), errno);
176       goto add_stream_error;
177     }
178
179     stream_socket->scheduler_entry.fd = s;
180     stream_socket->scheduler_entry.process = _cb_parse_request;
181     stream_socket->scheduler_entry.data = stream_socket;
182     stream_socket->scheduler_entry.event_read = true;
183
184     oonf_socket_add(&stream_socket->scheduler_entry);
185   }
186   memcpy(&stream_socket->local_socket, local, sizeof(stream_socket->local_socket));
187
188   if (stream_socket->config.memcookie == NULL) {
189     stream_socket->config.memcookie = &_connection_cookie;
190   }
191   if (stream_socket->config.allowed_sessions == 0) {
192     stream_socket->config.allowed_sessions = 10;
193   }
194   if (stream_socket->config.maximum_input_buffer == 0) {
195     stream_socket->config.maximum_input_buffer = 65536;
196   }
197
198   list_init_head(&stream_socket->session);
199   list_add_tail(&_stream_head, &stream_socket->_node);
200
201   return 0;
202
203 add_stream_error:
204   if (stream_socket->scheduler_entry.fd) {
205     oonf_socket_remove(&stream_socket->scheduler_entry);
206   }
207   if (s != -1) {
208     os_socket_close(s);
209   }
210   return -1;
211 }
212
213 /**
214  * Remove a stream socket from the scheduler
215  * @param stream_socket pointer to socket
216  * @param force true if socket will be closed immediately,
217  *   false if scheduler should wait until outgoing buffers are empty
218  */
219 void
220 oonf_stream_remove(struct oonf_stream_socket *stream_socket, bool force) {
221   if (stream_socket->busy && !force) {
222     stream_socket->remove = true;
223     return;
224   }
225
226   if (!list_is_node_added(&stream_socket->_node)) {
227     return;
228   }
229
230   oonf_stream_close_all_sessions(stream_socket);
231   list_remove(&stream_socket->_node);
232
233   if (stream_socket->scheduler_entry.fd) {
234     /* only for server sockets */
235     os_socket_close(stream_socket->scheduler_entry.fd);
236     oonf_socket_remove(&stream_socket->scheduler_entry);
237   }
238 }
239
240 /**
241  * Closes all client connections of a stream socket, does not close the local
242  * socket itself.
243  * @param stream_socket stream socket
244  */
245 void
246 oonf_stream_close_all_sessions(struct oonf_stream_socket *stream_socket) {
247   struct oonf_stream_session *session, *ptr;
248
249   if (!list_is_node_added(&stream_socket->_node)) {
250     return;
251   }
252
253   list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
254     if (abuf_getlen(&session->out) == 0 && !session->busy) {
255       /* close everything that doesn't need to send data anymore */
256       oonf_stream_close(session);
257     }
258   }
259   return;
260 }
261
262 /**
263  * Create an outgoing stream socket.
264  * @param stream_socket pointer to stream socket
265  * @param remote pointer to address of remote TCP server
266  * @return pointer to stream session, NULL if an error happened.
267  */
268 struct oonf_stream_session *
269 oonf_stream_connect_to(struct oonf_stream_socket *stream_socket,
270     const union netaddr_socket *remote) {
271   struct oonf_stream_session *session;
272   struct netaddr remote_addr;
273   bool wait_for_connect = false;
274   int s;
275   struct netaddr_str nbuf1;
276 #ifdef OONF_LOG_DEBUG_INFO
277   struct netaddr_str nbuf2;
278 #endif
279
280   OONF_DEBUG(LOG_STREAM, "Connect TCP socket from %s to %s",
281       netaddr_socket_to_string(&nbuf1, &stream_socket->local_socket),
282       netaddr_socket_to_string(&nbuf2, remote));
283
284   s = os_socket_getsocket(&stream_socket->local_socket,
285       true, 0, NULL, LOG_STREAM);
286   if (s < 0) {
287     return NULL;
288   }
289
290   if (os_socket_connect(s, remote)) {
291     if (errno != EINPROGRESS) {
292       OONF_WARN(LOG_STREAM, "Cannot connect outgoing tcp connection to %s: %s (%d)",
293           netaddr_socket_to_string(&nbuf1, remote), strerror(errno), errno);
294       goto connect_to_error;
295     }
296     wait_for_connect = true;
297   }
298
299   netaddr_from_socket(&remote_addr, remote);
300   session = _create_session(stream_socket, s, &remote_addr, remote);
301   if (session) {
302     session->wait_for_connect = wait_for_connect;
303     return session;
304   }
305
306   /* fall through */
307 connect_to_error:
308   if (s) {
309     os_socket_close(s);
310   }
311   return NULL;
312 }
313
314 /**
315  * Reset the session timeout of a TCP session
316  * @param con pointer to stream session
317  * @param timeout timeout in milliseconds
318  */
319 void
320 oonf_stream_set_timeout(struct oonf_stream_session *con, uint64_t timeout) {
321   oonf_timer_set(&con->timeout, timeout);
322 }
323
324 /**
325  * Close a TCP stream session
326  * @param session pointer to stream session
327  * @param force true if the session should be closed instantly,
328  *   false if all data in queue should still be sent
329  */
330 void
331 oonf_stream_close(struct oonf_stream_session *session) {
332   if (session->busy) {
333     /* remove the session later */
334     session->removed = true;
335     return;
336   }
337   _stream_close (session);
338 }
339
340 /**
341  * Initialized a managed TCP stream
342  * @param managed pointer to initialized managed stream
343  */
344 void
345 oonf_stream_add_managed(struct oonf_stream_managed *managed) {
346   if (managed->config.allowed_sessions == 0) {
347     managed->config.allowed_sessions = 10;
348   }
349   if (managed->config.maximum_input_buffer == 0) {
350     managed->config.maximum_input_buffer = 65536;
351   }
352   if (managed->config.session_timeout == 0) {
353     managed->config.session_timeout = 120000;
354   }
355
356   managed->_if_listener.process = _cb_interface_listener;
357   managed->_if_listener.name = managed->_managed_config.interface;
358 }
359
360 /**
361  * Apply a configuration to a stream. Will reset both ACLs
362  * and socket ports/bindings.
363  * @param managed pointer to managed stream
364  * @param config pointer to stream config
365  * @return -1 if an error happened, 0 otherwise.
366  */
367 int
368 oonf_stream_apply_managed(struct oonf_stream_managed *managed,
369     struct oonf_stream_managed_config *config) {
370   bool if_changed;
371
372   if_changed = strcmp(config->interface, managed->_managed_config.interface) != 0;
373
374   oonf_stream_copy_managed_config(&managed->_managed_config, config);
375
376   /* set back pointers */
377   managed->socket_v4.managed = managed;
378   managed->socket_v6.managed = managed;
379
380   /* handle change in interface listener */
381   if (if_changed) {
382     /* interface changed, remove old listener if necessary */
383     oonf_interface_remove_listener(&managed->_if_listener);
384
385     if (managed->_managed_config.interface[0]) {
386       /* create new interface listener */
387       oonf_interface_add_listener(&managed->_if_listener);
388     }
389   }
390
391   OONF_DEBUG(LOG_STREAM, "Apply changes for managed socket (if %s) with port %d",
392       config->interface == NULL || config->interface[0] == 0 ? "any" : config->interface,
393       config->port);
394
395   return _apply_managed(managed);
396 }
397
398 /**
399  * Remove a managed TCP stream
400  * @param managed pointer to managed stream
401  * @param force true if socket will be closed immediately,
402  *   false if scheduler should wait until outgoing buffers are empty
403  */
404 void
405 oonf_stream_remove_managed(struct oonf_stream_managed *managed, bool force) {
406   oonf_interface_remove_listener(&managed->_if_listener);
407
408   oonf_stream_remove(&managed->socket_v4, force);
409   oonf_stream_remove(&managed->socket_v6, force);
410   oonf_interface_remove_listener(&managed->_if_listener);
411   oonf_stream_free_managed_config(&managed->_managed_config);
412 }
413
414 /**
415  * Closes all connections of a managed socket, but not the socket itself
416  * @param managed managed stream socket
417  */
418 void
419 oonf_stream_close_all_managed_sessions(struct oonf_stream_managed *managed) {
420   oonf_stream_close_all_sessions(&managed->socket_v4);
421   oonf_stream_close_all_sessions(&managed->socket_v6);
422 }
423
424 /**
425  * Free dynamically allocated parts of managed stream configuration
426  * @param config packet configuration
427  */
428 void
429 oonf_stream_free_managed_config(struct oonf_stream_managed_config *config) {
430   netaddr_acl_remove(&config->acl);
431   netaddr_acl_remove(&config->bindto);
432 }
433
434 /**
435  * copies a stream managed configuration object
436  * @param dst Destination
437  * @param src Source
438  */
439 void
440 oonf_stream_copy_managed_config(struct oonf_stream_managed_config *dst,
441     struct oonf_stream_managed_config *src) {
442   oonf_stream_free_managed_config(dst);
443
444   memcpy(dst, src, sizeof(*dst));
445
446   memset(&dst->acl, 0, sizeof(dst->acl));
447   netaddr_acl_copy(&dst->acl, &src->acl);
448
449   memset(&dst->bindto, 0, sizeof(dst->bindto));
450   netaddr_acl_copy(&dst->bindto, &src->bindto);
451 }
452
453 /**
454  * Close a TCP stream
455  * @param session tcp stream session
456  */
457 static void
458 _stream_close(struct oonf_stream_session *session) {
459   if (!list_is_node_added(&session->node)) {
460     return;
461   }
462
463   if (session->comport->config.cleanup) {
464     session->comport->config.cleanup(session);
465   }
466
467   oonf_timer_stop(&session->timeout);
468
469   session->comport->config.allowed_sessions++;
470   list_remove(&session->node);
471
472   os_socket_close(session->scheduler_entry.fd);
473   oonf_socket_remove(&session->scheduler_entry);
474
475   abuf_free(&session->in);
476   abuf_free(&session->out);
477
478   oonf_class_free(session->comport->config.memcookie, session);
479 }
480
481 /**
482  * Apply the stored settings of a managed socket
483  * @param managed pointer to managed stream
484  * @return -1 if an error happened, 0 otherwise
485  */
486 int
487 _apply_managed(struct oonf_stream_managed *managed) {
488   struct os_interface_data *data = NULL;
489
490   /* get interface */
491   if (managed->_if_listener.interface) {
492     data = &managed->_if_listener.interface->data;
493   }
494
495   if (_apply_managed_socket(AF_INET, managed, &managed->socket_v4, data)) {
496     return -1;
497   }
498
499   if (os_system_is_ipv6_supported()) {
500     if (_apply_managed_socket(AF_INET6, managed, &managed->socket_v6, data)) {
501       return -1;
502     }
503   }
504   return 0;
505 }
506
507 /**
508  * Apply new configuration to a managed stream socket
509  * @param af_type address type to bind socket to
510  * @param managed pointer to managed stream
511  * @param stream pointer to TCP stream to configure
512  * @return -1 if an error happened, 0 otherwise.
513  */
514 static int
515 _apply_managed_socket(int af_type, struct oonf_stream_managed *managed,
516     struct oonf_stream_socket *stream, struct os_interface_data *data) {
517   struct netaddr_acl *bind_ip_acl;
518   const struct netaddr *bind_ip;
519   union netaddr_socket sock;
520   struct netaddr_str buf;
521
522   bind_ip_acl = &managed->_managed_config.bindto;
523
524   /* Get address the unicast socket should bind on */
525   if (data != NULL && !data->up) {
526     bind_ip = NULL;
527   }
528   else if (data != NULL && netaddr_get_address_family(data->linklocal_v6_ptr) == af_type &&
529       netaddr_acl_check_accept(bind_ip_acl, data->linklocal_v6_ptr)) {
530
531     bind_ip = data->linklocal_v6_ptr;
532   }
533   else {
534     bind_ip = oonf_interface_get_bindaddress(af_type, bind_ip_acl, data);
535   }
536   if (!bind_ip) {
537     oonf_stream_remove(stream, true);
538     return 0;
539   }
540   if (netaddr_socket_init(&sock, bind_ip, managed->_managed_config.port,
541       data == NULL ? 0 : data->index)) {
542     OONF_WARN(LOG_STREAM, "Cannot create managed socket address: %s/%u",
543         netaddr_to_string(&buf, bind_ip), managed->_managed_config.port);
544     return -1;
545   }
546
547   if (list_is_node_added(&stream->scheduler_entry._node)) {
548     if (memcmp(&sock, &stream->local_socket, sizeof(sock)) == 0) {
549       /* nothing changed */
550       return 0;
551     }
552
553     oonf_stream_remove(stream, true);
554   }
555   if (oonf_stream_add(stream, &sock)) {
556     return -1;
557   }
558
559   /* copy configuration */
560   memcpy(&stream->config, &managed->config, sizeof(stream->config));
561   if (stream->config.memcookie == NULL) {
562     stream->config.memcookie = &_connection_cookie;
563   }
564   return 0;
565 }
566
567 /**
568  * Handle incoming server socket event from socket scheduler.
569  * @param fd filedescriptor for event
570  * @param data custom user data
571  * @param event_read true if read-event is incoming
572  * @param event_write true if write-event is incoming
573  */
574 static void
575 _cb_parse_request(int fd, void *data, bool event_read,
576     bool event_write __attribute__((unused))) {
577   struct oonf_stream_socket *stream;
578   union netaddr_socket remote_socket;
579   struct netaddr remote_addr;
580   socklen_t addrlen;
581   int sock;
582 #ifdef OONF_LOG_DEBUG_INFO
583   struct netaddr_str buf1, buf2;
584 #endif
585
586   if (!event_read) {
587     return;
588   }
589
590   stream = data;
591
592   addrlen = sizeof(remote_socket);
593   sock = accept(fd, &remote_socket.std, &addrlen);
594   if (sock < 0) {
595     OONF_WARN(LOG_STREAM, "accept() call returned error: %s (%d)", strerror(errno), errno);
596     return;
597   }
598
599   netaddr_from_socket(&remote_addr, &remote_socket);
600   if (stream->config.acl) {
601     if (!netaddr_acl_check_accept(stream->config.acl, &remote_addr)) {
602       OONF_DEBUG(LOG_STREAM, "Access from %s to socket %s blocked because of ACL",
603           netaddr_to_string(&buf1, &remote_addr),
604           netaddr_socket_to_string(&buf2, &stream->local_socket));
605       close(sock);
606       return;
607     }
608   }
609   _create_session(stream, sock, &remote_addr, &remote_socket);
610 }
611
612 /**
613  * Configure a TCP session socket
614  * @param stream_socket pointer to stream socket
615  * @param sock pointer to socket filedescriptor
616  * @param remote_addr pointer to remote address
617  * @return pointer to new stream session, NULL if an error happened.
618  */
619 static struct oonf_stream_session *
620 _create_session(struct oonf_stream_socket *stream_socket,
621     int sock, const struct netaddr *remote_addr,
622     const union netaddr_socket *remote_socket) {
623   struct oonf_stream_session *session;
624 #ifdef OONF_LOG_DEBUG_INFO
625   struct netaddr_str buf;
626 #endif
627
628   /* put socket into non-blocking mode */
629   if (os_socket_set_nonblocking(sock)) {
630     OONF_WARN(LOG_STREAM, "Cannot read comport socket status: %s (%d)",
631         strerror(errno), errno);
632     return NULL;
633   }
634
635   session = oonf_class_malloc(stream_socket->config.memcookie);
636   if (session == NULL) {
637     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
638     return NULL;
639   }
640
641   if (abuf_init(&session->in)) {
642     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
643     goto parse_request_error;
644   }
645   if (abuf_init(&session->out)) {
646     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
647     goto parse_request_error;
648   }
649
650   session->scheduler_entry.fd = sock;
651   session->scheduler_entry.process = _cb_parse_connection;
652   session->scheduler_entry.data = session;
653   session->scheduler_entry.event_read = true;
654   session->scheduler_entry.event_write = true;
655   oonf_socket_add(&session->scheduler_entry);
656
657   session->send_first = stream_socket->config.send_first;
658   session->comport = stream_socket;
659
660   session->remote_address = *remote_addr;
661   session->remote_socket = *remote_socket;
662
663   if (stream_socket->config.allowed_sessions-- > 0) {
664     /* create active session */
665     session->state = STREAM_SESSION_ACTIVE;
666   } else {
667     /* too many sessions */
668     if (stream_socket->config.create_error) {
669       stream_socket->config.create_error(session, STREAM_SERVICE_UNAVAILABLE);
670     }
671     session->state = STREAM_SESSION_SEND_AND_QUIT;
672   }
673
674   session->timeout.cb_context = session;
675   session->timeout.class = &_connection_timeout;
676   if (stream_socket->config.session_timeout) {
677     oonf_timer_start(&session->timeout, stream_socket->config.session_timeout);
678   }
679
680   if (stream_socket->config.init) {
681     if (stream_socket->config.init(session)) {
682       goto parse_request_error;
683     }
684   }
685
686   OONF_DEBUG(LOG_STREAM, "Got connection through socket %d with %s.\n",
687       sock, netaddr_to_string(&buf, remote_addr));
688
689   list_add_tail(&stream_socket->session, &session->node);
690   return session;
691
692 parse_request_error:
693   abuf_free(&session->in);
694   abuf_free(&session->out);
695   oonf_class_free(stream_socket->config.memcookie, session);
696
697   return NULL;
698 }
699
700 /**
701  * Handle TCP session timeout
702  * @param data custom data
703  */
704 static void
705 _cb_timeout_handler(void *data) {
706   struct oonf_stream_session *session = data;
707   oonf_stream_close(session);
708 }
709
710 /**
711  * Handle events for TCP session from network scheduler
712  * @param fd filedescriptor of TCP session
713  * @param data custom data
714  * @param event_read true if read-event is incoming
715  * @param event_write true if write-event is incoming
716  */
717 static void
718 _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
719   struct oonf_stream_session *session;
720   struct oonf_stream_socket *s_sock;
721   int len;
722   char buffer[1024];
723   struct netaddr_str buf;
724
725   session = data;
726   s_sock = session->comport;
727
728   OONF_DEBUG(LOG_STREAM, "Parsing connection of socket %d\n", fd);
729
730   /* mark session and s_sock as busy */
731   session->busy = true;
732   s_sock->busy = true;
733
734   if (session->wait_for_connect) {
735     if (event_write) {
736       int value;
737
738       if(os_socket_get_socket_error(fd, &value)) {
739         OONF_WARN(LOG_STREAM, "getsockopt failed: %s (%d)",
740             strerror(errno), errno);
741         session->state = STREAM_SESSION_CLEANUP;
742       }
743       else if (value != 0) {
744         OONF_WARN(LOG_STREAM, "Connection to %s failed: %s (%d)",
745             netaddr_socket_to_string(&buf, &session->remote_socket),
746             strerror(value), value);
747         session->state = STREAM_SESSION_CLEANUP;
748       }
749       else {
750         session->wait_for_connect = false;
751       }
752     }
753   }
754
755   if (session->wait_for_connect) {
756     session->busy = false;
757     s_sock->busy = false;
758     return;
759   }
760
761   /* read data if necessary */
762   if (session->state == STREAM_SESSION_ACTIVE && event_read) {
763     len = os_socket_recvfrom(fd, buffer, sizeof(buffer), NULL, 0);
764     if (len > 0) {
765       OONF_DEBUG(LOG_STREAM, "  recv returned %d\n", len);
766       if (abuf_memcpy(&session->in, buffer, len)) {
767         /* out of memory */
768         OONF_WARN(LOG_STREAM, "Out of memory for comport session input buffer");
769         session->state = STREAM_SESSION_CLEANUP;
770       } else if (abuf_getlen(&session->in) > s_sock->config.maximum_input_buffer) {
771         /* input buffer overflow */
772         if (s_sock->config.create_error) {
773           s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
774         }
775         session->state = STREAM_SESSION_SEND_AND_QUIT;
776       } else {
777         /* got new input block, reset timeout */
778         oonf_stream_set_timeout(session, s_sock->config.session_timeout);
779       }
780     } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
781         != EWOULDBLOCK) {
782       /* error during read */
783       OONF_WARN(LOG_STREAM, "Error while reading from communication stream with %s: %s (%d)\n",
784           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
785       session->state = STREAM_SESSION_CLEANUP;
786     } else if (len == 0) {
787       /* external s_sock closed */
788       session->state = STREAM_SESSION_SEND_AND_QUIT;
789     }
790   }
791
792   if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL
793       && (abuf_getlen(&session->in) > 0 || session->send_first)) {
794     session->state = s_sock->config.receive_data(session);
795     session->send_first = false;
796   }
797
798   /* send data if necessary */
799   if (session->state != STREAM_SESSION_CLEANUP && abuf_getlen(&session->out) > 0) {
800     if (event_write) {
801       len = os_socket_sendto(fd, abuf_getptr(&session->out), abuf_getlen(&session->out), NULL, false);
802
803       if (len > 0) {
804         OONF_DEBUG(LOG_STREAM, "  send returned %d\n", len);
805         abuf_pull(&session->out, len);
806         oonf_stream_set_timeout(session, s_sock->config.session_timeout);
807       } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
808           != EWOULDBLOCK) {
809         OONF_WARN(LOG_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
810             netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
811         session->state = STREAM_SESSION_CLEANUP;
812       }
813     } else {
814       OONF_DEBUG(LOG_STREAM, "  activating output in scheduler\n");
815       oonf_socket_set_write(&session->scheduler_entry, true);
816     }
817   }
818
819   if (abuf_getlen(&session->out) == 0) {
820     /* nothing to send anymore */
821     OONF_DEBUG(LOG_STREAM, "  deactivating output in scheduler\n");
822     oonf_socket_set_write(&session->scheduler_entry, false);
823     if (session->state == STREAM_SESSION_SEND_AND_QUIT) {
824       session->state = STREAM_SESSION_CLEANUP;
825     }
826   }
827
828   session->busy = false;
829   s_sock->busy = false;
830
831   /* end of connection ? */
832   if (session->state == STREAM_SESSION_CLEANUP || session->removed) {
833     OONF_DEBUG(LOG_STREAM, "  cleanup\n");
834
835     /* clean up connection by calling cleanup directly */
836     _stream_close(session);
837
838     /* session object will not be valid anymore after this point */
839   }
840
841   /* lazy socket removal */
842   if (s_sock->remove) {
843     oonf_stream_remove(s_sock, false);
844   }
845   return;
846 }
847
848 /**
849  * Callbacks for events on the interface
850  * @param l
851  * @param old
852  */
853 static void
854 _cb_interface_listener(struct oonf_interface_listener *l) {
855   struct oonf_stream_managed *managed;
856 #ifdef OONF_LOG_DEBUG_INFO
857   int result;
858 #endif
859
860   /* calculate managed socket for this event */
861   managed = container_of(l, struct oonf_stream_managed, _if_listener);
862
863 #ifdef OONF_LOG_DEBUG_INFO
864   result =
865 #endif
866       _apply_managed(managed);
867
868   OONF_DEBUG(LOG_STREAM,
869       "Result from interface triggered socket reconfiguration: %d", result);
870 }