Make the socket parser more general
authorBernd Petrovitsch <bernd@firmix.at>
Mon, 6 Oct 2008 23:35:06 +0000 (01:35 +0200)
committerBernd Petrovitsch <bernd@firmix.at>
Mon, 6 Oct 2008 23:35:06 +0000 (01:35 +0200)
The general goal is to get to a flexible central event loop to allow
decoupling of long time actions (e.g. a slow browser over a slow
connection) from the rest.

We extend the central select() loop to get an additional select() for
file descriptors where the data will be handled immediately.
We do this via an additional callback function (as to not disturb the
current ones).
The caller must also specify, if he wants to be called on a possible
read andor write on the "immediate" and/or the "pollrate" callback
function.
While we're at it, we add a "void *" which is passed back to the caller.
So the caller can actually register more than one instance on different
file descriptors and differentiate the instances via that pointer. E.g.
the httpinfo plugin is now able to really accept more than one client
connection at the same time.

12 files changed:
lib/dot_draw/src/olsrd_dot_draw.c
lib/httpinfo/src/olsrd_httpinfo.c
lib/pgraph/src/olsrd_pgraph.c
lib/txtinfo/src/olsrd_txtinfo.c
src/ipc_frontend.c
src/main.c
src/parser.c
src/parser.h
src/socket_parser.c
src/socket_parser.h
src/unix/ifnet.c
src/win32/ifnet.c

index 5b6adc0..bb0e447 100644 (file)
@@ -102,7 +102,7 @@ static int
 pcf_event(int, int, int);
 
 static void
-ipc_action(int);
+ipc_action(int, void *, unsigned int);
 
 static void
 ipc_print_neigh_link(const struct neighbor_entry *neighbor);
@@ -254,14 +254,14 @@ plugin_ipc_init(void)
 #if 0
   printf("Adding socket with olsrd\n");
 #endif
-  add_olsr_socket(ipc_socket, &ipc_action);
+  add_olsr_socket(ipc_socket, &ipc_action, NULL, NULL, SP_PR_READ);
 
   return 1;
 }
 
 
 static void
-ipc_action(int fd __attribute__((unused)))
+ipc_action(int fd __attribute__((unused)), void *data __attribute__((unused)), unsigned int flags __attribute__((unused)))
 {
   struct sockaddr_in pin;
   socklen_t addrlen = sizeof(struct sockaddr_in);
index f41f196..4b382b3 100644 (file)
@@ -166,7 +166,7 @@ static int get_http_socket(int);
 
 static int build_tabs(char *, olsr_u32_t, int);
 
-static void parse_http_request(int);
+static void parse_http_request(int, void *, unsigned int flags);
 
 static int build_http_header(http_header_type, olsr_bool, olsr_u32_t, char *, olsr_u32_t);
 
@@ -316,14 +316,14 @@ olsrd_plugin_init(void)
   }
 
   /* Register socket */
-  add_olsr_socket(http_socket, &parse_http_request);
+  add_olsr_socket(http_socket, &parse_http_request, NULL, NULL, SP_PR_READ);
 
   return 1;
 }
 
 /* Non reentrant - but we are not multithreaded anyway */
-void
-parse_http_request(int fd)
+static void
+parse_http_request(int fd, void *data __attribute__((unused)), unsigned int flags __attribute__((unused)))
 {
   struct sockaddr_in pin;
   socklen_t addrlen;
index b0a7a73..a8ed2c6 100644 (file)
@@ -138,7 +138,7 @@ void olsrd_get_plugin_parameters(const struct olsrd_plugin_parameters **params,
 /* Event function to register with the sceduler */
 static int pcf_event(int, int, int);
 
-static void ipc_action(int);
+static void ipc_action(int, void *, unsigned int);
 
 static void ipc_print_neigh_link(struct neighbor_entry *neighbor);
 
@@ -261,13 +261,13 @@ static int plugin_ipc_init(void)
 
 
       /* Register with olsrd */
-      add_olsr_socket(ipc_socket, &ipc_action);
+      add_olsr_socket(ipc_socket, &ipc_action, NULL, NULL, SP_PR_READ);
     }
 
   return 1;
 }
 
-static void ipc_action(int fd __attribute__((unused)))
+static void ipc_action(int fd __attribute__((unused)), void *data __attribute__((unused)), unsigned int flags __attribute__((unused)))
 {
   struct sockaddr_in pin;
   socklen_t addrlen;
index 05238ac..3774e4e 100644 (file)
@@ -98,7 +98,7 @@ static int plugin_ipc_init(void);
 
 static void send_info(int send_what);
 
-static void ipc_action(int);
+static void ipc_action(int, void *, unsigned int);
 
 static void ipc_print_neigh(void);
 
@@ -224,7 +224,7 @@ plugin_ipc_init(void)
         }
 
         /* Register with olsrd */
-        add_olsr_socket(ipc_socket, &ipc_action);
+        add_olsr_socket(ipc_socket, &ipc_action, NULL, NULL, SP_PR_READ);
         
 #ifndef NODEBUG
         olsr_printf(2, "(TXTINFO) listening on port %d\n",ipc_port);
@@ -235,7 +235,7 @@ plugin_ipc_init(void)
 }
 
 
-static void ipc_action(int fd)
+static void ipc_action(int fd, void *data __attribute__((unused)), unsigned int flags __attribute__((unused)))
 {
     struct sockaddr_storage pin;
     struct sockaddr_in *addr4;
index b1fb1e9..5d8ade4 100644 (file)
@@ -111,7 +111,7 @@ static int
 ipc_send_net_info(int fd);
 
 static void
-ipc_accept(int);
+ipc_accept(int, void *, unsigned int);
 
 #if 0
 static int
@@ -178,7 +178,7 @@ ipc_init(void)
   }
 
   /* Register the socket with the socket parser */
-  add_olsr_socket(ipc_sock, &ipc_accept);
+  add_olsr_socket(ipc_sock, &ipc_accept, NULL, NULL, SP_PR_READ);
 
   return ipc_sock;
 }
@@ -203,15 +203,13 @@ ipc_check_allowed_ip(const union olsr_ip_addr *addr)
 }
 
 static void
-ipc_accept(int fd)
+ipc_accept(int fd, void *data __attribute__((unused)), unsigned int flags __attribute__((unused)))
 {
-  socklen_t addrlen;
   struct sockaddr_in pin;
   char *addr;  
+  socklen_t addrlen = sizeof (struct sockaddr_in);
 
-
-  addrlen = sizeof (struct sockaddr_in);
-  ipc_conn = accept(fd, (struct sockaddr *)  &pin, &addrlen);
+  ipc_conn = accept(fd, (struct sockaddr *)&pin, &addrlen);
   if (ipc_conn == -1)
     {
       perror("IPC accept");
index 106c225..b04b88b 100644 (file)
@@ -271,11 +271,7 @@ main(int argc, char *argv[])
 #endif
 
   /*
-<<<<<<< local
    * socket for ioctl calls
-=======
-   *socket for ioctl calls
->>>>>>> other
    */
   olsr_cnf->ioctl_s = socket(olsr_cnf->ip_version, SOCK_DGRAM, 0);
   if (olsr_cnf->ioctl_s < 0) {
index bf4d4ce..ab5302e 100644 (file)
@@ -461,7 +461,7 @@ void parse_packet(struct olsr *olsr, int size, struct interface *in_if, union ol
  *@return nada
  */
 void
-olsr_input(int fd) {
+olsr_input(int fd, void *data __attribute__((unused)), unsigned int flags __attribute__((unused))) {
 
   struct interface *olsr_in_if;
   union olsr_ip_addr from_addr;
@@ -554,7 +554,7 @@ olsr_input(int fd) {
  *@param fd the filedescriptor that data should be read from.
  *@return nada
  */
-void olsr_input_hostemu(int fd) {
+void olsr_input_hostemu(int fd, void *data __attribute__((unused)), unsigned int flags __attribute__((unused))) {
   /* sockaddr_in6 is bigger than sockaddr !!!! */
   struct sockaddr_storage from;
   socklen_t fromlen;
index 5a7de44..0e9d9c9 100644 (file)
@@ -82,10 +82,10 @@ void
 olsr_init_parser(void);
 
 void 
-olsr_input(int);
+olsr_input(int, void *, unsigned int);
 
 void
-olsr_input_hostemu(int);
+olsr_input_hostemu(int, void *, unsigned int);
 
 void
 olsr_parser_add_function(parse_function, olsr_u32_t, int);
index 75dabfd..e8bd570 100644 (file)
@@ -38,8 +38,6 @@
  *
  */
 
-#include <unistd.h>
-
 #include "socket_parser.h"
 #include "scheduler.h"
 #include "olsr.h"
@@ -60,8 +58,7 @@
 #define strerror(x) StrError(x)
 #endif
 
-
-struct olsr_socket_entry *olsr_socket_entries;
+static struct olsr_socket_entry *olsr_socket_entries = NULL;
 
 /**
  * Add a socket and handler to the socketset
@@ -72,20 +69,23 @@ struct olsr_socket_entry *olsr_socket_entries;
  *@param pf the processing function
  */
 void
-add_olsr_socket(int fd, socket_handler_func  pf)
+add_olsr_socket(int fd, socket_handler_func pf_pr, socket_handler_func pf_imm, void *data, unsigned int flags)
 {
   struct olsr_socket_entry *new_entry;
 
-  if (fd == 0 || pf == NULL) {
-    fprintf(stderr, "Bogus socket entry - not registering...\n");
+  if (fd < 0 || (pf_pr == NULL && pf_imm == NULL)) {
+    olsr_syslog(OLSR_LOG_ERR, "%s: Bogus socket entry - not registering...", __func__);
     return;
   }
   OLSR_PRINTF(2, "Adding OLSR socket entry %d\n", fd);
 
-  new_entry = olsr_malloc(sizeof(struct olsr_socket_entry), "Socket entry");
+  new_entry = olsr_malloc(sizeof(*new_entry), "Socket entry");
 
   new_entry->fd = fd;
-  new_entry->process_function = pf;
+  new_entry->process_immediate = pf_imm;
+  new_entry->process_pollrate = pf_pr;
+  new_entry->data = data;
+  new_entry->flags = flags;
 
   /* Queue */
   new_entry->next = olsr_socket_entries;
@@ -101,12 +101,12 @@ add_olsr_socket(int fd, socket_handler_func  pf)
  *@param pf the processing function
  */
 int
-remove_olsr_socket(int fd, socket_handler_func pf)
+remove_olsr_socket(int fd, socket_handler_func pf_pr, socket_handler_func pf_imm)
 {
   struct olsr_socket_entry *entry, *prev_entry;
 
-  if (fd == 0 || pf == NULL) {
-    olsr_syslog(OLSR_LOG_ERR, "Bogus socket entry - not processing...\n");
+  if (fd < 0 || (pf_pr == NULL && pf_imm == NULL)) {
+    olsr_syslog(OLSR_LOG_ERR, "%s: Bogus socket entry - not processing...", __func__);
     return 0;
   }
   OLSR_PRINTF(1, "Removing OLSR socket entry %d\n", fd);
@@ -114,7 +114,7 @@ remove_olsr_socket(int fd, socket_handler_func pf)
   for (entry = olsr_socket_entries, prev_entry = NULL;
        entry != NULL;
        prev_entry = entry, entry = entry->next) {
-    if (entry->fd == fd && entry->process_function == pf) {
+    if (entry->fd == fd && entry->process_immediate == pf_imm && entry->process_pollrate == pf_pr) {
       if (prev_entry == NULL) {
        olsr_socket_entries = entry->next;
       } else {
@@ -132,10 +132,10 @@ void
 olsr_poll_sockets(void)
 {
   int n;
-  struct olsr_socket_entry *olsr_sockets;
-  fd_set ibits;
+  struct olsr_socket_entry *entry;
+  fd_set ibits, obits;
   struct timeval tvp = { 0, 0 };
-  int hfd = 0;
+  int hfd = 0, fdsets = 0;
   const char * err_msg;
   /* If there are no registered sockets we
    * do not call select(2)
@@ -145,37 +145,67 @@ olsr_poll_sockets(void)
   }
 
   FD_ZERO(&ibits);
+  FD_ZERO(&obits);
   
   /* Adding file-descriptors to FD set */
-  for (olsr_sockets = olsr_socket_entries; olsr_sockets; olsr_sockets = olsr_sockets->next) {
-    FD_SET((unsigned int)olsr_sockets->fd, &ibits); /* And we cast here since we get a warning on Win32 */    
-    if (olsr_sockets->fd >= hfd) {
-      hfd = olsr_sockets->fd + 1;
+  for (entry = olsr_socket_entries; entry != NULL; entry = entry->next) {
+    if (entry->process_pollrate == NULL) {
+      continue;
+    }
+    if ((entry->flags & SP_PR_READ) != 0) {
+      fdsets |= SP_PR_READ;
+      FD_SET((unsigned int)entry->fd, &ibits); /* And we cast here since we get a warning on Win32 */    
+    }
+    if ((entry->flags & SP_PR_WRITE) != 0) {
+      fdsets |= SP_PR_WRITE;
+      FD_SET((unsigned int)entry->fd, &obits); /* And we cast here since we get a warning on Win32 */    
+    }
+    if ((entry->flags & (SP_PR_READ|SP_PR_READ)) != 0) {
+      if (entry->fd >= hfd) {
+       hfd = entry->fd + 1;
+      }
     }
   }
+
+  if (hfd == 0) {
+    /* we didn't set anything - no need to continue */
+    return;
+  }
       
   /* Running select on the FD set */
   do {
-    n = olsr_select(hfd, &ibits, NULL, NULL, &tvp);
-  } while (n == -1 && errno == EINTR);
+    n = olsr_select(hfd, 
+                   fdsets & SP_PR_READ ? &ibits : NULL,
+                   fdsets & SP_PR_WRITE ? &obits : NULL,
+                   NULL,
+                   &tvp);
+  } while (n == -1 && (errno == EINTR || errno == EAGAIN));
 
   switch (n) {
   case 0:
     break;
 
-    /* Did somethig go wrong? */
-  case -1: 
+  case -1:     /* Did somethig go wrong? */
     err_msg = strerror(errno);
     olsr_syslog(OLSR_LOG_ERR, "select: %s", err_msg);
     OLSR_PRINTF(1, "Error select: %s", err_msg);
     break;
-    
-  default:
-    /* Update time since this is much used by the parsing functions */
+
+  default:     /* Update time since this is much used by the parsing functions */
     now_times = olsr_times();
-    for (olsr_sockets = olsr_socket_entries;olsr_sockets;olsr_sockets = olsr_sockets->next) {
-      if (FD_ISSET(olsr_sockets->fd, &ibits)) {
-       olsr_sockets->process_function(olsr_sockets->fd);
+    for (entry = olsr_socket_entries; entry != NULL; entry = entry->next) {
+      int rd, wr;
+      if (entry->process_pollrate == NULL) {
+       continue;
+      }
+      rd = (entry->flags & SP_PR_READ) != 0 && FD_ISSET(entry->fd, &ibits);
+      wr = (entry->flags & SP_PR_WRITE) != 0 && FD_ISSET(entry->fd, &obits);
+      if (rd && wr) {
+       entry->process_pollrate(entry->fd, entry->data, SP_PR_READ|SP_PR_WRITE);
+      } else if (wr) {
+       entry->process_pollrate(entry->fd, entry->data, SP_PR_READ);
+      } else if (rd) {
+       entry->process_pollrate(entry->fd, entry->data, SP_PR_WRITE);
       }
     }
     break;
index e824eb8..cf8b1b9 100644 (file)
  *
  */
 
-
 #ifndef _OLSR_SOCKET_PARSER
 #define _OLSR_SOCKET_PARSER
 
-typedef void(*socket_handler_func)(int fd);
+#define SP_PR_READ             0x01
+#define SP_PR_WRITE            0x02
+/*
+#define SP_IMM_READ            0x04
+#define SP_IMM_WRITE           0x08
+*/
+
+typedef void (*socket_handler_func)(int fd, void *data, unsigned int flags);
 
 
 struct olsr_socket_entry {
   int fd;
-  socket_handler_func process_function;
+  socket_handler_func process_immediate;
+  socket_handler_func process_pollrate;
+  void *data;
+  unsigned int flags;
   struct olsr_socket_entry *next;
 };
 
-extern struct olsr_socket_entry *olsr_socket_entries;
 void olsr_poll_sockets(void);
 
-void add_olsr_socket(int, socket_handler_func);
-int remove_olsr_socket(int, socket_handler_func);
+void add_olsr_socket(int fd, socket_handler_func pf_pr, socket_handler_func pf_imm, void *data, unsigned int flags);
+int remove_olsr_socket(int fd, socket_handler_func pf_pr, socket_handler_func pf_imm);
 
 #endif
 
index 165fdc4..37f856a 100644 (file)
@@ -466,7 +466,7 @@ chk_if_changed(struct olsr_if *iface)
   iface->interf = NULL;
   /* Close olsr socket */
   close(ifp->olsr_socket);
-  remove_olsr_socket(ifp->olsr_socket, &olsr_input);
+  remove_olsr_socket(ifp->olsr_socket, &olsr_input, NULL);
 
   /* Free memory */
   free(ifp->int_name);
@@ -611,7 +611,7 @@ add_hemu_if(struct olsr_if *iface)
     }  
   
   /* Register socket */
-  add_olsr_socket(ifp->olsr_socket, &olsr_input_hostemu);
+  add_olsr_socket(ifp->olsr_socket, &olsr_input_hostemu, NULL, NULL, SP_PR_READ);
 
 
   /*
@@ -929,7 +929,7 @@ chk_if_up(struct olsr_if *iface, int debuglvl __attribute__((unused)))
   set_buffer_timer(ifp);
 
   /* Register socket */
-  add_olsr_socket(ifp->olsr_socket, &olsr_input);
+  add_olsr_socket(ifp->olsr_socket, &olsr_input, NULL, NULL, SP_PR_READ);
   
 #ifdef linux 
   /* Set TOS */
index 761bcc2..451fdc8 100644 (file)
@@ -573,7 +573,7 @@ void RemoveInterface(struct olsr_if *IntConf)
   IntConf->interf = NULL;
 
   closesocket(Int->olsr_socket);
-  remove_olsr_socket(Int->olsr_socket, &olsr_input);
+  remove_olsr_socket(Int->olsr_socket, &olsr_input, NULL);
 
   free(Int->int_name);
   free(Int);
@@ -704,7 +704,7 @@ int add_hemu_if(struct olsr_if *iface)
     }  
   
   /* Register socket */
-  add_olsr_socket(ifp->olsr_socket, &olsr_input_hostemu);
+  add_olsr_socket(ifp->olsr_socket, &olsr_input_hostemu, NULL, NULL, SP_PR_READ);
 
   /*
    * Register functions for periodic message generation 
@@ -989,7 +989,7 @@ int chk_if_up(struct olsr_if *IntConf, int DebugLevel __attribute__((unused)))
     exit(1);
   }
 
-  add_olsr_socket(New->olsr_socket, &olsr_input);
+  add_olsr_socket(New->olsr_socket, &olsr_input, NULL, NULL, SP_PR_READ);
 
   New->int_next = ifnet;
   ifnet = New;