class Variable(BaseModel):
    # Configuration entry for one variable of an event.
    #
    # pos:    where the variable lives; annotated as either an int or a str.
    # name:   optional display name; the empty string means "not set".
    # params: optional free-form parameters attached to the variable.
    pos: str | int
    name: str = ""
    params: Dict[str, Any] = {}

    def to_dict(self) -> Dict[str, Any]:
        """Return a YAML-compatible dictionary for this variable.

        ``pos`` is always emitted; ``name`` and ``params`` are emitted only
        when they are non-empty, in that order.
        """
        optional_fields: Dict[str, Any] = {
            "name": self.name,
            "params": self.params,
        }
        return {
            "pos": self.pos,
            **{key: value for key, value in optional_fields.items() if value},
        }
def compile_events_config(self, events_config: EventsConfig) -> EventsConfig:
    """Resolve named event IDs and named variable labels to positions.

    Translates the user-friendly named format into the internal
    positional representation and returns a new EventsConfig.

    * A string event key that matches a known event ID label is replaced
      by the index stored for that label. Any other event key (an int,
      or a string that is not a known label) is passed through
      unchanged.
    * A string variable position is looked up among the labels recorded
      for the resolved event index and replaced by its int position.
      Int positions are kept as they are.

    Args:
        events_config: Configuration whose events may be keyed by event
            ID label and whose variables may be keyed by label.

    Returns:
        A new EventsConfig in which every resolvable name has been
        replaced by its int index.

    Raises:
        ValueError: If a variable label is not known for its event, or
            if two entries resolve to the same event key or to the same
            variable position (one would otherwise silently replace the
            other).
    """
    new_events: Dict[Any, _EventConfig] = {}

    for event_key, event_config in events_config.events.items():
        resolved_key: str | int = event_key
        if isinstance(event_key, str):
            # Unknown labels fall through unchanged.
            resolved_key = self._event_label_to_idx.get(event_key, event_key)

        if resolved_key in new_events:
            raise ValueError(
                f"Event '{event_key}' resolves to key {resolved_key!r}, "
                "which is already configured by another entry."
            )

        # Label metadata is indexed by int only; a key that stayed a
        # string therefore has no labels to resolve against.
        var_map: Dict[str, int] = (
            self._idx_to_var_map.get(resolved_key, {})
            if isinstance(resolved_key, int)
            else {}
        )

        new_instances: Dict[str, _EventInstance] = {}
        for instance_id, instance in event_config.instances.items():
            new_vars: Dict[str | int, Variable] = {}
            for pos, var in instance.variables.items():
                if isinstance(pos, str):
                    if pos not in var_map:
                        raise ValueError(
                            f"Label '{pos}' not found in template for event '{event_key}'. "
                            f"Available labels: {list(var_map)}"
                        )
                    resolved_pos: str | int = var_map[pos]
                    # Keep an explicitly configured name; the label is
                    # only the fallback when no name was given.
                    resolved_var = Variable(
                        pos=resolved_pos,
                        name=var.name or pos,
                        params=var.params,
                    )
                else:
                    resolved_pos = pos
                    resolved_var = var

                if resolved_pos in new_vars:
                    raise ValueError(
                        f"Variable '{pos}' of event '{event_key}' resolves to "
                        f"position {resolved_pos!r}, which is already configured "
                        f"in instance '{instance_id}'."
                    )
                new_vars[resolved_pos] = resolved_var

            new_instances[instance_id] = _EventInstance(
                params=instance.params,
                variables=new_vars,
                header_variables=instance.header_variables,
            )
        new_events[resolved_key] = _EventConfig(instances=new_instances)

    return EventsConfig(events=new_events)
+ """ + return self.manager.compile_events_config(events_config) + @staticmethod def extract_parameters(log: str, template: str) -> tuple[str, ...] | None: """Extract parameters from the log based on the template.""" diff --git a/src/detectmatelibrary/parsers/template_matcher/_parser.py b/src/detectmatelibrary/parsers/template_matcher/_parser.py index 67fb5a8..edcae99 100644 --- a/src/detectmatelibrary/parsers/template_matcher/_parser.py +++ b/src/detectmatelibrary/parsers/template_matcher/_parser.py @@ -1,10 +1,13 @@ -from detectmatelibrary.parsers.template_matcher._matcher_op import TemplateMatcher +from detectmatelibrary.parsers.template_matcher._matcher_op import TemplateMatcher, TemplateMetadata from detectmatelibrary.common.parser import CoreParser, CoreParserConfig from detectmatelibrary import schemas from typing import Any import csv import os +import re + +_NAMED_WC_RE = re.compile(r'<([A-Za-z_]\w*)>') class TemplatesNotFoundError(Exception): @@ -15,34 +18,93 @@ class TemplateNoPermissionError(Exception): pass -def load_templates(path: str) -> list[str]: +def _compile_templates( + raw_templates: list[str], + event_id_labels: list[str | None] | None = None, +) -> tuple[list[str], dict[int, TemplateMetadata]]: + """Convert named wildcards to <*> and record label order and event ID + labels. + + Args: + raw_templates: Raw template strings, possibly containing named wildcards. + event_id_labels: Optional per-template event ID labels (from CSV EventId column). + If provided, must have the same length as raw_templates. + + Returns: + compiled: Template strings with only <*> wildcards, ready for TemplatesManager. + metadata: Mapping of template index to TemplateMetadata. + + Raises: + ValueError: If a template mixes <*> and named wildcards. 
+ """ + compiled: list[str] = [] + metadata: dict[int, TemplateMetadata] = {} + + for i, raw in enumerate(raw_templates): + has_anon = "<*>" in raw + labels = _NAMED_WC_RE.findall(raw) + has_named = bool(labels) + + if has_anon and has_named: + raise ValueError( + f"Template mixes <*> and named wildcards: {raw!r}. " + "Use either <*> (positional) or