Sigma: support EVTX periodic snapshots and analyzer noise filtering#553
wmetcalf wants to merge 2 commits into CAPESandbox:master
Conversation
- Handle multiple evtx zip files (snapshots + final) instead of a single evtx.zip
- Extract each snapshot into a separate subdirectory for zircolite processing
- Add evtx_dump-based pre-filtering to strip analyzer process noise
- Load exclude patterns from filters.json pre_filters config
- Formatting fixes (black/ruff compliance)

Companion to kevoreilly/CAPEv2#2964, which adds the periodic EVTX snapshot collection in the guest analyzer.
Code Review
This pull request updates the Sigma processing module to handle multiple EVTX snapshots by extracting all zip files in the evtx directory into separate subdirectories. It also introduces a pre-filtering step using evtx_dump to remove analyzer noise based on configurable process and path exclusions before running Zircolite. Feedback focuses on addressing potential memory exhaustion and performance overhead during log filtering, preventing file name collisions when processing multiple snapshots, and mitigating security evasion risks caused by broad substring matching. Additionally, it is recommended to make the evtx_dump binary path configurable.
modules/processing/sigma.py
Outdated
```python
result = subprocess.run(
    [evtx_dump_bin, "-o", "jsonl", evtx_file],
    capture_output=True, text=True, timeout=60
)
if result.returncode != 0:
    continue
basename = os.path.basename(evtx_file).rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
with open(jsonl_path, "w") as out:
    for line in result.stdout.splitlines():
        skip = False
        line_lower = line.lower()
        for parent in analyzer_exclude:
            if parent in line_lower:
                skip = True
                break
        if not skip and line.strip():
            out.write(line + "\n")
```
The current implementation of evtx_dump filtering has significant performance and memory overhead:

- Memory usage: `subprocess.run(capture_output=True)` and `result.stdout.splitlines()` load the entire JSONL output into memory. For large EVTX files, this can lead to out-of-memory (OOM) conditions.
- CPU efficiency: iterating through the `analyzer_exclude` set for every line of the log is inefficient ($O(N \times M)$).

It is highly recommended to use `subprocess.Popen` to stream the output line-by-line and a single compiled regular expression for filtering.
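As a minimal sketch of that approach (the helper names are illustrative; `analyzer_exclude`, `evtx_dump_bin`, and the evtx_dump invocation come from the diff above):

```python
import re
import subprocess


def build_exclude_regex(tokens):
    # One compiled, case-insensitive alternation: each line is scanned
    # once instead of once per exclude token. Assumes tokens is non-empty.
    return re.compile("|".join(re.escape(t) for t in sorted(tokens)), re.IGNORECASE)


def stream_filter(evtx_dump_bin, evtx_file, out_path, pattern):
    # Stream evtx_dump's stdout line-by-line so the full JSONL output
    # is never buffered in memory.
    with open(out_path, "w") as out, subprocess.Popen(
        [evtx_dump_bin, "-o", "jsonl", evtx_file],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True,
    ) as proc:
        for line in proc.stdout:
            if line.strip() and not pattern.search(line):
                out.write(line)
    return proc.returncode
```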
Fixed in 63da4f4 — switched to subprocess.Popen streaming and compiled regex for filtering.
modules/processing/sigma.py
Outdated
```python
basename = os.path.basename(evtx_file).rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
```
When multiple EVTX snapshots are processed, files with the same name (e.g., System.evtx) from different snapshot subdirectories will overwrite each other in the filtered directory because only the filename is used for the output. This causes data loss, as only the last snapshot's data will be scanned by Zircolite.
```python
rel_path = os.path.relpath(evtx_file, tmpdir)
basename = rel_path.replace(os.sep, "_").rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
```
Fixed in 63da4f4 — using relative path from tmpdir to generate unique filenames per snapshot.
modules/processing/sigma.py
Outdated
```python
skip = False
line_lower = line.lower()
for parent in analyzer_exclude:
    if parent in line_lower:
```
The substring match if parent in line_lower on the entire JSON line is prone to evasion. An attacker could bypass Sigma detection by including one of the excluded strings (e.g., python.exe) in their process name or command-line arguments. To improve security, consider parsing the JSON and checking specific fields like Image or ParentImage, or at least using more restrictive patterns (e.g., anchored regex) to avoid accidental suppression of malicious activity.
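A field-scoped check might look like the following sketch (the `Event`/`EventData` nesting and the field names are assumptions about evtx_dump's JSONL shape, not verified against the module):

```python
import json

# Fields to scope the exclude check to; names assumed from typical
# Sysmon-style event data, not taken from the actual module.
CHECKED_FIELDS = ("Image", "ParentImage", "TargetFilename")


def should_skip(line, analyzer_exclude):
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return False  # keep unparseable lines rather than silently drop them
    if not isinstance(event, dict):
        return False
    data = event.get("Event") or {}
    if isinstance(data, dict):
        data = data.get("EventData") or {}
    if not isinstance(data, dict):
        return False
    for field in CHECKED_FIELDS:
        value = str(data.get(field, "")).lower()
        if value and any(tok in value for tok in analyzer_exclude):
            return True
    return False
```

This keeps an excluded token appearing in an unrelated field such as `CommandLine` from suppressing the whole event, which is the evasion vector described above.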
Fixed in 63da4f4 — now parses JSON and checks specific fields (Image, ParentImage, TargetFilename) instead of substring matching on the whole line.
modules/processing/sigma.py
Outdated
```python
# Filter analyzer noise: convert evtx to JSONL via evtx_dump,
# strip events from the CAPE analyzer parent process, and
# feed clean JSONL to zircolite.
evtx_dump_bin = "/usr/local/bin/evtx_dump"
```
The path to the evtx_dump binary is hardcoded. It is recommended to make this configurable via the module options (e.g., self.options.get("evtx_dump_bin")) to allow for different installation paths across environments.
```diff
- evtx_dump_bin = "/usr/local/bin/evtx_dump"
+ evtx_dump_bin = self.options.get("evtx_dump_bin", "/usr/local/bin/evtx_dump")
```
Fixed in 63da4f4 — now configurable via self.options.get("evtx_dump_bin").
Pull request overview
This PR updates CAPEv2’s Sigma processing to handle periodic EVTX snapshot archives (in addition to a final EVTX zip) and introduces an evtx_dump-based pre-filter step intended to remove analyzer-generated noise before Zircolite Sigma matching.
Changes:
- Scan all EVTX zip archives found under the analysis `evtx/` directory (snapshots + final), extracting each into its own subdirectory before Zircolite runs.
- Add optional `evtx_dump` JSONL conversion + pre-filtering based on `filters.json`/`filters_local.json` `pre_filters` configuration (with defaults).
- Minor formatting/refactor touch-ups in filter parsing/application and Zircolite invocation.
```python
tmpdir = None
try:
    tmpdir = tempfile.mkdtemp(prefix="cape_sigma_evtx_")
    real_tmpdir = os.path.realpath(tmpdir)
```
real_tmpdir is assigned but never used after the refactor to extract multiple zip files. Removing it will avoid dead code (and potential lint failures).
```diff
- real_tmpdir = os.path.realpath(tmpdir)
```
```python
# Extract all evtx zips (snapshots are incremental, each
# contains events since the last wipe)
max_extracted = 5 * 1024 * 1024 * 1024  # 5 GB
for zip_idx, evtx_zip in enumerate(evtx_zips):
    subdir = os.path.join(tmpdir, str(zip_idx))
    os.makedirs(subdir, exist_ok=True)
    try:
        with zipfile.ZipFile(evtx_zip, "r") as zf:
            total_uncompressed = sum(m.file_size for m in zf.infolist())
            if total_uncompressed > max_extracted:
                log.warning("evtx zip too large (%d bytes), skipping: %s", total_uncompressed, evtx_zip)
                continue
```
The zip-bomb protection is now applied per-zip (total_uncompressed > max_extracted) but not across all snapshot zips. A crafted analysis could include many zips each under 5GB and still force huge total extraction + disk usage. Consider tracking cumulative extracted bytes across all zips (and/or capping number of zips) and aborting once a global limit is exceeded, similar to the previous single-zip behavior.
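One way to track a global budget, as a sketch (`MAX_ZIPS` and the function boundary are hypothetical, not from the PR):

```python
import os
import zipfile

MAX_TOTAL_EXTRACTED = 5 * 1024 * 1024 * 1024  # 5 GB across *all* snapshot zips
MAX_ZIPS = 50  # hypothetical cap on the number of snapshot zips


def extract_snapshots(evtx_zips, tmpdir):
    # Returns total uncompressed bytes extracted; stops once the global
    # byte budget or the zip-count cap would be exceeded.
    total = 0
    for zip_idx, evtx_zip in enumerate(evtx_zips[:MAX_ZIPS]):
        with zipfile.ZipFile(evtx_zip, "r") as zf:
            size = sum(m.file_size for m in zf.infolist())
            if total + size > MAX_TOTAL_EXTRACTED:
                break  # cumulative limit reached: skip the remaining zips
            subdir = os.path.join(tmpdir, str(zip_idx))
            os.makedirs(subdir, exist_ok=True)
            zf.extractall(subdir)
            total += size
    return total
```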
modules/processing/sigma.py
Outdated
```python
for line in result.stdout.splitlines():
    skip = False
    line_lower = line.lower()
    for parent in analyzer_exclude:
        if parent in line_lower:
            skip = True
            break
    if not skip and line.strip():
```
The pre-filter currently lowercases each JSONL line and does a substring check for every exclude token. This can suppress unrelated events (e.g., a benign/malicious command line containing python.exe, or a file path containing one of the tokens) because it is not scoped to specific fields. To avoid false negatives, parse each JSON object and apply excludes to the intended keys (e.g., parent/image process fields and target path fields) with exact/regex matching rather than whole-line substring matching.
modules/processing/sigma.py
Outdated
```python
result = subprocess.run(
    [evtx_dump_bin, "-o", "jsonl", evtx_file],
    capture_output=True, text=True, timeout=60
)
if result.returncode != 0:
    continue
basename = os.path.basename(evtx_file).rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
with open(jsonl_path, "w") as out:
    for line in result.stdout.splitlines():
        skip = False
        line_lower = line.lower()
        for parent in analyzer_exclude:
            if parent in line_lower:
                skip = True
                break
        if not skip and line.strip():
            out.write(line + "\n")
```
subprocess.run(..., capture_output=True, text=True) buffers the entire evtx_dump JSONL output in memory. EVTX logs can be large, so this can cause high memory usage or OOM. Prefer streaming stdout (e.g., Popen(..., stdout=PIPE) and iterate line-by-line) or writing directly to a temp file and filtering incrementally.
Suggested change (replacing the `subprocess.run` block):

```python
basename = os.path.basename(evtx_file).rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
with open(jsonl_path, "w") as out:
    with subprocess.Popen(
        [evtx_dump_bin, "-o", "jsonl", evtx_file],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    ) as proc:
        for line in proc.stdout:
            skip = False
            line_lower = line.lower()
            for parent in analyzer_exclude:
                if parent in line_lower:
                    skip = True
                    break
            if not skip and line.strip():
                out.write(line)
        try:
            returncode = proc.wait(timeout=60)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            if os.path.exists(jsonl_path):
                os.remove(jsonl_path)
            continue
        if returncode != 0:
            if os.path.exists(jsonl_path):
                os.remove(jsonl_path)
            continue
```
```python
for evtx_file in evtx_files:
    try:
        result = subprocess.run(
            [evtx_dump_bin, "-o", "jsonl", evtx_file],
            capture_output=True, text=True, timeout=60
        )
```
The evtx_dump step uses a hardcoded timeout=60 per EVTX file and is not bounded by the timeout passed into _run_evtx (which is intended to cap processing time). With multiple snapshots/logs, total runtime can far exceed the configured sigma timeout. Consider deriving a per-file timeout from the remaining overall budget or making it configurable via self.options.
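A sketch of deriving the per-file timeout from the remaining overall budget (the function name and clamp values are illustrative):

```python
import time


def per_file_timeout(deadline, floor=5.0, cap=60.0):
    # deadline is an absolute time.monotonic() value for the whole
    # sigma run; returns None once the budget is exhausted, otherwise
    # the remaining time clamped to [floor, cap] seconds.
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        return None
    return max(floor, min(cap, remaining))
```

The caller would set `deadline = time.monotonic() + overall_timeout` before the snapshot loop and stop processing further EVTX files when `per_file_timeout` returns `None`.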
```python
try:
    filters_path = self.options.get("filters", "data/sigma/filters.json")
    filters_local = self.options.get("filters_local", "data/sigma/filters_local.json")
    for fp in [filters_path, filters_local]:
        if fp and not os.path.isabs(fp):
            fp = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), fp)
        if fp and os.path.exists(fp):
            with open(fp) as _f:
                _data = json.load(_f)
            _pf = _data.get("pre_filters", {})
            for p in _pf.get("exclude_parent_processes", []):
                analyzer_exclude.add(p.lower())
            for p in _pf.get("exclude_image_processes", []):
                analyzer_exclude.add(p.lower())
            for p in _pf.get("exclude_target_paths", []):
                analyzer_exclude.add(p.lower())
except Exception:
    pass
```
The pre_filters config load wraps everything in except Exception: pass, which will silently ignore JSON parsing errors / path issues and make misconfiguration hard to diagnose. Consider logging at least a debug/warning with the exception (and which file failed), while still falling back to defaults.
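A sketch of the same load with failures logged per file (the function name is hypothetical; the key names come from the diff above):

```python
import json
import logging

log = logging.getLogger(__name__)

PRE_FILTER_KEYS = ("exclude_parent_processes", "exclude_image_processes", "exclude_target_paths")


def load_pre_filters(paths):
    # Missing files stay silent (they are optional); anything else is
    # logged with the offending path before falling back to defaults.
    excludes = set()
    for fp in paths:
        try:
            with open(fp) as f:
                data = json.load(f)
        except FileNotFoundError:
            continue
        except (OSError, ValueError) as exc:  # JSONDecodeError subclasses ValueError
            log.warning("pre_filters: failed to load %s: %s", fp, exc)
            continue
        pf = data.get("pre_filters", {})
        for key in PRE_FILTER_KEYS:
            excludes.update(str(p).lower() for p in pf.get(key, []))
    return excludes
```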
modules/processing/sigma.py
Outdated
```python
basename = os.path.basename(evtx_file).rsplit(".", 1)[0] + ".json"
jsonl_path = os.path.join(filtered_dir, basename)
with open(jsonl_path, "w") as out:
```
When processing multiple snapshot zips, different snapshots will often contain EVTX files with the same basename (e.g., Security.evtx). The filtered JSONL output path is built only from os.path.basename(evtx_file), so later snapshots will overwrite earlier ones in filtered_dir, losing events/detections. Include the snapshot index / relative path in the output filename (or mirror the directory structure) to keep outputs unique.
- Make evtx_dump_bin path configurable via module options
- Use unique filenames per snapshot to prevent cross-snapshot collisions
- Stream evtx_dump output via Popen instead of loading all into memory
- Check specific JSON fields (Image, ParentImage, TargetFilename) instead of substring matching on the entire line to prevent evasion
- Use compiled regex for efficient exclude pattern matching
Summary

- Handle multiple evtx zip files (snapshots + final) instead of a single evtx.zip
- Add evtx_dump-based pre-filtering to strip analyzer process noise before sigma rule matching
- Load exclude patterns from filters.json pre_filters config

Context
Companion to kevoreilly/CAPEv2#2964, which adds periodic EVTX snapshot collection (every 30s) in the guest analyzer to protect against malware clearing event logs.
Test plan

- Single evtx.zip (no snapshots)