09f4a579794405d224f33f131572cd21b6b7cfb4
[oonf.git] / src-plugins / subsystems / oonf_stream_socket.c
1
2 /*
3  * The olsr.org Optimized Link-State Routing daemon version 2 (olsrd2)
4  * Copyright (c) 2004-2013, the olsr.org team - see HISTORY file
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  *   notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  *   notice, this list of conditions and the following disclaimer in
15  *   the documentation and/or other materials provided with the
16  *   distribution.
17  * * Neither the name of olsr.org, olsrd nor the names of its
18  *   contributors may be used to endorse or promote products derived
19  *   from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *
34  * Visit http://www.olsr.org for more information.
35  *
36  * If you find this software useful feel free to make a donation
37  * to the project. For more information see the website or contact
38  * the copyright holders.
39  *
40  */
41
42 #include <string.h>
43 #include <stdlib.h>
44 #include <errno.h>
45
46 #include "common/autobuf.h"
47 #include "common/avl.h"
48 #include "common/list.h"
49 #include "core/oonf_logging.h"
50 #include "core/oonf_subsystem.h"
51 #include "subsystems/oonf_class.h"
52 #include "subsystems/oonf_interface.h"
53 #include "subsystems/oonf_socket.h"
54 #include "subsystems/oonf_timer.h"
55 #include "subsystems/oonf_stream_socket.h"
56 #include "subsystems/os_system.h"
57
58 /* Definitions */
59 #define LOG_STREAM _oonf_stream_socket_subsystem.logging
60
61 /* prototypes */
62 static int _init(void);
63 static void _cleanup(void);
64
65 static void _stream_close(struct oonf_stream_session *session);
66 int _apply_managed(struct oonf_stream_managed *managed);
67 static int _apply_managed_socket(int af_type, struct oonf_stream_managed *managed,
68     struct oonf_stream_socket *stream, struct os_interface_data *data);
69 static void _cb_parse_request(int fd, void *data, bool, bool);
70 static struct oonf_stream_session *_create_session(
71     struct oonf_stream_socket *stream_socket, int sock,
72     const struct netaddr *remote_addr,
73     const union netaddr_socket *remote_socket);
74 static void _cb_parse_connection(int fd, void *data, bool r,bool w);
75
76 static void _cb_timeout_handler(void *);
77 static void _cb_interface_listener(struct oonf_interface_listener *l);
78
79 /* list of olsr stream sockets */
80 static struct list_entity _stream_head;
81
82 /* server socket */
83 static struct oonf_class _connection_cookie = {
84   .name = "stream socket connection",
85   .size = sizeof(struct oonf_stream_session)
86 };
87
88 static struct oonf_timer_class _connection_timeout = {
89   .name = "stream socket timout",
90   .callback = _cb_timeout_handler,
91 };
92
93 /* subsystem definition */
94 static const char *_dependencies[] = {
95   OONF_CLASS_SUBSYSTEM,
96   OONF_INTERFACE_SUBSYSTEM,
97   OONF_SOCKET_SUBSYSTEM,
98   OONF_TIMER_SUBSYSTEM,
99   OONF_OS_SYSTEM_SUBSYSTEM,
100 };
101
102 static struct oonf_subsystem _oonf_stream_socket_subsystem = {
103   .name = OONF_STREAM_SUBSYSTEM,
104   .dependencies = _dependencies,
105   .dependencies_count = ARRAYSIZE(_dependencies),
106   .init = _init,
107   .cleanup = _cleanup,
108 };
109 DECLARE_OONF_PLUGIN(_oonf_stream_socket_subsystem);
110
111 /**
112  * Initialize the stream socket handlers
113  * @return always returns 0
114  */
115 static int
116 _init(void) {
117   oonf_class_add(&_connection_cookie);
118   oonf_timer_add(&_connection_timeout);
119   list_init_head(&_stream_head);
120   return 0;
121 }
122
123 /**
124  * Cleanup all resources allocated be stream socket handlers
125  */
126 static void
127 _cleanup(void) {
128   struct oonf_stream_socket *comport;
129
130   while (!list_is_empty(&_stream_head)) {
131     comport = list_first_element(&_stream_head, comport, _node);
132
133     oonf_stream_remove(comport, true);
134   }
135
136   oonf_class_remove(&_connection_cookie);
137   oonf_timer_remove(&_connection_timeout);
138 }
139
140 /**
141  * Flush all data in outgoing buffer of a stream socket
142  * @param con pointer to stream socket
143  */
144 void
145 oonf_stream_flush(struct oonf_stream_session *con) {
146   oonf_socket_set_write(&con->scheduler_entry, true);
147 }
148
149 /**
150  * Add a new stream socket to the scheduler
151  * @param stream_socket pointer to stream socket struct with
152  *   initialized config
153  * @param local pointer to local ip/port of socket, port must be 0 if
154  *   this shall be an outgoing socket
155  * @return -1 if an error happened, 0 otherwise
156  */
157 int
158 oonf_stream_add(struct oonf_stream_socket *stream_socket,
159     const union netaddr_socket *local) {
160   int s = -1;
161   struct netaddr_str buf;
162
163   /* server socket not necessary for outgoing connections */
164   if (netaddr_socket_get_port(local) != 0) {
165     /* Init socket */
166     s = os_socket_getsocket(local, true, 0, NULL, LOG_STREAM);
167     if (s < 0) {
168       goto add_stream_error;
169     }
170
171     /* show that we are willing to listen */
172     if (os_socket_listen(s, 1) == -1) {
173       OONF_WARN(LOG_STREAM, "tcp socket listen failed for %s: %s (%d)\n",
174           netaddr_socket_to_string(&buf, local), strerror(errno), errno);
175       goto add_stream_error;
176     }
177
178     stream_socket->scheduler_entry.fd = s;
179     stream_socket->scheduler_entry.process = _cb_parse_request;
180     stream_socket->scheduler_entry.data = stream_socket;
181     stream_socket->scheduler_entry.event_read = true;
182
183     oonf_socket_add(&stream_socket->scheduler_entry);
184   }
185   memcpy(&stream_socket->local_socket, local, sizeof(stream_socket->local_socket));
186
187   if (stream_socket->config.memcookie == NULL) {
188     stream_socket->config.memcookie = &_connection_cookie;
189   }
190   if (stream_socket->config.allowed_sessions == 0) {
191     stream_socket->config.allowed_sessions = 10;
192   }
193   if (stream_socket->config.maximum_input_buffer == 0) {
194     stream_socket->config.maximum_input_buffer = 65536;
195   }
196
197   list_init_head(&stream_socket->session);
198   list_add_tail(&_stream_head, &stream_socket->_node);
199
200   return 0;
201
202 add_stream_error:
203   if (stream_socket->scheduler_entry.fd) {
204     oonf_socket_remove(&stream_socket->scheduler_entry);
205   }
206   if (s != -1) {
207     os_socket_close(s);
208   }
209   return -1;
210 }
211
212 /**
213  * Remove a stream socket from the scheduler
214  * @param stream_socket pointer to socket
215  * @param force true if socket will be closed immediately,
216  *   false if scheduler should wait until outgoing buffers are empty
217  */
218 void
219 oonf_stream_remove(struct oonf_stream_socket *stream_socket, bool force) {
220   if (stream_socket->busy && !force) {
221     stream_socket->remove = true;
222     return;
223   }
224
225   if (!list_is_node_added(&stream_socket->_node)) {
226     return;
227   }
228
229   oonf_stream_close_all_sessions(stream_socket);
230   list_remove(&stream_socket->_node);
231
232   if (stream_socket->scheduler_entry.fd) {
233     /* only for server sockets */
234     os_socket_close(stream_socket->scheduler_entry.fd);
235     oonf_socket_remove(&stream_socket->scheduler_entry);
236   }
237 }
238
239 /**
240  * Closes all client connections of a stream socket, does not close the local
241  * socket itself.
242  * @param stream_socket stream socket
243  */
244 void
245 oonf_stream_close_all_sessions(struct oonf_stream_socket *stream_socket) {
246   struct oonf_stream_session *session, *ptr;
247
248   if (!list_is_node_added(&stream_socket->_node)) {
249     return;
250   }
251
252   list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
253     if (abuf_getlen(&session->out) == 0 && !session->busy) {
254       /* close everything that doesn't need to send data anymore */
255       oonf_stream_close(session);
256     }
257   }
258   return;
259 }
260
261 /**
262  * Create an outgoing stream socket.
263  * @param stream_socket pointer to stream socket
264  * @param remote pointer to address of remote TCP server
265  * @return pointer to stream session, NULL if an error happened.
266  */
267 struct oonf_stream_session *
268 oonf_stream_connect_to(struct oonf_stream_socket *stream_socket,
269     const union netaddr_socket *remote) {
270   struct oonf_stream_session *session;
271   struct netaddr remote_addr;
272   bool wait_for_connect = false;
273   int s;
274   struct netaddr_str nbuf1;
275 #ifdef OONF_LOG_DEBUG_INFO
276   struct netaddr_str nbuf2;
277 #endif
278
279   OONF_DEBUG(LOG_STREAM, "Connect TCP socket from %s to %s",
280       netaddr_socket_to_string(&nbuf1, &stream_socket->local_socket),
281       netaddr_socket_to_string(&nbuf2, remote));
282
283   s = os_socket_getsocket(&stream_socket->local_socket,
284       true, 0, NULL, LOG_STREAM);
285   if (s < 0) {
286     return NULL;
287   }
288
289   if (os_socket_connect(s, remote)) {
290     if (errno != EINPROGRESS) {
291       OONF_WARN(LOG_STREAM, "Cannot connect outgoing tcp connection to %s: %s (%d)",
292           netaddr_socket_to_string(&nbuf1, remote), strerror(errno), errno);
293       goto connect_to_error;
294     }
295     wait_for_connect = true;
296   }
297
298   netaddr_from_socket(&remote_addr, remote);
299   session = _create_session(stream_socket, s, &remote_addr, remote);
300   if (session) {
301     session->wait_for_connect = wait_for_connect;
302     return session;
303   }
304
305   /* fall through */
306 connect_to_error:
307   if (s) {
308     os_socket_close(s);
309   }
310   return NULL;
311 }
312
313 /**
314  * Reset the session timeout of a TCP session
315  * @param con pointer to stream session
316  * @param timeout timeout in milliseconds
317  */
318 void
319 oonf_stream_set_timeout(struct oonf_stream_session *con, uint64_t timeout) {
320   oonf_timer_set(&con->timeout, timeout);
321 }
322
323 /**
324  * Close a TCP stream session
325  * @param session pointer to stream session
326  * @param force true if the session should be closed instantly,
327  *   false if all data in queue should still be sent
328  */
329 void
330 oonf_stream_close(struct oonf_stream_session *session) {
331   if (session->busy) {
332     /* remove the session later */
333     session->removed = true;
334     return;
335   }
336   _stream_close (session);
337 }
338
339 /**
340  * Initialized a managed TCP stream
341  * @param managed pointer to initialized managed stream
342  */
343 void
344 oonf_stream_add_managed(struct oonf_stream_managed *managed) {
345   if (managed->config.allowed_sessions == 0) {
346     managed->config.allowed_sessions = 10;
347   }
348   if (managed->config.maximum_input_buffer == 0) {
349     managed->config.maximum_input_buffer = 65536;
350   }
351   if (managed->config.session_timeout == 0) {
352     managed->config.session_timeout = 120000;
353   }
354
355   managed->_if_listener.process = _cb_interface_listener;
356   managed->_if_listener.name = managed->_managed_config.interface;
357 }
358
359 /**
360  * Apply a configuration to a stream. Will reset both ACLs
361  * and socket ports/bindings.
362  * @param managed pointer to managed stream
363  * @param config pointer to stream config
364  * @return -1 if an error happened, 0 otherwise.
365  */
366 int
367 oonf_stream_apply_managed(struct oonf_stream_managed *managed,
368     struct oonf_stream_managed_config *config) {
369   bool if_changed;
370
371   if_changed = strcmp(config->interface, managed->_managed_config.interface) != 0;
372
373   oonf_stream_copy_managed_config(&managed->_managed_config, config);
374
375   /* set back pointers */
376   managed->socket_v4.managed = managed;
377   managed->socket_v6.managed = managed;
378
379   /* handle change in interface listener */
380   if (if_changed) {
381     /* interface changed, remove old listener if necessary */
382     oonf_interface_remove_listener(&managed->_if_listener);
383
384     if (managed->_managed_config.interface[0]) {
385       /* create new interface listener */
386       oonf_interface_add_listener(&managed->_if_listener);
387     }
388   }
389
390   OONF_DEBUG(LOG_STREAM, "Apply changes for managed socket (if %s) with port %d",
391       config->interface == NULL || config->interface[0] == 0 ? "any" : config->interface,
392       config->port);
393
394   return _apply_managed(managed);
395 }
396
397 /**
398  * Remove a managed TCP stream
399  * @param managed pointer to managed stream
400  * @param force true if socket will be closed immediately,
401  *   false if scheduler should wait until outgoing buffers are empty
402  */
403 void
404 oonf_stream_remove_managed(struct oonf_stream_managed *managed, bool force) {
405   oonf_interface_remove_listener(&managed->_if_listener);
406
407   oonf_stream_remove(&managed->socket_v4, force);
408   oonf_stream_remove(&managed->socket_v6, force);
409   oonf_interface_remove_listener(&managed->_if_listener);
410   oonf_stream_free_managed_config(&managed->_managed_config);
411 }
412
413 /**
414  * Closes all connections of a managed socket, but not the socket itself
415  * @param managed managed stream socket
416  */
417 void
418 oonf_stream_close_all_managed_sessions(struct oonf_stream_managed *managed) {
419   oonf_stream_close_all_sessions(&managed->socket_v4);
420   oonf_stream_close_all_sessions(&managed->socket_v6);
421 }
422
423 /**
424  * Free dynamically allocated parts of managed stream configuration
425  * @param config packet configuration
426  */
427 void
428 oonf_stream_free_managed_config(struct oonf_stream_managed_config *config) {
429   netaddr_acl_remove(&config->acl);
430   netaddr_acl_remove(&config->bindto);
431 }
432
433 /**
434  * copies a stream managed configuration object
435  * @param dst Destination
436  * @param src Source
437  */
438 void
439 oonf_stream_copy_managed_config(struct oonf_stream_managed_config *dst,
440     struct oonf_stream_managed_config *src) {
441   oonf_stream_free_managed_config(dst);
442
443   memcpy(dst, src, sizeof(*dst));
444
445   memset(&dst->acl, 0, sizeof(dst->acl));
446   netaddr_acl_copy(&dst->acl, &src->acl);
447
448   memset(&dst->bindto, 0, sizeof(dst->bindto));
449   netaddr_acl_copy(&dst->bindto, &src->bindto);
450 }
451
452 /**
453  * Close a TCP stream
454  * @param session tcp stream session
455  */
456 static void
457 _stream_close(struct oonf_stream_session *session) {
458   if (!list_is_node_added(&session->node)) {
459     return;
460   }
461
462   if (session->comport->config.cleanup) {
463     session->comport->config.cleanup(session);
464   }
465
466   oonf_timer_stop(&session->timeout);
467
468   session->comport->config.allowed_sessions++;
469   list_remove(&session->node);
470
471   os_socket_close(session->scheduler_entry.fd);
472   oonf_socket_remove(&session->scheduler_entry);
473
474   abuf_free(&session->in);
475   abuf_free(&session->out);
476
477   oonf_class_free(session->comport->config.memcookie, session);
478 }
479
480 /**
481  * Apply the stored settings of a managed socket
482  * @param managed pointer to managed stream
483  * @return -1 if an error happened, 0 otherwise
484  */
485 int
486 _apply_managed(struct oonf_stream_managed *managed) {
487   struct os_interface_data *data = NULL;
488
489   /* get interface */
490   if (managed->_if_listener.interface) {
491     data = &managed->_if_listener.interface->data;
492   }
493
494   if (_apply_managed_socket(AF_INET, managed, &managed->socket_v4, data)) {
495     return -1;
496   }
497
498   if (os_system_is_ipv6_supported()) {
499     if (_apply_managed_socket(AF_INET6, managed, &managed->socket_v6, data)) {
500       return -1;
501     }
502   }
503   return 0;
504 }
505
506 /**
507  * Apply new configuration to a managed stream socket
508  * @param af_type address type to bind socket to
509  * @param managed pointer to managed stream
510  * @param stream pointer to TCP stream to configure
511  * @return -1 if an error happened, 0 otherwise.
512  */
513 static int
514 _apply_managed_socket(int af_type, struct oonf_stream_managed *managed,
515     struct oonf_stream_socket *stream, struct os_interface_data *data) {
516   struct netaddr_acl *bind_ip_acl;
517   const struct netaddr *bind_ip;
518   union netaddr_socket sock;
519   struct netaddr_str buf;
520
521   bind_ip_acl = &managed->_managed_config.bindto;
522
523   /* Get address the unicast socket should bind on */
524   if (data != NULL && !data->up) {
525     bind_ip = NULL;
526   }
527   else if (data != NULL && netaddr_get_address_family(data->linklocal_v6_ptr) == af_type &&
528       netaddr_acl_check_accept(bind_ip_acl, data->linklocal_v6_ptr)) {
529
530     bind_ip = data->linklocal_v6_ptr;
531   }
532   else {
533     bind_ip = oonf_interface_get_bindaddress(af_type, bind_ip_acl, data);
534   }
535   if (!bind_ip) {
536     oonf_stream_remove(stream, true);
537     return 0;
538   }
539   if (netaddr_socket_init(&sock, bind_ip, managed->_managed_config.port,
540       data == NULL ? 0 : data->index)) {
541     OONF_WARN(LOG_STREAM, "Cannot create managed socket address: %s/%u",
542         netaddr_to_string(&buf, bind_ip), managed->_managed_config.port);
543     return -1;
544   }
545
546   if (list_is_node_added(&stream->scheduler_entry._node)) {
547     if (memcmp(&sock, &stream->local_socket, sizeof(sock)) == 0) {
548       /* nothing changed */
549       return 0;
550     }
551
552     oonf_stream_remove(stream, true);
553   }
554   if (oonf_stream_add(stream, &sock)) {
555     return -1;
556   }
557
558   /* copy configuration */
559   memcpy(&stream->config, &managed->config, sizeof(stream->config));
560   if (stream->config.memcookie == NULL) {
561     stream->config.memcookie = &_connection_cookie;
562   }
563   return 0;
564 }
565
566 /**
567  * Handle incoming server socket event from socket scheduler.
568  * @param fd filedescriptor for event
569  * @param data custom user data
570  * @param event_read true if read-event is incoming
571  * @param event_write true if write-event is incoming
572  */
573 static void
574 _cb_parse_request(int fd, void *data, bool event_read,
575     bool event_write __attribute__((unused))) {
576   struct oonf_stream_socket *stream;
577   union netaddr_socket remote_socket;
578   struct netaddr remote_addr;
579   socklen_t addrlen;
580   int sock;
581 #ifdef OONF_LOG_DEBUG_INFO
582   struct netaddr_str buf1, buf2;
583 #endif
584
585   if (!event_read) {
586     return;
587   }
588
589   stream = data;
590
591   addrlen = sizeof(remote_socket);
592   sock = accept(fd, &remote_socket.std, &addrlen);
593   if (sock < 0) {
594     OONF_WARN(LOG_STREAM, "accept() call returned error: %s (%d)", strerror(errno), errno);
595     return;
596   }
597
598   netaddr_from_socket(&remote_addr, &remote_socket);
599   if (stream->config.acl) {
600     if (!netaddr_acl_check_accept(stream->config.acl, &remote_addr)) {
601       OONF_DEBUG(LOG_STREAM, "Access from %s to socket %s blocked because of ACL",
602           netaddr_to_string(&buf1, &remote_addr),
603           netaddr_socket_to_string(&buf2, &stream->local_socket));
604       close(sock);
605       return;
606     }
607   }
608   _create_session(stream, sock, &remote_addr, &remote_socket);
609 }
610
611 /**
612  * Configure a TCP session socket
613  * @param stream_socket pointer to stream socket
614  * @param sock pointer to socket filedescriptor
615  * @param remote_addr pointer to remote address
616  * @return pointer to new stream session, NULL if an error happened.
617  */
618 static struct oonf_stream_session *
619 _create_session(struct oonf_stream_socket *stream_socket,
620     int sock, const struct netaddr *remote_addr,
621     const union netaddr_socket *remote_socket) {
622   struct oonf_stream_session *session;
623 #ifdef OONF_LOG_DEBUG_INFO
624   struct netaddr_str buf;
625 #endif
626
627   /* put socket into non-blocking mode */
628   if (os_socket_set_nonblocking(sock)) {
629     OONF_WARN(LOG_STREAM, "Cannot read comport socket status: %s (%d)",
630         strerror(errno), errno);
631     return NULL;
632   }
633
634   session = oonf_class_malloc(stream_socket->config.memcookie);
635   if (session == NULL) {
636     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
637     return NULL;
638   }
639
640   if (abuf_init(&session->in)) {
641     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
642     goto parse_request_error;
643   }
644   if (abuf_init(&session->out)) {
645     OONF_WARN(LOG_STREAM, "Cannot allocate memory for comport session");
646     goto parse_request_error;
647   }
648
649   session->scheduler_entry.fd = sock;
650   session->scheduler_entry.process = _cb_parse_connection;
651   session->scheduler_entry.data = session;
652   session->scheduler_entry.event_read = true;
653   session->scheduler_entry.event_write = true;
654   oonf_socket_add(&session->scheduler_entry);
655
656   session->send_first = stream_socket->config.send_first;
657   session->comport = stream_socket;
658
659   session->remote_address = *remote_addr;
660   session->remote_socket = *remote_socket;
661
662   if (stream_socket->config.allowed_sessions-- > 0) {
663     /* create active session */
664     session->state = STREAM_SESSION_ACTIVE;
665   } else {
666     /* too many sessions */
667     if (stream_socket->config.create_error) {
668       stream_socket->config.create_error(session, STREAM_SERVICE_UNAVAILABLE);
669     }
670     session->state = STREAM_SESSION_SEND_AND_QUIT;
671   }
672
673   session->timeout.cb_context = session;
674   session->timeout.class = &_connection_timeout;
675   if (stream_socket->config.session_timeout) {
676     oonf_timer_start(&session->timeout, stream_socket->config.session_timeout);
677   }
678
679   if (stream_socket->config.init) {
680     if (stream_socket->config.init(session)) {
681       goto parse_request_error;
682     }
683   }
684
685   OONF_DEBUG(LOG_STREAM, "Got connection through socket %d with %s.\n",
686       sock, netaddr_to_string(&buf, remote_addr));
687
688   list_add_tail(&stream_socket->session, &session->node);
689   return session;
690
691 parse_request_error:
692   abuf_free(&session->in);
693   abuf_free(&session->out);
694   oonf_class_free(stream_socket->config.memcookie, session);
695
696   return NULL;
697 }
698
699 /**
700  * Handle TCP session timeout
701  * @param data custom data
702  */
703 static void
704 _cb_timeout_handler(void *data) {
705   struct oonf_stream_session *session = data;
706   oonf_stream_close(session);
707 }
708
709 /**
710  * Handle events for TCP session from network scheduler
711  * @param fd filedescriptor of TCP session
712  * @param data custom data
713  * @param event_read true if read-event is incoming
714  * @param event_write true if write-event is incoming
715  */
716 static void
717 _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
718   struct oonf_stream_session *session;
719   struct oonf_stream_socket *s_sock;
720   int len;
721   char buffer[1024];
722   struct netaddr_str buf;
723
724   session = data;
725   s_sock = session->comport;
726
727   OONF_DEBUG(LOG_STREAM, "Parsing connection of socket %d\n", fd);
728
729   /* mark session and s_sock as busy */
730   session->busy = true;
731   s_sock->busy = true;
732
733   if (session->wait_for_connect) {
734     if (event_write) {
735       int value;
736
737       if(os_socket_get_socket_error(fd, &value)) {
738         OONF_WARN(LOG_STREAM, "getsockopt failed: %s (%d)",
739             strerror(errno), errno);
740         session->state = STREAM_SESSION_CLEANUP;
741       }
742       else if (value != 0) {
743         OONF_WARN(LOG_STREAM, "Connection to %s failed: %s (%d)",
744             netaddr_socket_to_string(&buf, &session->remote_socket),
745             strerror(value), value);
746         session->state = STREAM_SESSION_CLEANUP;
747       }
748       else {
749         session->wait_for_connect = false;
750       }
751     }
752   }
753
754   if (session->wait_for_connect) {
755     session->busy = false;
756     s_sock->busy = false;
757     return;
758   }
759
760   /* read data if necessary */
761   if (session->state == STREAM_SESSION_ACTIVE && event_read) {
762     len = os_socket_recvfrom(fd, buffer, sizeof(buffer), NULL, 0);
763     if (len > 0) {
764       OONF_DEBUG(LOG_STREAM, "  recv returned %d\n", len);
765       if (abuf_memcpy(&session->in, buffer, len)) {
766         /* out of memory */
767         OONF_WARN(LOG_STREAM, "Out of memory for comport session input buffer");
768         session->state = STREAM_SESSION_CLEANUP;
769       } else if (abuf_getlen(&session->in) > s_sock->config.maximum_input_buffer) {
770         /* input buffer overflow */
771         if (s_sock->config.create_error) {
772           s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
773         }
774         session->state = STREAM_SESSION_SEND_AND_QUIT;
775       } else {
776         /* got new input block, reset timeout */
777         oonf_stream_set_timeout(session, s_sock->config.session_timeout);
778       }
779     } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
780         != EWOULDBLOCK) {
781       /* error during read */
782       OONF_WARN(LOG_STREAM, "Error while reading from communication stream with %s: %s (%d)\n",
783           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
784       session->state = STREAM_SESSION_CLEANUP;
785     } else if (len == 0) {
786       /* external s_sock closed */
787       session->state = STREAM_SESSION_SEND_AND_QUIT;
788     }
789   }
790
791   if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL
792       && (abuf_getlen(&session->in) > 0 || session->send_first)) {
793     session->state = s_sock->config.receive_data(session);
794     session->send_first = false;
795   }
796
797   /* send data if necessary */
798   if (session->state != STREAM_SESSION_CLEANUP && abuf_getlen(&session->out) > 0) {
799     if (event_write) {
800       len = os_socket_sendto(fd, abuf_getptr(&session->out), abuf_getlen(&session->out), NULL, false);
801
802       if (len > 0) {
803         OONF_DEBUG(LOG_STREAM, "  send returned %d\n", len);
804         abuf_pull(&session->out, len);
805         oonf_stream_set_timeout(session, s_sock->config.session_timeout);
806       } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
807           != EWOULDBLOCK) {
808         OONF_WARN(LOG_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
809             netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
810         session->state = STREAM_SESSION_CLEANUP;
811       }
812     } else {
813       OONF_DEBUG(LOG_STREAM, "  activating output in scheduler\n");
814       oonf_socket_set_write(&session->scheduler_entry, true);
815     }
816   }
817
818   if (abuf_getlen(&session->out) == 0) {
819     /* nothing to send anymore */
820     OONF_DEBUG(LOG_STREAM, "  deactivating output in scheduler\n");
821     oonf_socket_set_write(&session->scheduler_entry, false);
822     if (session->state == STREAM_SESSION_SEND_AND_QUIT) {
823       session->state = STREAM_SESSION_CLEANUP;
824     }
825   }
826
827   session->busy = false;
828   s_sock->busy = false;
829
830   /* end of connection ? */
831   if (session->state == STREAM_SESSION_CLEANUP || session->removed) {
832     OONF_DEBUG(LOG_STREAM, "  cleanup\n");
833
834     /* clean up connection by calling cleanup directly */
835     _stream_close(session);
836
837     /* session object will not be valid anymore after this point */
838   }
839
840   /* lazy socket removal */
841   if (s_sock->remove) {
842     oonf_stream_remove(s_sock, false);
843   }
844   return;
845 }
846
847 /**
848  * Callbacks for events on the interface
849  * @param l
850  * @param old
851  */
852 static void
853 _cb_interface_listener(struct oonf_interface_listener *l) {
854   struct oonf_stream_managed *managed;
855 #ifdef OONF_LOG_DEBUG_INFO
856   int result;
857 #endif
858
859   /* calculate managed socket for this event */
860   managed = container_of(l, struct oonf_stream_managed, _if_listener);
861
862 #ifdef OONF_LOG_DEBUG_INFO
863   result =
864 #endif
865       _apply_managed(managed);
866
867   OONF_DEBUG(LOG_STREAM,
868       "Result from interface triggered socket reconfiguration: %d", result);
869 }