diff --git a/modules/pcap/daq_pcap.c b/modules/pcap/daq_pcap.c index 51a4b78..c30ec85 100644 --- a/modules/pcap/daq_pcap.c +++ b/modules/pcap/daq_pcap.c @@ -25,10 +25,13 @@ #include #include +#include #include #include #include #include +#include +#include #include "daq_module_api.h" @@ -78,6 +81,7 @@ typedef struct _pcap_context uint32_t netmask; bool nonblocking; volatile bool interrupted; + int interrupt_fd; /* Readback timeout state */ struct timeval last_recv; PcapPktDesc *pending_desc; @@ -251,6 +255,7 @@ static int pcap_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInsta return DAQ_ERROR_NOMEM; } pc->modinst = modinst; + pc->interrupt_fd = -1; pc->snaplen = daq_base_api.config_get_snaplen(modcfg); pc->timeout = daq_base_api.config_get_timeout(modcfg); @@ -350,6 +355,8 @@ static void pcap_daq_destroy(void *handle) free(pc->device); if (pc->filter_string) free(pc->filter_string); + if (pc->interrupt_fd >= 0) + close(pc->interrupt_fd); destroy_packet_pool(pc); free(pc); } @@ -452,6 +459,7 @@ static int pcap_daq_start(void *handle) goto fail; if (pcap_lookupnet(pc->device, &localnet, &netmask, pc->pcap_errbuf) < 0) netmask = htonl(defaultnet); + pc->interrupt_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); } else { @@ -517,6 +525,12 @@ static int pcap_daq_interrupt(void *handle) Pcap_Context_t *pc = (Pcap_Context_t *) handle; pc->interrupted = true; + if (pc->interrupt_fd >= 0) + { + uint64_t val = 1; + if (write(pc->interrupt_fd, &val, sizeof(val)) < 0 && errno != EAGAIN) + return DAQ_ERROR; + } return DAQ_SUCCESS; } @@ -532,6 +546,11 @@ static int pcap_daq_stop(void *handle) pcap_close(pc->handle); pc->handle = NULL; } + if (pc->interrupt_fd >= 0) + { + close(pc->interrupt_fd); + pc->interrupt_fd = -1; + } return DAQ_SUCCESS; } @@ -649,7 +668,10 @@ static unsigned pcap_daq_msg_receive(void *handle, const unsigned max_recv, cons } /* When dealing with a live interface, try to get the first packet in non-blocking mode. - If there's nothing to receive, switch to blocking mode. */ + If there's nothing to receive, wait on poll() so that pcap_daq_interrupt() + can wake the thread immediately via interrupt_fd; falling back to a blocking + pcap_next_ex() would otherwise hang indefinitely on an idle interface + (TPACKET_V3, virtual NICs) because the in-pcap timeout isn't always honored. */ int pcap_rval; if (pc->mode != DAQ_MODE_READ_FILE && idx == 0) { @@ -661,12 +683,43 @@ static unsigned pcap_daq_msg_receive(void *handle, const unsigned max_recv, cons pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data); if (pcap_rval == 0) { - if (set_nonblocking(pc, false) != DAQ_SUCCESS) + int pcap_fd = pcap_get_selectable_fd(pc->handle); + if (pcap_fd >= 0 && pc->interrupt_fd >= 0) { - *rstat = DAQ_RSTAT_ERROR; - break; + struct pollfd fds[2]; + fds[0].fd = pcap_fd; + fds[0].events = POLLIN; + fds[0].revents = 0; + fds[1].fd = pc->interrupt_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; + int poll_timeout = (pc->timeout > 0) ? pc->timeout : 1000; + int pret = poll(fds, 2, poll_timeout); + if (pret > 0 && (fds[1].revents & POLLIN)) + { + uint64_t val; + ssize_t r = read(pc->interrupt_fd, &val, sizeof(val)); + (void) r; + pc->interrupted = false; + *rstat = DAQ_RSTAT_INTERRUPTED; + break; + } + if (pret <= 0) + { + *rstat = DAQ_RSTAT_TIMEOUT; + break; + } + pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data); + } + else + { + if (set_nonblocking(pc, false) != DAQ_SUCCESS) + { + *rstat = DAQ_RSTAT_ERROR; + break; + } + pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data); } - pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data); } } else