pud: force pending buffer out if not enough space for our message
[olsrd.git] / lib / pud / src / pud.c
1 #include "pud.h"
2
3 /* Plugin includes */
4 #include "dedup.h"
5 #include "networkInterfaces.h"
6 #include "configuration.h"
7 #include "gpsConversion.h"
8 #include "receiver.h"
9 #include "state.h"
10 #include "compiler.h"
11
12 /* OLSRD includes */
13 #include "olsr.h"
14 #include "ipcalc.h"
15 #include "net_olsr.h"
16 #include "parser.h"
17 #include "log.h"
18
19 /* System includes */
20
21 /** The size of the buffer in which the received NMEA string is stored */
22 #define BUFFER_SIZE_RX_NMEA             2048
23
24 /** The size of the buffer in which the received downlink message is stored */
25 #define BUFFER_SIZE_RX_DOWNLINK 2048
26
27 /** The size of the buffer in which the converted NMEA string is assembled for
28  * transmission over OSLR */
29 #define BUFFER_SIZE_TX_OLSR     512
30
31 /** The de-duplication list */
32 static DeDupList deDupList;
33
34 /** When false, use olsr_printf in pudError, otherwise use olsr_syslog */
35 static bool pudErrorUseSysLog = false;
36
37 /**
38  Report a plugin error.
39
40  @param useErrno
41  when true then errno is used in the error message; the error reason is also
42  reported.
43  @param format
44  a pointer to the format string
45  @param ...
46  arguments to the format string
47  */
48 void pudError(bool useErrno, const char *format, ...) {
49         char strDesc[256];
50         const char *colon;
51         const char *stringErr;
52
53         if ((format == NULL) || (*format == '\0')) {
54                 strDesc[0] = '\0';
55                 colon = "";
56                 if (!useErrno) {
57                         stringErr = "Unknown error";
58                 } else {
59                         stringErr = strerror(errno);
60                 }
61         } else {
62                 va_list arglist;
63
64                 va_start(arglist, format);
65                 vsnprintf(strDesc, sizeof(strDesc), format, arglist);
66                 va_end(arglist);
67
68                 if (useErrno) {
69                         colon = ": ";
70                         stringErr = strerror(errno);
71                 } else {
72                         colon = "";
73                         stringErr = "";
74                 }
75         }
76
77         if (!pudErrorUseSysLog)
78                 olsr_printf(0, "%s: %s%s%s\n", PUD_PLUGIN_ABBR, strDesc, colon, stringErr);
79         else
80                 olsr_syslog(OLSR_LOG_ERR, "%s: %s%s%s\n", PUD_PLUGIN_ABBR, strDesc, colon, stringErr);
81 }
82
83 /**
84  Sends a buffer out on all transmit interfaces
85
86  @param buffer
87  the buffer
88  @param bufferLength
89  the number of bytes in the buffer
90  */
91 static void sendToAllTxInterfaces(unsigned char *buffer,
92                 unsigned int bufferLength) {
93         union olsr_sockaddr * txAddress = getTxMcAddr();
94         void * addr;
95         socklen_t addrSize;
96         TRxTxNetworkInterface *txNetworkInterfaces = getTxNetworkInterfaces();
97
98         if (txAddress->in.sa_family == AF_INET) {
99                 addr = &txAddress->in4;
100                 addrSize = sizeof(struct sockaddr_in);
101         } else {
102                 addr = &txAddress->in6;
103                 addrSize = sizeof(struct sockaddr_in6);
104         }
105
106         while (txNetworkInterfaces != NULL) {
107                 TRxTxNetworkInterface *networkInterface = txNetworkInterfaces;
108                 errno = 0;
109                 if (sendto(networkInterface->socketFd, buffer, bufferLength, 0, addr, addrSize) < 0) {
110                         pudError(true, "Transmit error on interface %s", &networkInterface->name[0]);
111                 }
112                 txNetworkInterfaces = networkInterface->next;
113         }
114 }
115
116 /**
117  Called by OLSR core when a packet for the plugin is received from the OLSR
118  network. It converts the packet into an NMEA string and transmits it over all
119  transmit non-OLSR network interfaces.
120
121  @param olsrMessage
122  a pointer to the received OLSR message
123  @param in_if
124  a pointer to the OLSR network interface on which the packet was received
125  @param ipaddr
126  a pointer to the IP address of the sender
127
128  @return
129  - true when the packet was processed
130  - false otherwise
131  */
132 bool packetReceivedFromOlsr(union olsr_message *olsrMessage,
133                 struct interface *in_if __attribute__ ((unused)), union olsr_ip_addr *ipaddr __attribute__ ((unused))) {
134         const union olsr_ip_addr * originator = getOlsrMessageOriginator(
135                         olsr_cnf->ip_version, olsrMessage);
136         unsigned int transmitStringLength;
137         unsigned char buffer[BUFFER_SIZE_TX_OLSR];
138
139         /* when we do not loopback then check if the message originated from this
140          * node: back off */
141         if (!getUseLoopback() && ipequal(originator, &olsr_cnf->main_addr)) {
142                 return false;
143         }
144
145         /* do deduplication: when we have already seen this message from the same
146          * originator then just back off */
147         if (likely(getUseDeDup())) {
148                 if (isInDeDupList(&deDupList, olsrMessage)) {
149                         return false;
150                 }
151
152                 addToDeDup(&deDupList, olsrMessage);
153         }
154
155         transmitStringLength = gpsFromOlsr(olsrMessage, &buffer[0], sizeof(buffer));
156         if (unlikely(transmitStringLength == 0)) {
157                 return false;
158         }
159
160         sendToAllTxInterfaces(&buffer[0], transmitStringLength);
161
162         return true;
163 }
164
165 /**
166  Called by OLSR core when a packet for the plugin is received from the downlink.
167  It unpacks the messages and distributes them into OLSR and on the LAN.
168
169  @param skfd
170  the socket file descriptor on which the packet is received
171  @param data
172  a pointer to the network interface structure on which the packet was received
173  @param flags
174  unused
175  */
176 static void packetReceivedFromDownlink(int skfd, void *data __attribute__ ((unused)), unsigned int flags __attribute__ ((unused))) {
177         if (skfd >= 0) {
178                 unsigned char rxBuffer[BUFFER_SIZE_RX_DOWNLINK];
179                 ssize_t rxCount = 0;
180                 ssize_t rxIndex = 0;
181
182                 /* Receive the captured Ethernet frame */
183                 errno = 0;
184                 rxCount = recvfrom(skfd, &rxBuffer[0], (sizeof(rxBuffer) - 1), 0, NULL, NULL);
185                 if (rxCount < 0) {
186                         pudError(true, "Receive error in %s, ignoring message.", __func__);
187                         return;
188                 }
189
190                 while (rxIndex < rxCount) {
191                         UplinkMessage * msg = (UplinkMessage *) &rxBuffer[rxIndex];
192                         uint8_t type;
193                         uint16_t uplinkMessageLength;
194                         uint16_t olsrMessageLength;
195                         bool ipv6;
196                         union olsr_message * olsrMessage;
197
198                         type = getUplinkMessageType(&msg->header);
199                         olsrMessageLength = getUplinkMessageLength(&msg->header);
200                         uplinkMessageLength = olsrMessageLength + sizeof(UplinkHeader);
201
202                         if (unlikely((rxIndex + uplinkMessageLength) > rxCount)) {
203                                 pudError(false, "Received wrong length (%d) in %s,"
204                                                 " ignoring the rest of the messages.", olsrMessageLength,
205                                                 __func__);
206                                 return;
207                         }
208
209                         rxIndex += uplinkMessageLength;
210
211                         if (type != POSITION) {
212                                 pudError(false, "Received wrong type (%d) in %s,"
213                                                 " ignoring message.", type, __func__);
214                                 continue;
215                         }
216
217                         ipv6 = getUplinkMessageIPv6(&msg->header);
218                         if (unlikely(ipv6 && (olsr_cnf->ip_version != AF_INET6))) {
219                                 pudError(false, "Received wrong IPv6 status (%s) in %s,"
220                                                 " ignoring message.", (ipv6 ? "true" : "false"),
221                                                 __func__);
222                                 continue;
223                         }
224
225                         olsrMessage = &msg->msg.olsrMessage;
226
227                         /* we now have a position update (olsrMessage) of a certain length
228                          * (olsrMessageLength). this needs to be transmitted over OLSR and on the LAN */
229
230                         /* send out over OLSR interfaces */
231                         {
232                                 int r;
233                                 struct interface *ifn;
234                                 for (ifn = ifnet; ifn; ifn = ifn->int_next) {
235                                         /* force the pending buffer out if there's not enough space for our message */
236                                         if ((int)olsrMessageLength > net_outbuffer_bytes_left(ifn)) {
237                                           net_output(ifn);
238                                         }
239                                         r = net_outbuffer_push(ifn, olsrMessage, olsrMessageLength);
240                                         if (r != (int) olsrMessageLength) {
241                                                 pudError(
242                                                                 false,
243                                                                 "Could not send to OLSR interface %s: %s"
244                                                                                 " (length=%u, r=%d)",
245                                                                 ifn->int_name,
246                                                                 ((r == -1) ? "no buffer was found" :
247                                                                         (r == 0) ? "there was not enough room in the buffer" :
248                                                                                         "unknown reason"), olsrMessageLength, r);
249                                         }
250                                 }
251                         }
252
253                         /* send out over tx interfaces */
254                         (void) packetReceivedFromOlsr(olsrMessage, NULL, NULL);
255                 }
256         }
257 }
258
259 /**
260  Called by OLSR core when a packet for the plugin is received from the non-OLSR
261  network. It converts the packet into the internal OLSR wire format for a
262  position update and transmits it over all OLSR network interfaces.
263
264  @param skfd
265  the socket file descriptor on which the packet is received
266  @param data
267  a pointer to the network interface structure on which the packet was received
268  @param flags
269  unused
270  */
271 static void packetReceivedForOlsr(int skfd, void *data __attribute__ ((unused)), unsigned int flags __attribute__ ((unused))) {
272         if (skfd >= 0) {
273                 unsigned char rxBuffer[BUFFER_SIZE_RX_NMEA];
274                 ssize_t rxCount;
275                 union olsr_sockaddr sender;
276                 socklen_t senderSize = sizeof(sender);
277
278                 assert(data != NULL);
279
280                 /* Receive the captured Ethernet frame */
281                 memset(&sender, 0, senderSize);
282                 errno = 0;
283                 rxCount = recvfrom(skfd, &rxBuffer[0], (sizeof(rxBuffer) - 1), 0,
284                                 (struct sockaddr *)&sender, &senderSize);
285                 if (rxCount < 0) {
286                         pudError(true, "Receive error in %s, ignoring message.", __func__);
287                         return;
288                 }
289
290                 /* make sure the string is null-terminated */
291                 rxBuffer[rxCount] = '\0';
292
293                 /* only accept messages from configured IP addresses */
294                 if (!isRxAllowedSourceIpAddress(&sender)) {
295                         return;
296                 }
297
298                 /* we have the received string in the rxBuffer now */
299
300                 /* hand the NMEA information to the receiver */
301                 (void) receiverUpdateGpsInformation(&rxBuffer[0], rxCount);
302         }
303 }
304
305 /**
306  Initialise the plugin: check the configuration, initialise the NMEA parser,
307  create network interface sockets, hookup the plugin to OLSR and setup data
308  that can be setup in advance.
309
310  @return
311  - false upon failure
312  - true otherwise
313  */
314 bool initPud(void) {
315         if (!checkConfig()) {
316                 pudError(false, "Invalid configuration");
317                 goto error;
318         }
319
320         initState();
321
322         if (!initDeDupList(&deDupList, getDeDupDepth())) {
323                 pudError(false, "Could not initialise de-duplication list");
324                 goto error;
325         }
326
327         if (!startReceiver()) {
328                 pudError(false, "Could not start receiver");
329                 goto error;
330         }
331
332         /*
333          * Creates receive and transmit sockets and register the receive sockets
334          * with the OLSR stack
335          */
336         if (!createNetworkInterfaces(&packetReceivedForOlsr,
337                         &packetReceivedFromDownlink)) {
338                 pudError(false, "Could not create require network interfaces");
339                 goto error;
340         }
341
342         if (!checkRunSetup()) {
343                 pudError(false, "Invalid configuration");
344                 goto error;
345         }
346
347         /*
348          * Tell OLSR to call packetReceivedFromOlsr when the packets for this
349          * plugin arrive from the OLSR network
350          */
351         olsr_parser_add_function(&packetReceivedFromOlsr, PUD_OLSR_MSG_TYPE);
352
353         /* switch to syslog logging, load was succesful */
354         pudErrorUseSysLog = !olsr_cnf->no_fork;
355
356         return true;
357
358         error: closePud();
359         return false;
360 }
361
362 /**
363  Stop the plugin: shut down all created network interface sockets and destroy
364  the NMEA parser.
365  */
366 void closePud(void) {
367         closeNetworkInterfaces();
368         stopReceiver();
369         destroyDeDupList(&deDupList);
370 }