Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions modules/pcap/daq_pcap.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@

#include <errno.h>
#include <pcap.h>
#include <poll.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/eventfd.h>
#include <unistd.h>

#include "daq_module_api.h"

Expand Down Expand Up @@ -78,6 +81,7 @@ typedef struct _pcap_context
uint32_t netmask;
bool nonblocking;
volatile bool interrupted;
int interrupt_fd;
/* Readback timeout state */
struct timeval last_recv;
PcapPktDesc *pending_desc;
Expand Down Expand Up @@ -251,6 +255,7 @@ static int pcap_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInsta
return DAQ_ERROR_NOMEM;
}
pc->modinst = modinst;
pc->interrupt_fd = -1;

pc->snaplen = daq_base_api.config_get_snaplen(modcfg);
pc->timeout = daq_base_api.config_get_timeout(modcfg);
Expand Down Expand Up @@ -350,6 +355,8 @@ static void pcap_daq_destroy(void *handle)
free(pc->device);
if (pc->filter_string)
free(pc->filter_string);
if (pc->interrupt_fd >= 0)
close(pc->interrupt_fd);
destroy_packet_pool(pc);
free(pc);
}
Expand Down Expand Up @@ -452,6 +459,7 @@ static int pcap_daq_start(void *handle)
goto fail;
if (pcap_lookupnet(pc->device, &localnet, &netmask, pc->pcap_errbuf) < 0)
netmask = htonl(defaultnet);
pc->interrupt_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}
else
{
Expand Down Expand Up @@ -517,6 +525,12 @@ static int pcap_daq_interrupt(void *handle)
Pcap_Context_t *pc = (Pcap_Context_t *) handle;

pc->interrupted = true;
if (pc->interrupt_fd >= 0)
{
uint64_t val = 1;
if (write(pc->interrupt_fd, &val, sizeof(val)) < 0 && errno != EAGAIN)
return DAQ_ERROR;
}

return DAQ_SUCCESS;
}
Expand All @@ -532,6 +546,11 @@ static int pcap_daq_stop(void *handle)
pcap_close(pc->handle);
pc->handle = NULL;
}
if (pc->interrupt_fd >= 0)
{
close(pc->interrupt_fd);
pc->interrupt_fd = -1;
}

return DAQ_SUCCESS;
}
Expand Down Expand Up @@ -649,7 +668,10 @@ static unsigned pcap_daq_msg_receive(void *handle, const unsigned max_recv, cons
}

/* When dealing with a live interface, try to get the first packet in non-blocking mode.
If there's nothing to receive, switch to blocking mode. */
If there's nothing to receive, wait on poll() so that pcap_daq_interrupt()
can wake the thread immediately via interrupt_fd; falling back to a blocking
pcap_next_ex() would otherwise hang indefinitely on an idle interface
(TPACKET_V3, virtual NICs) because the in-pcap timeout isn't always honored. */
int pcap_rval;
if (pc->mode != DAQ_MODE_READ_FILE && idx == 0)
{
Expand All @@ -661,12 +683,43 @@ static unsigned pcap_daq_msg_receive(void *handle, const unsigned max_recv, cons
pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data);
if (pcap_rval == 0)
{
if (set_nonblocking(pc, false) != DAQ_SUCCESS)
int pcap_fd = pcap_get_selectable_fd(pc->handle);
if (pcap_fd >= 0 && pc->interrupt_fd >= 0)
{
*rstat = DAQ_RSTAT_ERROR;
break;
struct pollfd fds[2];
fds[0].fd = pcap_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
fds[1].fd = pc->interrupt_fd;
fds[1].events = POLLIN;
fds[1].revents = 0;
int poll_timeout = (pc->timeout > 0) ? pc->timeout : 1000;
int pret = poll(fds, 2, poll_timeout);
if (pret > 0 && (fds[1].revents & POLLIN))
{
uint64_t val;
ssize_t r = read(pc->interrupt_fd, &val, sizeof(val));
(void) r;
pc->interrupted = false;
*rstat = DAQ_RSTAT_INTERRUPTED;
break;
}
if (pret <= 0)
{
*rstat = DAQ_RSTAT_TIMEOUT;
break;
}
pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data);
}
else
{
if (set_nonblocking(pc, false) != DAQ_SUCCESS)
{
*rstat = DAQ_RSTAT_ERROR;
break;
}
pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data);
}
pcap_rval = pcap_next_ex(pc->handle, &pcaphdr, &data);
}
}
else
Expand Down