Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ These plugins extract protocol-specific or behavioral information from packets a
| [`smtp`](./src/plugins/process/smtp/README.md) | extracts SMTP envelope data (from, to, subject, etc.) |
| [`ssaDetector`](./src/plugins/process/ssaDetector/README.md) | performs simple anomaly detection based on traffic patterns |
| [`ssdp`](./src/plugins/process/ssdp/README.md) | parses SSDP (UPnP discovery) protocol |
| [`vlan`](./src/plugins/process/vlan/README.md) | extracts VLAN IDs and QinQ encapsulation |
| [`vlan`](./src/plugins/process/vlan/README.md) | extracts VLAN IDs |
| [`qinq`](./src/plugins/process/qinq/README.md) | extracts QinQ outer and inner VLAN IDs. |

---
### Output Plugins
Expand Down
7 changes: 7 additions & 0 deletions include/ipfixprobe/ipfix-elements.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ namespace ipxp {
#define ETHERTYPE(F) F(0, 256, 2, nullptr)

#define VLAN_ID(F) F(0, 58, 2, nullptr)
#define DOT1Q_VLAN_ID(F) F(0, 243, 2, nullptr)
#define DOT1Q_CUSTOMER_VLAN_ID(F) F(0, 245, 2, nullptr)

#define L2_SRC_MAC(F) F(0, 56, 6, flow.src_mac)
#define L2_DST_MAC(F) F(0, 80, 6, flow.dst_mac)
Expand Down Expand Up @@ -564,6 +566,10 @@ namespace ipxp {

#define IPFIX_VLAN_TEMPLATE(F) F(VLAN_ID)

#define IPFIX_QINQ_TEMPLATE(F) \
F(DOT1Q_VLAN_ID) \
F(DOT1Q_CUSTOMER_VLAN_ID)

#define IPFIX_NETTISA_TEMPLATE(F) \
F(NTS_MEAN) \
F(NTS_MIN) \
Expand Down Expand Up @@ -615,6 +621,7 @@ namespace ipxp {
IPFIX_SSADETECTOR_TEMPLATE(F) \
IPFIX_ICMP_TEMPLATE(F) \
IPFIX_VLAN_TEMPLATE(F) \
IPFIX_QINQ_TEMPLATE(F) \
IPFIX_NETTISA_TEMPLATE(F) \
IPFIX_FLOW_HASH_TEMPLATE(F)

Expand Down
2 changes: 2 additions & 0 deletions include/ipfixprobe/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct Packet : public Record {
ipaddr_t src_ip;
ipaddr_t dst_ip;
uint32_t vlan_id;
uint32_t vlan_id2;
uint32_t frag_id;
uint16_t frag_off;
bool more_fragments;
Expand Down Expand Up @@ -119,6 +120,7 @@ struct Packet : public Record {
, src_ip({0})
, dst_ip({0})
, vlan_id(0)
, vlan_id2(0)
, frag_id(0)
, frag_off(0)
, more_fragments(false)
Expand Down
37 changes: 37 additions & 0 deletions init/config2args.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def process_input_dpdk_plugin(settings):
rss_offload = settings.get("rss_offload", None)
if rss_offload is not None:
primary_param += f"rss={rss_offload};"
rss_offload_suppress = settings.get("rss_offload_suppress", None)
if rss_offload_suppress:
primary_param += f"s;"
primary_param += f"eal={eal}\""

params = []
Expand Down Expand Up @@ -336,6 +339,40 @@ def process_storage(config):
cache_params.append(f"s={cache['size_exponent']}")
if "line_size_exponent" in cache:
cache_params.append(f"l={cache['line_size_exponent']}")
if "source_optimization" in cache:
so_value = "true" if cache['source_optimization'] else "false"
cache_params.append(f"so={so_value}")
if "source_optimization_network" in cache:
son_networks = cache.get("source_optimization_network")
if son_networks:
# Handle source_optimization_network - can be a list of dicts or a single dict
if isinstance(son_networks, dict):
# If it's a single dict with main/exclude, convert to list
son_networks = [son_networks]
elif not isinstance(son_networks, list):
son_networks = [son_networks]

# Generate -son arguments for each network group
for network_group in son_networks:
if isinstance(network_group, dict):
main = network_group.get("main")
exclude = network_group.get("exclude", "")

if main:
# Combine main network with exclusions
networks = [main.strip()]
if exclude:
# Handle exclude as comma-separated string
if isinstance(exclude, str):
excludes = [e.strip() for e in exclude.split(",")]
networks.extend(excludes)
elif isinstance(exclude, list):
networks.extend(exclude)

cache_params.append(f"son={','.join(networks)}")
elif isinstance(network_group, str):
# If it's just a string, use it directly
cache_params.append(f"son={network_group}")
if cache_params:
params.append(f"{';'.join(cache_params)}")

Expand Down
24 changes: 24 additions & 0 deletions init/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@
"integer",
"null"
]
},
"rss_offload_suppress": {
"type": "boolean"
}
},
"required": [
Expand Down Expand Up @@ -412,6 +415,27 @@
"line_size_exponent": {
"type": "integer",
"minimum": 1
},
"source_optimization": {
"type": "boolean"
},
"source_optimization_network": {
"type": "array",
"items": {
"type": "object",
"properties": {
"main": {
"type": "string"
},
"exclude": {
"type": "string"
}
},
"required": [
"main"
],
"additionalProperties": false
}
}
},
"additionalProperties": false
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/input/dpdk/src/dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ void DpdkCore::configure(const char* params)
m_mBufsCount = parser.pkt_buffer_size();
uint16_t mtuSize = parser.mtu_size();
uint64_t rssOffload = parser.rss_offload();

bool rssOffloadSuppress = parser.rss_offload_suppress();
configureEal(parser.eal_params());

m_dpdkDevices.reserve(parser.port_numbers().size());
for (auto portID : parser.port_numbers()) {
m_dpdkDevices
.emplace_back(portID, rxQueueCount, mempoolSize, m_mBufsCount, mtuSize, rssOffload);
.emplace_back(portID, rxQueueCount, mempoolSize, m_mBufsCount, mtuSize, rssOffload, rssOffloadSuppress);
}

isConfigured = true;
Expand Down
15 changes: 14 additions & 1 deletion src/plugins/input/dpdk/src/dpdk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class DpdkOptParser : public OptionsParser {
std::string eal_;
uint16_t mtu_;
uint64_t rss_offload_ = 0;

bool rss_offload_suppress_ = false;
std::vector<uint16_t> parsePortNumbers(std::string arg)
{
std::string delimiter = ",";
Expand Down Expand Up @@ -138,6 +138,17 @@ class DpdkOptParser : public OptionsParser {
return true;
},
RequiredArgument);
register_option(
"s",
"rss_offload_suppress",
"",
"RSS offload suppress value, ignore if rss is missing for multiqueue (Used for VM to VM where packets are ordered by the inserting process). Default: false",
[this](const char* arg) {
(void)arg;
rss_offload_suppress_ = true;
return true;
},
OptionFlags::NoArgument);
register_option(
"e",
"eal",
Expand Down Expand Up @@ -177,6 +188,8 @@ class DpdkOptParser : public OptionsParser {
uint16_t mtu_size() const { return mtu_; }

uint64_t rss_offload() const { return rss_offload_; }

bool rss_offload_suppress() const { return rss_offload_suppress_; }
};

class DpdkCore {
Expand Down
15 changes: 12 additions & 3 deletions src/plugins/input/dpdk/src/dpdkDevice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ DpdkDevice::DpdkDevice(
uint16_t memPoolSize,
uint16_t mbufsCount,
uint16_t mtuSize,
uint64_t rssOffload)
uint64_t rssOffload,
bool rssOffloadSuppress)
: m_portID(portID)
, m_rxQueueCount(rxQueueCount)
, m_txQueueCount(0)
Expand All @@ -53,6 +54,7 @@ DpdkDevice::DpdkDevice(
, m_supportedHWTimestamp(false)
, m_mtuSize(mtuSize)
, m_rssOffload(rssOffload)
, m_rssOffloadSuppress(rssOffloadSuppress)
{
validatePort();
recognizeDriver();
Expand Down Expand Up @@ -145,8 +147,15 @@ rte_eth_conf DpdkDevice::createPortConfig()
if (m_rxQueueCount > 1 && !m_supportedRSS) {
std::cerr << "RSS is not supported by card, multiple queues will not work as expected."
<< std::endl;
throw PluginError(
"DpdkDevice::createPortConfig() has failed. Required RSS for q>1 is not supported.");
if( m_rssOffloadSuppress ) {
std::cerr << "WARNING: "<< std::endl
<< " Multiple queues are configured without RSS." << std::endl
<< " Packets need to be ordered by the sending NIC, " << std::endl
<< " this only works for virtual cards (vm to vm)." << std::endl;
} else {
throw PluginError(
"DpdkDevice::createPortConfig() has failed. Required RSS for q>1 is not supported.");
}
}

#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
Expand Down
4 changes: 3 additions & 1 deletion src/plugins/input/dpdk/src/dpdkDevice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class DpdkDevice {
uint16_t memPoolSize,
uint16_t mbufsCount,
uint16_t mtuSize,
uint64_t rssOffload);
uint64_t rssOffload,
bool rssOffloadSuppress);

/**
* @brief Receives packets from the specified receive queue of the DPDK device.
Expand Down Expand Up @@ -105,6 +106,7 @@ class DpdkDevice {
int m_rxTimestampDynflag;
uint16_t m_mtuSize;
uint64_t m_rssOffload = 0;
bool m_rssOffloadSuppress;
};

} // namespace ipxp
11 changes: 9 additions & 2 deletions src/plugins/input/parser/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ inline uint16_t parse_eth_hdr(const u_char* data_ptr, uint16_t data_len, Packet*

// set the default value in case there is no VLAN ID
pkt->vlan_id = 0;

pkt->vlan_id2 = 0;
if (ethertype == ETH_P_8021AD || ethertype == ETH_P_8021Q) {
if (4 > data_len - hdr_len) {
throw "Parser detected malformed packet";
Expand All @@ -138,7 +138,8 @@ inline uint16_t parse_eth_hdr(const u_char* data_ptr, uint16_t data_len, Packet*
if (4 > data_len - hdr_len) {
throw "Parser detected malformed packet";
}
DEBUG_CODE(uint16_t vlan = ntohs(*(uint16_t*) (data_ptr + hdr_len)));
uint16_t vlan = ntohs(*(uint16_t*) (data_ptr + hdr_len));
pkt->vlan_id2 = vlan & 0x0FFF;
DEBUG_MSG("\t802.1q field:\n");
DEBUG_MSG("\t\tPriority:\t%u\n", ((vlan & 0xE000) >> 12));
DEBUG_MSG("\t\tCFI:\t\t%u\n", ((vlan & 0x1000) >> 11));
Expand Down Expand Up @@ -773,6 +774,9 @@ void parse_packet(
if (pkt->vlan_id) {
stats.vlan_packets++;
}
if( pkt->vlan_id2) {
stats.vlan_packets++;
}

if (pkt->ethertype == ETH_P_IP) {
stats.ipv4_packets++;
Expand Down Expand Up @@ -804,6 +808,9 @@ void parse_packet(
pkt->payload = pkt->packet + data_offset;

stats.vlan_stats[pkt->vlan_id].update(*pkt);
if( pkt->vlan_id2) {
stats.vlan_stats[pkt->vlan_id2].update(*pkt);
}

DEBUG_MSG("Payload length:\t%u\n", pkt->payload_len);
DEBUG_MSG("Packet parser exits: packet parsed\n");
Expand Down
1 change: 1 addition & 0 deletions src/plugins/process/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_subdirectory(smtp)
add_subdirectory(quic)
add_subdirectory(tls)
add_subdirectory(http)
add_subdirectory(qinq)

if (ENABLE_PROCESS_EXPERIMENTAL)
add_subdirectory(sip)
Expand Down
27 changes: 27 additions & 0 deletions src/plugins/process/qinq/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
project(ipfixprobe-process-qinq VERSION 1.0.0 DESCRIPTION "ipfixprobe-process-qinq plugin")

add_library(ipfixprobe-process-qinq MODULE
src/qinq.cpp
src/qinq.hpp
)

set_target_properties(ipfixprobe-process-qinq PROPERTIES
CXX_VISIBILITY_PRESET hidden
VISIBILITY_INLINES_HIDDEN YES
)

target_include_directories(ipfixprobe-process-qinq PRIVATE
${CMAKE_SOURCE_DIR}/include/
)

if(ENABLE_NEMEA)
target_link_libraries(ipfixprobe-process-qinq PRIVATE
-Wl,--whole-archive ipfixprobe-nemea-fields -Wl,--no-whole-archive
unirec::unirec
trap::trap
)
endif()

install(TARGETS ipfixprobe-process-qinq
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixprobe/process/"
)
1 change: 1 addition & 0 deletions src/plugins/process/qinq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
QinQ process plugin for parsing QinQ traffic, outputs outer and inner VLAN IDs.
56 changes: 56 additions & 0 deletions src/plugins/process/qinq/src/qinq.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* @file
* @brief Plugin for parsing basicplus traffic.
* @author Jakub Antonín Štigler xstigl00@xstigl00@stud.fit.vut.cz
* @author Pavel Siska <siska@cesnet.cz>
* @date 2025
*
* Copyright (c) 2025 CESNET
*
* SPDX-License-Identifier: BSD-3-Clause
*/

#include "qinq.hpp"

#include <iostream>

#include <ipfixprobe/pluginFactory/pluginManifest.hpp>
#include <ipfixprobe/pluginFactory/pluginRegistrar.hpp>

namespace ipxp {

static const PluginManifest qinqPluginManifest = {
.name = "qinq",
.description = "QinQ process plugin for parsing QinQ traffic, outputs outer and inner VLAN IDs.",
.pluginVersion = "1.0.0",
.apiVersion = "1.0.0",
.usage =
[]() {
OptionsParser parser("qinq", "Parse qinq traffic");
parser.usage(std::cout);
},
};

QinQPlugin::QinQPlugin(const std::string& params, int pluginID)
: ProcessPlugin(pluginID)
{
init(params.c_str());
}

ProcessPlugin* QinQPlugin::copy()
{
return new QinQPlugin(*this);
}

int QinQPlugin::post_create(Flow& rec, const Packet& pkt)
{
auto ext = new RecordExtQinQ(m_pluginID);
ext->vlan_id = pkt.vlan_id;
ext->vlan_id2 = pkt.vlan_id2;
rec.add_extension(ext);
return 0;
}

static const PluginRegistrar<QinQPlugin, ProcessPluginFactory> qinqRegistrar(qinqPluginManifest);

} // namespace ipxp
Loading