From d153ab29f3fe7acbce7e3d399d231493a01c0610 Mon Sep 17 00:00:00 2001 From: Tanvir Farhad Date: Thu, 4 Jun 2026 23:19:20 +0000 Subject: [PATCH 1/3] Add capability matrix covering all 13 retail rules, 6 generic rules, and 5 playbooks Documents every detection rule and response playbook with MITRE technique, tactic, severity, frequency, data sources, playbook trigger, and honest status labels (Complete vs Placeholder for generic stubs). Includes tactic coverage summary and watchlist/custom table dependency tables. Closes #73 --- docs/capability-matrix.md | 99 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 docs/capability-matrix.md diff --git a/docs/capability-matrix.md b/docs/capability-matrix.md new file mode 100644 index 0000000..64542bf --- /dev/null +++ b/docs/capability-matrix.md @@ -0,0 +1,99 @@ +# RetailShield Capability Matrix + +This document maps every detection rule and response playbook to its threat coverage, MITRE ATT&CK alignment, data sources, and current implementation status. + +**Status key:** + +| Status | Meaning | +|--------|---------| +| ✅ Complete | Full KQL/ARM implementation, tested | +| 🔶 ARM template pending | KQL complete; Sentinel ARM template not yet in Content Hub package | +| ⬜ Placeholder | File exists with MITRE comment only; KQL logic not yet written | + +--- + +## Retail Detection Rules (13) + +These rules are tailored to UK retail environments and ingest from both standard Sentinel tables and RetailShield custom tables. + +| Rule | File | MITRE Technique | Tactic | Severity | Frequency | Data Sources | Playbook | Status | +|------|------|----------------|--------|----------|-----------|-------------|----------|--------| +| Retail Phishing Email Detection | `phishing_detection.kql` | T1566.001 | Initial Access | High | 5 min | EmailAttachmentInfo, EmailEvents | quarantine_email | ✅ Complete | +| POS Void/Refund Fraud Pattern | `pos_void_refund.kql` | T1056.001 | Collection | High | 5 min | RetailShield_POS_CL | notify_soc | ✅ Complete | +| MFA Fatigue / Push Bombing | `mfa_fatigue.kql` | T1621 | Credential Access | High | 5 min | SigninLogs | block_ip | ✅ Complete | +| Data Exfiltration via Alt Protocol | `data_exfiltration.kql` | T1048 | Exfiltration | Critical | 5 min | DeviceNetworkEvents, DeviceFileEvents, RetailIOCWatchlist | data_exfil_contain | ✅ Complete | +| AI-Assisted Voice Fraud (Vishing) | `ai_voice_fraud.kql` | T1598 | Reconnaissance | High | 30 min | RetailShield_Logs_CL | notify_soc | ✅ Complete | +| Credential Stuffing Attack | `credential_stuffing.kql` | T1110.004 | Credential Access | High | 5 min | SigninLogs, AbuseIPDBWatchlist | block_ip | ✅ Complete | +| After-Hours System Access | `after_hours_access.kql` | T1078 | Persistence | Medium | 5 min | SigninLogs, AuditLogs, DeviceLogonEvents, RetailServiceAccounts | notify_soc | ✅ Complete | +| Gift Card Fraud Pattern | `gift_card_fraud.kql` | T1657 | Impact | High | 5 min | RetailShield_POS_CL | notify_soc | ✅ Complete | +| POS Terminal Anomaly | `pos_anomaly.kql` | T1056.001 | Collection | High | 15 min | DeviceEvents, RetailShield_Logs_CL, DeviceNetworkEvents | suspend_terminal | ✅ Complete | +| Ransomware Indicator | `ransomware_indicator.kql` | T1486 | Impact | Critical | 5 min | DeviceFileEvents, DeviceProcessEvents, DeviceNetworkEvents, RetailIOCWatchlist | isolate_endpoint | ✅ Complete | +| Supply Chain / Third-Party Anomaly | `supply_chain_anomaly.kql` | T1195 | Initial Access | High | 30 min | AzureDiagnostics, AuditLogs | notify_soc | ✅ Complete | +| Supplier Impossible Travel | `supplier_impossible_travel.kql` | T1199, T1078 | Initial Access | Medium | 15 min | SigninLogs, RetailSupplierAccounts | notify_soc | ✅ Complete | +| Privileged Role Assignment | `privileged_role_addition.kql` | T1098, T1078 | Persistence, Privilege Escalation | High | 5 min | AuditLogs | notify_soc | ✅ Complete | + +--- + +## Generic Detection Rules (6) + +These rules cover cross-industry threats applicable to any Sentinel workspace. All are currently placeholder stubs — the MITRE mapping is defined but the KQL logic has not been written yet. + +| Rule | File | MITRE Technique | Tactic | Status | +|------|------|----------------|--------|--------| +| Brute Force Login | `brute-force-login.kql` | T1110 | Credential Access | ⬜ Placeholder | +| Bulk File Access | `bulk-file-access.kql` | T1005 | Collection | ⬜ Placeholder | +| C2 Beacon Detection | `c2-beacon.kql` | T1041 | Command and Control | ⬜ Placeholder | +| DNS Exfiltration | `dns-exfil.kql` | T1048 | Exfiltration | ⬜ Placeholder | +| RDP Lateral Movement | `rdp-lateral-movement.kql` | T1021.001 | Lateral Movement | ⬜ Placeholder | +| Suspicious PowerShell | `suspicious-powershell.kql` | T1059.001 | Execution | ⬜ Placeholder | + +--- + +## Response Playbooks (5) + +| Playbook | Directory | Trigger | Description | Status | +|----------|-----------|---------|-------------|--------| +| block-ip | `logic-apps/block-ip/` | block_ip | Blocks attacker IP via Microsoft Defender for Endpoint custom indicators | ✅ Complete | +| isolate-endpoint | `logic-apps/isolate-endpoint/` | isolate_endpoint | Network-isolates an endpoint via Microsoft Defender for Endpoint | ✅ Complete | +| quarantine-email | `logic-apps/quarantine-email/` | quarantine_email | Quarantines a malicious email via Microsoft Defender for Office 365 | ✅ Complete | +| suspend-terminal | `logic-apps/suspend-terminal/` | suspend_terminal | Disables a compromised POS terminal via a custom webhook | ✅ Complete | +| incident-reporting | `logic-apps/incident-reporting/` | High/Critical severity | UK compliance assistant: drafts ICO/NCSC report, calculates 24h/72h deadlines, emails internal compliance contact | ✅ Complete | + +--- + +## MITRE ATT&CK Tactic Coverage + +| Tactic | Rules Covering It | +|--------|-----------------| +| Initial Access | phishing_detection, supply_chain_anomaly, supplier_impossible_travel | +| Execution | (generic) suspicious-powershell ⬜ | +| Persistence | after_hours_access, privileged_role_addition | +| Privilege Escalation | privileged_role_addition | +| Credential Access | mfa_fatigue, credential_stuffing, (generic) brute-force-login ⬜ | +| Collection | pos_void_refund, pos_anomaly, (generic) bulk-file-access ⬜ | +| Exfiltration | data_exfiltration, (generic) dns-exfil ⬜ | +| Command and Control | (generic) c2-beacon ⬜ | +| Lateral Movement | (generic) rdp-lateral-movement ⬜ | +| Impact | gift_card_fraud, ransomware_indicator | +| Reconnaissance | ai_voice_fraud | + +--- + +## Watchlist Dependencies + +| Watchlist | Rules Using It | +|-----------|---------------| +| RetailIOCWatchlist | data_exfiltration, ransomware_indicator | +| AbuseIPDBWatchlist | credential_stuffing | +| RetailServiceAccounts | after_hours_access | +| RetailSupplierAccounts | supplier_impossible_travel | +| RetailApprovedSenders | *(reserved for future phishing allowlist use)* | + +--- + +## Custom Table Dependencies + +| Table | Rules Using It | +|-------|---------------| +| RetailShield_POS_CL | pos_void_refund, gift_card_fraud, pos_anomaly | +| RetailShield_Logs_CL | ai_voice_fraud, pos_anomaly | From 29902f7cdcd1da2a35c46ecea6b891fd84fe94b2 Mon Sep 17 00:00:00 2001 From: Tanvir Farhad Date: Fri, 5 Jun 2026 13:59:11 +0000 Subject: [PATCH 2/3] Fix corrupted test file: restore proper newlines in test_kql_rules.py The file was stored with literal \n sequences instead of real newlines (MCP push_files encoding bug from previous session), causing syntax errors in pytest and flake8 on every branch forked from dev. --- tests/detection-rules/test_kql_rules.py | 1261 ++++++++++++++++++++++- 1 file changed, 1260 insertions(+), 1 deletion(-) diff --git a/tests/detection-rules/test_kql_rules.py b/tests/detection-rules/test_kql_rules.py index 62c6525..bc294a0 100644 --- a/tests/detection-rules/test_kql_rules.py +++ b/tests/detection-rules/test_kql_rules.py @@ -1 +1,1260 @@ -"""\nRetailShield — KQL Rule Validation Test Suite\nValidates syntax structure, required metadata, and logic correctness\nfor all detection-rules/*.kql files without a live Sentinel connection.\n"""\n\nimport re\nfrom pathlib import Path\n\nRULES_DIR = Path(__file__).parent.parent.parent / \"detection-rules\"\n\n\ndef load_rule(filename: str) -> str:\n path = RULES_DIR / filename\n assert path.exists(), f\"Rule file not found: {path}\"\n return path.read_text(encoding=\"utf-8\")\n\n\n# ── Helpers ─────────────────────────────────────────────────────────────────────────────\n\ndef assert_metadata(content: str, key: str, value: str):\n pattern = rf\"//\\s*{re.escape(key)}\\s*:.*{re.escape(value)}\"\n assert re.search(pattern, content, re.IGNORECASE), (\n f\"Missing or incorrect metadata '{key}: {value}'\"\n )\n\n\ndef assert_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment in content, f\"Expected '{fragment}' not found in rule\"\n\n\ndef assert_not_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment not in content, f\"Unexpected fragment '{fragment}' found in rule\"\n\n\n# ── phishing_detection.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestPhishingDetection:\n RULE = \"retail/phishing_detection.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1566.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_queries_email_attachment_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailAttachmentInfo\")\n\n def test_queries_email_events_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailEvents\")\n\n def test_filters_inbound_delivered_emails(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"Delivered\"', '\"Inbound\"')\n\n def test_suspicious_extensions_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousExtensions\",\n '\".exe\"',\n '\".ps1\"',\n '\".vbs\"',\n '\".docm\"',\n )\n\n def test_suppresses_approved_senders(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ApprovedSenderDomains\", \"leftanti\")\n\n def test_lure_classification_present(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"LureCategory\",\n \"Financial\",\n \"Credential\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"quarantine_email\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"SenderAddress\",\n \"RecipientEmailAddress\",\n \"Subject\",\n \"AttachmentFileName\",\n \"AttachmentSHA256\",\n \"NetworkMessageId\",\n \"AlertSeverity\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ransomware_indicator.kql ──────────────────────────────────────────────────────────────────\n\nclass TestRansomwareIndicator:\n RULE = \"retail/ransomware_indicator.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1486\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_mass_file_rename_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceFileEvents\", \"FileRenamed\", \"MassRenameThresh\")\n\n def test_shadow_copy_deletion_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"vssadmin delete shadows\",\n \"wmic shadowcopy delete\",\n \"bcdedit /set recoveryenabled no\",\n \"ShadowCopyDeletion\",\n )\n\n def test_known_ransomware_process_names(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"lockbit\",\n \"blackcat\",\n \"conti\",\n \"ryuk\",\n \"KnownRansomwareProcess\",\n )\n\n def test_c2_beacon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n \"BeaconThresh\",\n \"C2BeaconToRansomwareIP\",\n )\n\n def test_ioc_watchlist_used(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"127.0.0.1\"')\n\n def test_union_combines_all_signals(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"MassFileRename\",\n \"ShadowCopyDeletion\",\n \"RansomwareProcesses\",\n \"C2Beaconing\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"isolate_endpoint\")\n\n def test_risk_score_is_100(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"100\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"DeviceName\",\n \"DeviceId\",\n \"ThreatSignal\",\n \"ProcessCommandLine\",\n \"RenameCount\",\n \"SampleFiles\",\n \"AffectedFolders\",\n \"RemoteIP\",\n \"BeaconCount\",\n \"AlertSeverity\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"RiskScore\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_anomaly.kql ─────────────────────────────────────────────────────────────────────────────\n\nclass TestPosAnomaly:\n RULE = \"retail/pos_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_pos_process_names_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"POSProcessNames\", \"xstore.exe\", \"aloha.exe\", \"posready.exe\")\n\n def test_retail_terminal_prefix_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailTerminalPrefix\", '\"pos-\"', '\"till-\"', '\"kiosk-\"')\n\n def test_unknown_dll_injection_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnknownDLLInjection\", \"DeviceEvents\", \"ImageLoaded\")\n\n def test_abnormal_transaction_volume_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AbnormalTransactionVolume\",\n \"RetailShield_Logs_CL\",\n \"TransactionVolumeThreshold\",\n )\n\n def test_process_memory_dump_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"ProcessMemoryDump\",\n \"ProcessDumped\",\n \"ProcDump\",\n )\n\n def test_suspicious_network_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousNetworkFromPOS\",\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"suspend_terminal\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"DeviceId\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ai_voice_fraud.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestAiVoiceFraud:\n RULE = \"retail/ai_voice_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1598\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Reconnaissance\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_ai_confidence_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AIConfidenceThreshold\")\n\n def test_fraud_keywords_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"FraudKeywords\",\n '\"urgent\"',\n '\"override\"',\n '\"bypass\"',\n '\"transfer\"',\n )\n\n def test_business_hours_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHoursStart\", \"BusinessHoursEnd\")\n\n def test_high_confidence_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"HighConfidenceVoiceFraud\",\n \"RetailShield_Logs_CL\",\n \"AI_Voice_Fraud\",\n )\n\n def test_urgent_financial_request_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UrgentFinancialRequest\")\n\n def test_spoofed_caller_id_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SpoofedCallerID\")\n\n def test_after_hours_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoiceFraud\", \"hourofday\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"TargetEmployee\",\n \"ImpersonatingEntity\",\n \"RequestMade\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── supply_chain_anomaly.kql ──────────────────────────────────────────────────────────────────\n\nclass TestSupplyChainAnomaly:\n RULE = \"retail/supply_chain_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1195\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_admin_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AdminEndpoints\",\n '\"/admin\"',\n '\"/management\"',\n '\"/config\"',\n )\n\n def test_agreeable_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AgreeableEndpoints\",\n '\"/inventory\"',\n '\"/orders\"',\n )\n\n def test_bulk_export_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BulkExportThreshold\")\n\n def test_supplier_admin_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SupplierAdminAccess\",\n \"AzureDiagnostics\",\n \"supplier_api_key\",\n )\n\n def test_new_service_principal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewServicePrincipal\", \"AuditLogs\")\n\n def test_unauthorised_endpoint_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnauthorisedEndpointAccess\")\n\n def test_mass_data_export_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MassDataExport\", \"BulkExportThreshold\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"SupplierKey\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_void_refund.kql ──────────────────────────────────────────────────────────────────\n\nclass TestPosVoidRefund:\n RULE = \"retail/pos_void_refund.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"VoidRefundThresh\", \"HighValueThresh\")\n\n def test_after_hours_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoidRefund\")\n\n def test_high_volume_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVolumeVoidRefund\")\n\n def test_high_value_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighValueVoidNoOverride\")\n\n def test_tender_mismatch_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"TenderMismatchRefund\", \"CASH\", \"GIFT_CARD\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\", \"ThreatSignal\",\n \"TotalValue\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── gift_card_fraud.kql ──────────────────────────────────────────────────────────────────\n\nclass TestGiftCardFraud:\n RULE = \"retail/gift_card_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1657\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_activation_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardActivThresh\")\n\n def test_structuring_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuringThreshold\", \"StructuringBand\")\n\n def test_multi_terminal_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MultiTerminalThresh\")\n\n def test_high_velocity_activation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVelocityGiftCardActivation\")\n\n def test_structured_purchase_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuredGiftCardPurchase\")\n\n def test_drain_and_reload_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardDrainAndReload\")\n\n def test_multi_terminal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardMultiTerminalUse\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"GiftCardNumber_s\", \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\",\n \"ThreatSignal\", \"ActivationCount\", \"UniqueCards\", \"TotalValue\",\n \"TerminalCount\", \"TerminalList\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── mfa_fatigue.kql ──────────────────────────────────────────────────────────────────\n\nclass TestMfaFatigue:\n RULE = \"retail/mfa_fatigue.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1621\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_mfa_prompt_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptThresh\")\n\n def test_mfa_result_codes(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAResultCodes\", \"50074\", \"50076\", \"50079\")\n\n def test_prompt_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptFlood\")\n\n def test_fatigue_acceptance_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FatigueAcceptance\")\n\n def test_distributed_mfa_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedMFAFlood\")\n\n def test_risky_signin_post_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninPostMFAFlood\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"PromptCount\", \"SourceIPs\",\n \"SourceIPList\", \"Locations\", \"AppNames\", \"SuccessIP\", \"SuccessCountry\",\n \"RiskLevel\", \"RiskyIP\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── credential_stuffing.kql ──────────────────────────────────────────────────────────────────\n\nclass TestCredentialStuffing:\n RULE = \"retail/credential_stuffing.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1110.004\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_abuse_ipdb_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"AbuseIPDBWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FailThreshPerAcct\", \"MinSourceIPs\")\n\n def test_distributed_login_failure_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedLoginFailure\")\n\n def test_account_takeover_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AccountTakeoverAfterStuffing\")\n\n def test_blacklisted_ip_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StuffingFromBlacklistedIP\")\n\n def test_risky_signin_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninAfterStuffing\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"FailCount\", \"SourceIPs\",\n \"SourceIPList\", \"IPAddress\", \"TargetAccts\", \"TargetList\",\n \"SuccessIP\", \"SuccessCountry\", \"RiskLevel\", \"AppNames\",\n \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── after_hours_access.kql ──────────────────────────────────────────────────────────────────\n\nclass TestAfterHoursAccess:\n RULE = \"retail/after_hours_access.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1078\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\", \"AuditLogs\", \"DeviceLogonEvents\")\n\n def test_service_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailServiceAccounts\")', \"leftanti\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_after_hours_interactive_login_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursInteractiveLogin\")\n\n def test_after_hours_privileged_operation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursPrivilegedOperation\")\n\n def test_after_hours_device_logon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursSensitiveDeviceLogon\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"60\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"AccountName\", \"ThreatSignal\", \"LoginCount\",\n \"SourceIPs\", \"Locations\", \"AppNames\", \"Devices\",\n \"RemoteIP\", \"Operations\", \"TargetObjects\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── data_exfiltration.kql ──────────────────────────────────────────────────────────────────\n\nclass TestDataExfiltration:\n RULE = \"retail/data_exfiltration.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1048\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Exfiltration\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceNetworkEvents\", \"DeviceFileEvents\")\n\n def test_ioc_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSQueryThresh\", \"SubdomainLenThresh\",\n \"OutboundMBThresh\", \"StagingFileThresh\")\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"172.\"', '\"127.0.0.1\"')\n\n def test_dns_tunneling_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSTunneling\")\n\n def test_large_outbound_transfer_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"LargeOutboundTransfer\")\n\n def test_ioc_matched_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"IOCMatchedExfilTarget\")\n\n def test_data_staging_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DataStagingToExfil\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"data_exfil_contain\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"90\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"DeviceName\", \"DeviceId\", \"AccountName\", \"ThreatSignal\",\n \"NormalisedTarget\", \"QueryCount\", \"UniqueSubdomains\", \"TotalSentMB\",\n \"TotalSentBytes\", \"FileReadCount\", \"SampleFiles\", \"UniqueFolders\",\n \"ExfilTargets\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── supplier_impossible_travel.kql ──────────────────────────────────────────────────────────\n\nclass TestSupplierImpossibleTravel:\n RULE = \"retail/supplier_impossible_travel.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1199\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_supplier_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailSupplierAccounts\")')\n\n def test_impossible_speed_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleSpeedKmph\", \"900\")\n\n def test_high_risk_countries(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountries\", '\"RU\"', '\"CN\"', '\"KP\"', '\"IR\"')\n\n def test_geo_distance_function(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"geo_distance_2points\")\n\n def test_impossible_travel_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleTravel\", \"RequiredSpeedKmph\")\n\n def test_new_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewCountryForSupplier\")\n\n def test_high_risk_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountrySignin\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"70\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"UserPrincipalName\", \"SupplierName\", \"ThreatSignal\",\n \"IPAddress\", \"CountryCode\", \"City\", \"SecondCountry\", \"SecondCity\",\n \"RequiredSpeedKmph\", \"DistanceKm\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── privileged_role_addition.kql ──────────────────────────────────────────────────────────\n\nclass TestPrivilegedRoleAddition:\n RULE = \"retail/privileged_role_addition.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1098\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AuditLogs\")\n\n def test_sensitive_roles_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SensitiveRoles\",\n \"Global Administrator\",\n \"Privileged Role Administrator\",\n \"Security Administrator\",\n )\n\n def test_sensitive_role_assigned_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveRoleAssigned\")\n\n def test_after_hours_role_addition_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursRoleAddition\", \"HourOfDay\")\n\n def test_mfa_change_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RoleAdditionFollowedByMFAChange\")\n\n def test_sensitive_group_member_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveGroupMemberAdded\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"InitiatedByUser\", \"TargetUser\", \"RoleDisplayName\",\n \"ThreatSignal\", \"HourOfDay\", \"MFAOperation\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n \ No newline at end of file +""" +RetailShield — KQL Rule Validation Test Suite +Validates syntax structure, required metadata, and logic correctness +for all detection-rules/*.kql files without a live Sentinel connection. +""" + +import re +from pathlib import Path + +RULES_DIR = Path(__file__).parent.parent.parent / "detection-rules" + + +def load_rule(filename: str) -> str: + path = RULES_DIR / filename + assert path.exists(), f"Rule file not found: {path}" + return path.read_text(encoding="utf-8") + + +# ── Helpers ───────────────────────────────────────────────────────────────────────────── + +def assert_metadata(content: str, key: str, value: str): + pattern = rf"//\s*{re.escape(key)}\s*:.*{re.escape(value)}" + assert re.search(pattern, content, re.IGNORECASE), ( + f"Missing or incorrect metadata '{key}: {value}'" + ) + + +def assert_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment in content, f"Expected '{fragment}' not found in rule" + + +def assert_not_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment not in content, f"Unexpected fragment '{fragment}' found in rule" + + +# ── phishing_detection.kql ───────────────────────────────────────────────────────────────────── + +class TestPhishingDetection: + RULE = "retail/phishing_detection.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1566.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_queries_email_attachment_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailAttachmentInfo") + + def test_queries_email_events_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailEvents") + + def test_filters_inbound_delivered_emails(self): + content = load_rule(self.RULE) + assert_contains(content, '"Delivered"', '"Inbound"') + + def test_suspicious_extensions_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousExtensions", + '".exe"', + '".ps1"', + '".vbs"', + '".docm"', + ) + + def test_suppresses_approved_senders(self): + content = load_rule(self.RULE) + assert_contains(content, "ApprovedSenderDomains", "leftanti") + + def test_lure_classification_present(self): + content = load_rule(self.RULE) + assert_contains( + content, + "LureCategory", + "Financial", + "Credential", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "quarantine_email") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "SenderAddress", + "RecipientEmailAddress", + "Subject", + "AttachmentFileName", + "AttachmentSHA256", + "NetworkMessageId", + "AlertSeverity", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ransomware_indicator.kql ────────────────────────────────────────────────────────────────── + +class TestRansomwareIndicator: + RULE = "retail/ransomware_indicator.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1486") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_mass_file_rename_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceFileEvents", "FileRenamed", "MassRenameThresh") + + def test_shadow_copy_deletion_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "vssadmin delete shadows", + "wmic shadowcopy delete", + "bcdedit /set recoveryenabled no", + "ShadowCopyDeletion", + ) + + def test_known_ransomware_process_names(self): + content = load_rule(self.RULE) + assert_contains( + content, + "lockbit", + "blackcat", + "conti", + "ryuk", + "KnownRansomwareProcess", + ) + + def test_c2_beacon_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "DeviceNetworkEvents", + "RetailIOCWatchlist", + "BeaconThresh", + "C2BeaconToRansomwareIP", + ) + + def test_ioc_watchlist_used(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"127.0.0.1"') + + def test_union_combines_all_signals(self): + content = load_rule(self.RULE) + assert_contains( + content, + "MassFileRename", + "ShadowCopyDeletion", + "RansomwareProcesses", + "C2Beaconing", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "isolate_endpoint") + + def test_risk_score_is_100(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "100") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "DeviceName", + "DeviceId", + "ThreatSignal", + "ProcessCommandLine", + "RenameCount", + "SampleFiles", + "AffectedFolders", + "RemoteIP", + "BeaconCount", + "AlertSeverity", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "RiskScore", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_anomaly.kql ───────────────────────────────────────────────────────────────────────────── + +class TestPosAnomaly: + RULE = "retail/pos_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_pos_process_names_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "POSProcessNames", "xstore.exe", "aloha.exe", "posready.exe") + + def test_retail_terminal_prefix_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailTerminalPrefix", '"pos-"', '"till-"', '"kiosk-"') + + def test_unknown_dll_injection_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnknownDLLInjection", "DeviceEvents", "ImageLoaded") + + def test_abnormal_transaction_volume_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AbnormalTransactionVolume", + "RetailShield_Logs_CL", + "TransactionVolumeThreshold", + ) + + def test_process_memory_dump_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "ProcessMemoryDump", + "ProcessDumped", + "ProcDump", + ) + + def test_suspicious_network_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousNetworkFromPOS", + "DeviceNetworkEvents", + "RetailIOCWatchlist", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "suspend_terminal") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "DeviceId", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ai_voice_fraud.kql ───────────────────────────────────────────────────────────────────── + +class TestAiVoiceFraud: + RULE = "retail/ai_voice_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1598") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Reconnaissance") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_ai_confidence_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "AIConfidenceThreshold") + + def test_fraud_keywords_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "FraudKeywords", + '"urgent"', + '"override"', + '"bypass"', + '"transfer"', + ) + + def test_business_hours_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHoursStart", "BusinessHoursEnd") + + def test_high_confidence_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "HighConfidenceVoiceFraud", + "RetailShield_Logs_CL", + "AI_Voice_Fraud", + ) + + def test_urgent_financial_request_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UrgentFinancialRequest") + + def test_spoofed_caller_id_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SpoofedCallerID") + + def test_after_hours_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoiceFraud", "hourofday") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "TargetEmployee", + "ImpersonatingEntity", + "RequestMade", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── supply_chain_anomaly.kql ────────────────────────────────────────────────────────────────── + +class TestSupplyChainAnomaly: + RULE = "retail/supply_chain_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1195") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_admin_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AdminEndpoints", + '"/admin"', + '"/management"', + '"/config"', + ) + + def test_agreeable_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AgreeableEndpoints", + '"/inventory"', + '"/orders"', + ) + + def test_bulk_export_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BulkExportThreshold") + + def test_supplier_admin_access_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SupplierAdminAccess", + "AzureDiagnostics", + "supplier_api_key", + ) + + def test_new_service_principal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewServicePrincipal", "AuditLogs") + + def test_unauthorised_endpoint_access_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnauthorisedEndpointAccess") + + def test_mass_data_export_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MassDataExport", "BulkExportThreshold") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "SupplierKey", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_void_refund.kql ────────────────────────────────────────────────────────────────── + +class TestPosVoidRefund: + RULE = "retail/pos_void_refund.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "VoidRefundThresh", "HighValueThresh") + + def test_after_hours_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoidRefund") + + def test_high_volume_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVolumeVoidRefund") + + def test_high_value_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighValueVoidNoOverride") + + def test_tender_mismatch_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "TenderMismatchRefund", "CASH", "GIFT_CARD") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "OperatorId_s", "StoreId_s", "TerminalId_s", "ThreatSignal", + "TotalValue", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── gift_card_fraud.kql ────────────────────────────────────────────────────────────────── + +class TestGiftCardFraud: + RULE = "retail/gift_card_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1657") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_activation_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardActivThresh") + + def test_structuring_params(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuringThreshold", "StructuringBand") + + def test_multi_terminal_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MultiTerminalThresh") + + def test_high_velocity_activation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVelocityGiftCardActivation") + + def test_structured_purchase_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuredGiftCardPurchase") + + def test_drain_and_reload_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardDrainAndReload") + + def test_multi_terminal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardMultiTerminalUse") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "GiftCardNumber_s", "OperatorId_s", "StoreId_s", "TerminalId_s", + "ThreatSignal", "ActivationCount", "UniqueCards", "TotalValue", + "TerminalCount", "TerminalList", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── mfa_fatigue.kql ────────────────────────────────────────────────────────────────── + +class TestMfaFatigue: + RULE = "retail/mfa_fatigue.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1621") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_mfa_prompt_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptThresh") + + def test_mfa_result_codes(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAResultCodes", "50074", "50076", "50079") + + def test_prompt_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptFlood") + + def test_fatigue_acceptance_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "FatigueAcceptance") + + def test_distributed_mfa_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedMFAFlood") + + def test_risky_signin_post_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninPostMFAFlood") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "PromptCount", "SourceIPs", + "SourceIPList", "Locations", "AppNames", "SuccessIP", "SuccessCountry", + "RiskLevel", "RiskyIP", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── credential_stuffing.kql ────────────────────────────────────────────────────────────────── + +class TestCredentialStuffing: + RULE = "retail/credential_stuffing.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1110.004") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_abuse_ipdb_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("AbuseIPDBWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "FailThreshPerAcct", "MinSourceIPs") + + def test_distributed_login_failure_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedLoginFailure") + + def test_account_takeover_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AccountTakeoverAfterStuffing") + + def test_blacklisted_ip_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StuffingFromBlacklistedIP") + + def test_risky_signin_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninAfterStuffing") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "FailCount", "SourceIPs", + "SourceIPList", "IPAddress", "TargetAccts", "TargetList", + "SuccessIP", "SuccessCountry", "RiskLevel", "AppNames", + "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── after_hours_access.kql ────────────────────────────────────────────────────────────────── + +class TestAfterHoursAccess: + RULE = "retail/after_hours_access.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1078") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs", "AuditLogs", "DeviceLogonEvents") + + def test_service_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailServiceAccounts")', "leftanti") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_after_hours_interactive_login_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursInteractiveLogin") + + def test_after_hours_privileged_operation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursPrivilegedOperation") + + def test_after_hours_device_logon_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursSensitiveDeviceLogon") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "60") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "AccountName", "ThreatSignal", "LoginCount", + "SourceIPs", "Locations", "AppNames", "Devices", + "RemoteIP", "Operations", "TargetObjects", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── data_exfiltration.kql ────────────────────────────────────────────────────────────────── + +class TestDataExfiltration: + RULE = "retail/data_exfiltration.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1048") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Exfiltration") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceNetworkEvents", "DeviceFileEvents") + + def test_ioc_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "DNSQueryThresh", "SubdomainLenThresh", + "OutboundMBThresh", "StagingFileThresh") + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"172."', '"127.0.0.1"') + + def test_dns_tunneling_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DNSTunneling") + + def test_large_outbound_transfer_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "LargeOutboundTransfer") + + def test_ioc_matched_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "IOCMatchedExfilTarget") + + def test_data_staging_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DataStagingToExfil") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "data_exfil_contain") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "90") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "DeviceName", "DeviceId", "AccountName", "ThreatSignal", + "NormalisedTarget", "QueryCount", "UniqueSubdomains", "TotalSentMB", + "TotalSentBytes", "FileReadCount", "SampleFiles", "UniqueFolders", + "ExfilTargets", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── supplier_impossible_travel.kql ────────────────────────────────────────────────────────── + +class TestSupplierImpossibleTravel: + RULE = "retail/supplier_impossible_travel.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1199") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_supplier_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailSupplierAccounts")') + + def test_impossible_speed_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleSpeedKmph", "900") + + def test_high_risk_countries(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountries", '"RU"', '"CN"', '"KP"', '"IR"') + + def test_geo_distance_function(self): + content = load_rule(self.RULE) + assert_contains(content, "geo_distance_2points") + + def test_impossible_travel_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleTravel", "RequiredSpeedKmph") + + def test_new_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewCountryForSupplier") + + def test_high_risk_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountrySignin") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "70") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "UserPrincipalName", "SupplierName", "ThreatSignal", + "IPAddress", "CountryCode", "City", "SecondCountry", "SecondCity", + "RequiredSpeedKmph", "DistanceKm", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── privileged_role_addition.kql ────────────────────────────────────────────────────────── + +class TestPrivilegedRoleAddition: + RULE = "retail/privileged_role_addition.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1098") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "AuditLogs") + + def test_sensitive_roles_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SensitiveRoles", + "Global Administrator", + "Privileged Role Administrator", + "Security Administrator", + ) + + def test_sensitive_role_assigned_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveRoleAssigned") + + def test_after_hours_role_addition_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursRoleAddition", "HourOfDay") + + def test_mfa_change_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RoleAdditionFollowedByMFAChange") + + def test_sensitive_group_member_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveGroupMemberAdded") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "InitiatedByUser", "TargetUser", "RoleDisplayName", + "ThreatSignal", "HourOfDay", "MFAOperation", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) From 9a26359853f9a8d1dfe1ea01e9aa2983c1891989 Mon Sep 17 00:00:00 2001 From: Tanvir Farhad Date: Fri, 5 Jun 2026 14:00:04 +0000 Subject: [PATCH 3/3] Fix E127 flake8 in test_kql_rules.py (continuation line indent) --- tests/detection-rules/test_kql_rules.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/detection-rules/test_kql_rules.py b/tests/detection-rules/test_kql_rules.py index bc294a0..4b9d1e0 100644 --- a/tests/detection-rules/test_kql_rules.py +++ b/tests/detection-rules/test_kql_rules.py @@ -1034,8 +1034,10 @@ def test_ioc_watchlist(self): def test_threshold_params(self): content = load_rule(self.RULE) - assert_contains(content, "DNSQueryThresh", "SubdomainLenThresh", - "OutboundMBThresh", "StagingFileThresh") + assert_contains( + content, "DNSQueryThresh", "SubdomainLenThresh", + "OutboundMBThresh", "StagingFileThresh" + ) def test_private_ip_exclusions(self): content = load_rule(self.RULE)