Some bugfixes in stream scheduler
authorHenning Rogge <henning.rogge@fkie.fraunhofer.de>
Wed, 11 Jan 2012 13:27:22 +0000 (14:27 +0100)
committerHenning Rogge <henning.rogge@fkie.fraunhofer.de>
Wed, 11 Jan 2012 13:27:22 +0000 (14:27 +0100)
src/core/olsr_http.c
src/core/olsr_socket.c
src/core/olsr_stream_socket.c
src/core/olsr_stream_socket.h
src/core/olsr_telnet.c

index 5cb79c1..59a544e 100644 (file)
@@ -129,7 +129,7 @@ olsr_http_cleanup(void) {
   if (olsr_subsystem_cleanup(&_http_state))
     return;
 
-  olsr_stream_remove_managed(&_http_managed_socket);
+  olsr_stream_remove_managed(&_http_managed_socket, true);
 
   cfg_delta_remove_handler(olsr_cfg_get_delta(), &_http_handler);
   cfg_schema_remove_section(olsr_cfg_get_schema(), &_http_section);
index 4c4441c..7da0db6 100644 (file)
@@ -113,6 +113,7 @@ void
 olsr_socket_remove(struct olsr_socket_entry *entry)
 {
   OLSR_DEBUG(LOG_SOCKET, "Removing socket entry %d\n", entry->fd);
+
   list_remove(&entry->node);
 }
 
@@ -167,6 +168,7 @@ olsr_socket_handle(uint32_t until_time)
       if (entry->process == NULL) {
         continue;
       }
+
       if (entry->event_read) {
         fd_read = true;
         FD_SET((unsigned int)entry->fd, &ibits);        /* And we cast here since we get a warning on Win32 */
index 1d0f0b4..2b5b49f 100644 (file)
@@ -119,7 +119,7 @@ olsr_stream_cleanup(void) {
   while (!list_is_empty(&olsr_stream_head)) {
     comport = list_first_element(&olsr_stream_head, comport, node);
 
-    olsr_stream_remove(comport);
+    olsr_stream_remove(comport, true);
   }
 
   olsr_memcookie_remove(connection_cookie);
@@ -204,16 +204,31 @@ add_stream_error:
 /**
  * Remove a stream socket from the scheduler
  * @param stream_socket pointer to socket
+ * @param force true if socket will be closed immediately,
+ *   false if scheduler should wait until outgoing buffers are empty
  */
 void
-olsr_stream_remove(struct olsr_stream_socket *stream_socket) {
-  struct olsr_stream_session *session;
+olsr_stream_remove(struct olsr_stream_socket *stream_socket, bool force) {
+  struct olsr_stream_session *session, *ptr;
+
+  if (stream_socket->busy) {
+    stream_socket->remove = true;
+    return;
+  }
 
   if (list_is_node_added(&stream_socket->node)) {
-    stream_socket = list_first_element(&olsr_stream_head, stream_socket, node);
-    while (!list_is_empty(&stream_socket->session)) {
-      session = list_first_element(&stream_socket->session, session, node);
-      olsr_stream_close(session);
+    list_for_each_element_safe(&stream_socket->session, session, node, ptr) {
+      if (!force && session->out.len > 0) {
+        stream_socket->remove_when_finished = true;
+      }
+      else {
+        /* close everything that doesn't need to send data anymore */
+        olsr_stream_close(session);
+      }
+    }
+
+    if (!list_is_empty(&stream_socket->session)) {
+      return;
     }
 
     list_remove(&stream_socket->node);
@@ -289,6 +304,12 @@ olsr_stream_set_timeout(struct olsr_stream_session *con, uint32_t timeout) {
  */
 void
 olsr_stream_close(struct olsr_stream_session *session) {
+  if (session->busy) {
+    /* remove the session later */
+    session->removed = true;
+    return;
+  }
+
   if (session->comport->config.cleanup) {
     session->comport->config.cleanup(session);
   }
@@ -336,7 +357,7 @@ olsr_stream_apply_managed(struct olsr_stream_managed *managed,
     }
   }
   else {
-    olsr_stream_remove(&managed->socket_v4);
+    olsr_stream_remove(&managed->socket_v4, true);
   }
 
   if (config_global.ipv6) {
@@ -346,7 +367,7 @@ olsr_stream_apply_managed(struct olsr_stream_managed *managed,
     }
   }
   else {
-    olsr_stream_remove(&managed->socket_v6);
+    olsr_stream_remove(&managed->socket_v6, true);
   }
   return 0;
 }
@@ -354,11 +375,13 @@ olsr_stream_apply_managed(struct olsr_stream_managed *managed,
 /**
  * Remove a managed TCP stream
  * @param managed pointer to managed stream
+ * @param force true if socket will be closed immediately,
+ *   false if scheduler should wait until outgoing buffers are empty
  */
 void
-olsr_stream_remove_managed(struct olsr_stream_managed *managed) {
-  olsr_stream_remove(&managed->socket_v4);
-  olsr_stream_remove(&managed->socket_v6);
+olsr_stream_remove_managed(struct olsr_stream_managed *managed, bool forced) {
+  olsr_stream_remove(&managed->socket_v4, forced);
+  olsr_stream_remove(&managed->socket_v6, forced);
 
   olsr_acl_remove(&managed->acl);
 }
@@ -391,7 +414,7 @@ _apply_managed_socket(struct olsr_stream_managed *managed,
     return 0;
   }
 
-  olsr_stream_remove(stream);
+  olsr_stream_remove(stream, true);
   if (olsr_stream_add(stream, &sock)) {
     return -1;
   }
@@ -554,7 +577,7 @@ _cb_timeout_handler(void *data) {
 static void
 _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
   struct olsr_stream_session *session;
-  struct olsr_stream_socket *comport;
+  struct olsr_stream_socket *s_sock;
   int len;
   char buffer[1024];
 #if !defined(REMOVE_LOG_WARN)
@@ -562,10 +585,14 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
 #endif
 
   session = data;
-  comport = session->comport;
+  s_sock = session->comport;
 
   OLSR_DEBUG(LOG_SOCKET_STREAM, "Parsing connection of socket %d\n", fd);
 
+  /* mark session and s_sock as busy */
+  session->busy = true;
+  s_sock->busy = true;
+
   if (session->wait_for_connect) {
     if (event_write) {
       int value;
@@ -590,6 +617,8 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
   }
 
   if (session->wait_for_connect) {
+    session->busy = false;
+    s_sock->busy = false;
     return;
   }
 
@@ -602,15 +631,15 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
         /* out of memory */
         OLSR_WARN(LOG_SOCKET_STREAM, "Out of memory for comport session input buffer");
         session->state = STREAM_SESSION_CLEANUP;
-      } else if (session->in.len > comport->config.maximum_input_buffer) {
+      } else if (session->in.len > s_sock->config.maximum_input_buffer) {
         /* input buffer overflow */
-        if (comport->config.create_error) {
-          comport->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
+        if (s_sock->config.create_error) {
+          s_sock->config.create_error(session, STREAM_REQUEST_TOO_LARGE);
         }
         session->state = STREAM_SESSION_SEND_AND_QUIT;
       } else {
         /* got new input block, reset timeout */
-        olsr_stream_set_timeout(session, comport->config.session_timeout);
+        olsr_stream_set_timeout(session, s_sock->config.session_timeout);
       }
     } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
         != EWOULDBLOCK) {
@@ -619,14 +648,14 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
           netaddr_to_string(&buf, &session->remote_address), strerror(errno), errno);
       session->state = STREAM_SESSION_CLEANUP;
     } else if (len == 0) {
-      /* external socket closed */
+      /* external s_sock closed */
       session->state = STREAM_SESSION_SEND_AND_QUIT;
     }
   }
 
-  if (session->state == STREAM_SESSION_ACTIVE && comport->config.receive_data != NULL
+  if (session->state == STREAM_SESSION_ACTIVE && s_sock->config.receive_data != NULL
       && (session->in.len > 0 || session->send_first)) {
-    session->state = comport->config.receive_data(session);
+    session->state = s_sock->config.receive_data(session);
     session->send_first = false;
   }
 
@@ -638,7 +667,7 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
       if (len > 0) {
         OLSR_DEBUG(LOG_SOCKET_STREAM, "  send returned %d\n", len);
         abuf_pull(&session->out, len);
-        olsr_stream_set_timeout(session, comport->config.session_timeout);
+        olsr_stream_set_timeout(session, s_sock->config.session_timeout);
       } else if (len < 0 && errno != EINTR && errno != EAGAIN && errno
           != EWOULDBLOCK) {
         OLSR_WARN(LOG_SOCKET_STREAM, "Error while writing to communication stream with %s: %s (%d)\n",
@@ -660,6 +689,9 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
     }
   }
 
+  session->busy = false;
+  s_sock->busy = false;
+
   /* end of connection ? */
   if (session->state == STREAM_SESSION_CLEANUP) {
     OLSR_DEBUG(LOG_SOCKET_STREAM, "  cleanup\n");
@@ -669,5 +701,14 @@ _cb_parse_connection(int fd, void *data, bool event_read, bool event_write) {
     session->timeout = NULL;
     olsr_stream_close(session);
   }
+  else if (session->removed) {
+    /* lazy session removal */
+    olsr_stream_close(session);
+  }
+
+  /* lazy socket removal */
+  if (s_sock->remove) {
+    olsr_stream_remove(s_sock, !s_sock->remove_when_finished);
+  }
   return;
 }
index 4dd3fd5..66ddc9f 100644 (file)
@@ -105,6 +105,12 @@ struct olsr_stream_session {
   /* true if session is still waiting for initial handshake to finish */
   bool wait_for_connect;
 
+  /* session event is just busy in scheduler */
+  bool busy;
+
+  /* session has been remove while being busy */
+  bool removed;
+
   enum olsr_stream_session_state state;
 };
 
@@ -164,6 +170,10 @@ struct olsr_stream_socket {
   struct olsr_socket_entry scheduler_entry;
 
   struct olsr_stream_config config;
+
+  bool busy;
+  bool remove;
+  bool remove_when_finished;
 };
 
 struct olsr_stream_managed {
@@ -186,7 +196,7 @@ void olsr_stream_cleanup(void);
 
 EXPORT int olsr_stream_add(struct olsr_stream_socket *,
     union netaddr_socket *local);
-EXPORT void olsr_stream_remove(struct olsr_stream_socket *);
+EXPORT void olsr_stream_remove(struct olsr_stream_socket *, bool force);
 EXPORT struct olsr_stream_session *olsr_stream_connect_to(
     struct olsr_stream_socket *, union netaddr_socket *remote);
 EXPORT void olsr_stream_flush(struct olsr_stream_session *con);
@@ -198,6 +208,6 @@ EXPORT void olsr_stream_close(struct olsr_stream_session *con);
 EXPORT void olsr_stream_add_managed(struct olsr_stream_managed *);
 EXPORT int olsr_stream_apply_managed(struct olsr_stream_managed *,
     struct olsr_stream_managed_config *);
-EXPORT void olsr_stream_remove_managed(struct olsr_stream_managed *);
+EXPORT void olsr_stream_remove_managed(struct olsr_stream_managed *, bool force);
 
 #endif /* OLSR_STREAM_SOCKET_H_ */
index 83aac6b..2dfaea5 100644 (file)
@@ -155,7 +155,7 @@ olsr_telnet_cleanup(void) {
   if (olsr_subsystem_cleanup(&_telnet_state))
     return;
 
-  olsr_stream_remove_managed(&_telnet_managed);
+  olsr_stream_remove_managed(&_telnet_managed, true);
 
   cfg_delta_remove_handler(olsr_cfg_get_delta(), &telnet_handler);
   cfg_schema_remove_section(olsr_cfg_get_schema(), &telnet_section);