Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/detectmatelibrary/common/_config/_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@

# Sub-formats ********************************************************+
class Variable(BaseModel):
    """A single template variable referenced by an event configuration.

    ``pos`` identifies the variable either by integer position or by a
    string label. ``name`` and ``params`` are optional and are left out of
    the serialised form when they are empty.
    """

    # Integer position of the variable, or a string label identifying it.
    pos: str | int
    # Optional name; the empty string means "not set".
    name: str = ""
    # Optional free-form parameters attached to this variable.
    params: Dict[str, Any] = {}

    def to_dict(self) -> Dict[str, Any]:
        """Convert Variable to YAML-compatible dictionary.

        Returns:
            A dict that always contains ``"pos"``. ``"name"`` and
            ``"params"`` are only included when they hold a non-empty
            value, so a variable declared with just a label serialises
            back to just that label.
        """
        result: Dict[str, Any] = {"pos": self.pos}
        if self.name:
            result["name"] = self.name
        if self.params:
            result["params"] = self.params
        return result
Expand All @@ -38,7 +39,7 @@ def to_dict(self) -> Dict[str, Any]:
class _EventInstance(BaseModel):
"""Configuration for a specific instance within an event."""
params: Dict[str, Any] = {}
variables: Dict[int, Variable] = {}
variables: Dict[str | int, Variable] = {}
header_variables: Dict[str, Header] = {}

@classmethod
Expand Down Expand Up @@ -79,7 +80,7 @@ def _init(cls, instances_dict: Dict[str, Dict[str, Any]]) -> "_EventConfig":
return cls(instances=instances)

@property
def variables(self) -> Dict[int, Variable]:
def variables(self) -> Dict[str | int, Variable]:
"""Pass-through to first instance for compatibility."""
if self.instances:
return next(iter(self.instances.values())).variables
Expand Down
2 changes: 1 addition & 1 deletion src/detectmatelibrary/common/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_configured_variables(
# Extract template variables by position
if hasattr(event_config, "variables"):
for pos, var in event_config.variables.items():
if pos < len(input_["variables"]):
if isinstance(pos, int) and pos < len(input_["variables"]):
result[var.name] = input_["variables"][pos]

# Extract header/log format variables by name
Expand Down
78 changes: 77 additions & 1 deletion src/detectmatelibrary/parsers/template_matcher/_matcher_op.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from collections import defaultdict
from typing import Dict, List, Any, Tuple
from typing import Dict, List, Any, Tuple, TypedDict
import regex
import re

from detectmatelibrary.common._config._formats import (
EventsConfig, _EventConfig, _EventInstance, Variable
)


class TemplateMetadata(TypedDict):
event_id_label: str | None
labels: list[str]


def safe_search(pattern: str, string: str, timeout: int = 1) -> regex.Match[str] | None:
"""Perform regex search with a timeout to prevent catastrophic
Expand Down Expand Up @@ -64,6 +73,7 @@ class TemplatesManager:
def __init__(
self,
template_list: list[str],
metadata: dict[int, TemplateMetadata] | None = None,
remove_spaces: bool = True,
remove_punctuation: bool = True,
lowercase: bool = True
Expand Down Expand Up @@ -96,6 +106,61 @@ def __init__(
first = tokens[0] if tokens else ""
self._prefix_index[first].append(idx)

_metadata: dict[int, TemplateMetadata] = metadata or {}
self._event_label_to_idx: dict[str, int] = {
m["event_id_label"]: i
for i, m in _metadata.items()
if m["event_id_label"]
}
self._idx_to_var_map: dict[int, dict[str, int]] = {
i: {label: pos for pos, label in enumerate(m["labels"])}
for i, m in _metadata.items()
if m["labels"]
}

def compile_events_config(self, events_config: EventsConfig) -> EventsConfig:
    """Translate named event IDs and variable labels into positions.

    Every string event key that matches a known event ID label is replaced
    by the index of its template; any other key is kept as it is. Within
    each instance, every variable keyed by a string label is replaced by a
    ``Variable`` keyed by (and positioned at) the label's index in that
    template, with the label stored as its name. Variables that are
    already keyed by something other than a string are carried over
    untouched.

    Args:
        events_config: Configuration that may use named event IDs and
            named variable positions.

    Returns:
        A newly built ``EventsConfig``; ``events_config`` itself is not
        modified.

    Raises:
        ValueError: If a variable label is not among the labels known for
            the event's template.
    """
    compiled_events: Dict[Any, _EventConfig] = {}

    for event_key, event_config in events_config.events.items():
        # Only string keys can be labels; unknown strings fall through as-is.
        resolved_key: str | int = (
            self._event_label_to_idx.get(event_key, event_key)
            if isinstance(event_key, str)
            else event_key
        )

        # Labels can only be looked up for an integer template index; -1 is
        # used as a key that yields the empty mapping.
        lookup_idx = resolved_key if isinstance(resolved_key, int) else -1
        label_positions = self._idx_to_var_map.get(lookup_idx, {})

        compiled_instances: Dict[str, _EventInstance] = {}
        for instance_id, instance in event_config.instances.items():
            resolved_vars: Dict[str | int, Variable] = {}
            for pos, var in instance.variables.items():
                if not isinstance(pos, str):
                    resolved_vars[pos] = var
                    continue
                if pos not in label_positions:
                    raise ValueError(
                        f"Label '{pos}' not found in template for event '{event_key}'. "
                        f"Available labels: {list(label_positions)}"
                    )
                slot = label_positions[pos]
                resolved_vars[slot] = Variable(pos=slot, name=pos, params=var.params)

            compiled_instances[instance_id] = _EventInstance(
                params=instance.params,
                variables=resolved_vars,
                header_variables=instance.header_variables,
            )

        compiled_events[resolved_key] = _EventConfig(instances=compiled_instances)

    return EventsConfig(events=compiled_events)

def candidate_indices(self, s: str) -> Tuple[str, List[int]]:
pre_s = self.preprocess(s)
candidates = []
Expand All @@ -110,17 +175,28 @@ class TemplateMatcher:
def __init__(
    self,
    template_list: list[str],
    metadata: dict[int, TemplateMetadata] | None = None,
    remove_spaces: bool = True,
    remove_punctuation: bool = True,
    lowercase: bool = True
) -> None:
    """Create the matcher and the ``TemplatesManager`` it delegates to.

    All arguments are handed to ``TemplatesManager`` unchanged, under the
    same keyword names.

    Args:
        template_list: Template strings to match against.
        metadata: Optional per-template metadata keyed by template index.
        remove_spaces: Preprocessing flag forwarded to the manager.
        remove_punctuation: Preprocessing flag forwarded to the manager.
        lowercase: Preprocessing flag forwarded to the manager.
    """
    preprocessing = {
        "remove_spaces": remove_spaces,
        "remove_punctuation": remove_punctuation,
        "lowercase": lowercase,
    }
    self.manager = TemplatesManager(
        template_list=template_list,
        metadata=metadata,
        **preprocessing,
    )

def compile_detector_config(self, events_config: EventsConfig) -> EventsConfig:
    """Resolve named event IDs and variable labels to positional ints.

    Intended to be called once at setup time. The work is delegated to
    the underlying manager's ``compile_events_config``; the returned
    ``EventsConfig`` uses the internal positional representation, which is
    the form get_configured_variables() works with.

    Args:
        events_config: Configuration that may contain named event IDs and
            named variable positions.

    Returns:
        The compiled ``EventsConfig`` produced by the manager.
    """
    compiled_config = self.manager.compile_events_config(events_config)
    return compiled_config

@staticmethod
def extract_parameters(log: str, template: str) -> tuple[str, ...] | None:
"""Extract parameters from the log based on the template."""
Expand Down
91 changes: 78 additions & 13 deletions src/detectmatelibrary/parsers/template_matcher/_parser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from detectmatelibrary.parsers.template_matcher._matcher_op import TemplateMatcher
from detectmatelibrary.parsers.template_matcher._matcher_op import TemplateMatcher, TemplateMetadata
from detectmatelibrary.common.parser import CoreParser, CoreParserConfig
from detectmatelibrary import schemas

from typing import Any
import csv
import os
import re

_NAMED_WC_RE = re.compile(r'<([A-Za-z_]\w*)>')


class TemplatesNotFoundError(Exception):
Expand All @@ -15,34 +18,93 @@ class TemplateNoPermissionError(Exception):
pass


def load_templates(path: str) -> list[str]:
def _compile_templates(
    raw_templates: list[str],
    event_id_labels: list[str | None] | None = None,
) -> tuple[list[str], dict[int, TemplateMetadata]]:
    """Convert named wildcards to <*> and record label order and event ID
    labels.

    Args:
        raw_templates: Raw template strings, possibly containing named wildcards.
        event_id_labels: Optional per-template event ID labels (from CSV EventId column).
            ``None`` or an empty list means "no labels"; otherwise it must
            have the same length as raw_templates.

    Returns:
        compiled: Template strings with only <*> wildcards, ready for TemplatesManager.
        metadata: Mapping of template index to TemplateMetadata.

    Raises:
        ValueError: If a template mixes <*> and named wildcards, or if a
            non-empty event_id_labels does not match raw_templates in length.
    """
    # Fail early with a clear message instead of an IndexError half-way
    # through the loop (too short) or silently ignoring entries (too long).
    if event_id_labels and len(event_id_labels) != len(raw_templates):
        raise ValueError(
            f"event_id_labels has {len(event_id_labels)} entries but "
            f"{len(raw_templates)} templates were given."
        )

    compiled: list[str] = []
    metadata: dict[int, TemplateMetadata] = {}

    for idx, raw in enumerate(raw_templates):
        # Labels are collected in order of appearance; that order defines
        # the positional slot of each named wildcard.
        labels = _NAMED_WC_RE.findall(raw)

        if labels and "<*>" in raw:
            raise ValueError(
                f"Template mixes <*> and named wildcards: {raw!r}. "
                "Use either <*> (positional) or <label> (named) exclusively."
            )

        compiled.append(_NAMED_WC_RE.sub("<*>", raw) if labels else raw)
        eid_label = event_id_labels[idx] if event_id_labels else None
        metadata[idx] = TemplateMetadata(event_id_label=eid_label, labels=labels)

    return compiled, metadata


def load_templates(path: str) -> tuple[list[str], list[str | None]]:
"""Load templates from a .txt or .csv file.

Returns:
A tuple of (template_strings, event_id_labels). For .txt files, all
event_id_labels are None (positional IDs only). For .csv files, an
optional EventId column provides named event ID labels.
"""
if not os.path.exists(path):
raise TemplatesNotFoundError(f"Templates file not found at: {path}")
if not os.access(path, os.R_OK):
raise TemplateNoPermissionError(
f"You do not have the permission to access the templates file: {path}"
)
templates: list[str] = []
eid_labels: list[str | None] = []
if path.endswith(".txt"):
with open(path, "r") as f:
templates = [line.strip() for line in f if line.strip()]
for line in f:
s = line.strip()
if s:
templates.append(s)
eid_labels.append(None)
elif path.endswith(".csv"):
templates = []
# Use the lightweight built-in csv module instead of pandas
# Expect a header with a 'template' column
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
if reader.fieldnames is None or "EventTemplate" not in reader.fieldnames:
raise ValueError("CSV file must contain a 'EventTemplate' column.")
has_event_id_col = "EventId" in (reader.fieldnames or [])
for row in reader:
val = row.get("EventTemplate")
if val is None:
continue
s = str(val).strip()
if s:
templates.append(s)
if not s:
continue
templates.append(s)
if has_event_id_col:
eid = str(row.get("EventId", "")).strip()
eid_labels.append(eid or None)
else:
eid_labels.append(None)
else:
raise ValueError("Unsupported template file format. Use .txt or .csv files.")
return templates
return templates, eid_labels


class MatcherParserConfig(CoreParserConfig):
Expand All @@ -67,11 +129,14 @@ def __init__(
super().__init__(name=name, config=config)
self.config: MatcherParserConfig

templates = load_templates(
self.config.path_templates
) if self.config.path_templates is not None else []
if self.config.path_templates is not None:
raw_templates, eid_labels = load_templates(self.config.path_templates)
else:
raw_templates, eid_labels = [], []
compiled_templates, metadata = _compile_templates(raw_templates, eid_labels)
self.template_matcher = TemplateMatcher(
template_list=templates,
template_list=compiled_templates,
metadata=metadata,
remove_spaces=self.config.remove_spaces,
remove_punctuation=self.config.remove_punctuation,
lowercase=self.config.lowercase,
Expand Down
87 changes: 87 additions & 0 deletions tests/test_common/test_config_roundtrip.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,90 @@ def test_parser_true_roundtrip(self):

# Dicts should be identical
assert dict1 == dict2


class TestNamedVariablesRoundtrip:
    """Round-trip checks for label-based variable positions and named event
    IDs."""

    @staticmethod
    def _emit(source, method_id):
        """Parse *source* for *method_id* and serialise it straight back."""
        return MockupDetectorConfig.from_dict(source, method_id).to_dict(method_id)

    def test_named_pos_preserved_as_string(self):
        """A string ``pos`` must come back from yaml -> pydantic -> yaml as
        the same string."""
        source = load_test_config()
        method_id = "detector_named_pos"
        emitted = self._emit(source, method_id)

        expected_instance = source["detectors"][method_id]["events"][1]["example_detector_1"]
        actual_instance = emitted["detectors"][method_id]["events"][1]["example_detector_1"]
        expected_vars = expected_instance["variables"]
        actual_vars = actual_instance["variables"]

        assert len(actual_vars) == len(expected_vars)
        for expected, actual in zip(expected_vars, actual_vars):
            assert actual["pos"] == expected["pos"]
            # The label has to remain a string rather than being coerced to int.
            assert isinstance(actual["pos"], str)

    def test_named_pos_no_name_field_when_omitted(self):
        """A variable given only a label ``pos`` must serialise without a
        'name' key."""
        emitted = self._emit(load_test_config(), "detector_named_pos")

        instance = emitted["detectors"]["detector_named_pos"]["events"][1]["example_detector_1"]
        label_only_var = instance["variables"][0]  # pos: pid, no params

        assert "pos" in label_only_var
        assert label_only_var["pos"] == "pid"
        assert "name" not in label_only_var

    def test_named_pos_params_preserved(self):
        """Params attached to a label-positioned variable must be kept."""
        emitted = self._emit(load_test_config(), "detector_named_pos")

        instance = emitted["detectors"]["detector_named_pos"]["events"][1]["example_detector_1"]
        var_with_params = instance["variables"][1]  # pos: op, params: {threshold: 0.5}

        assert var_with_params["pos"] == "op"
        assert var_with_params["params"] == {"threshold": 0.5}

    def test_named_event_id_preserved_as_string(self):
        """A string event ID key must come back from yaml -> pydantic ->
        yaml as the same string."""
        method_id = "detector_named_event_id"
        emitted = self._emit(load_test_config(), method_id)

        emitted_events = emitted["detectors"][method_id]["events"]
        assert "login_failure" in emitted_events
        assert isinstance(next(iter(emitted_events)), str)

    def test_named_pos_true_roundtrip(self):
        """Serialising, re-parsing and serialising again gives an identical
        dict for named positions."""
        method_id = "detector_named_pos"

        first_pass = self._emit(load_test_config(), method_id)
        second_pass = self._emit(first_pass, method_id)

        assert first_pass == second_pass

    def test_named_event_id_true_roundtrip(self):
        """Serialising, re-parsing and serialising again gives an identical
        dict for named event IDs."""
        method_id = "detector_named_event_id"

        first_pass = self._emit(load_test_config(), method_id)
        second_pass = self._emit(first_pass, method_id)

        assert first_pass == second_pass
Loading
Loading