diff --git a/sentinel/data-connectors/README.md b/sentinel/data-connectors/README.md new file mode 100644 index 0000000..4ca1261 --- /dev/null +++ b/sentinel/data-connectors/README.md @@ -0,0 +1,96 @@ +# RetailShield — Data Connector Definition + +This directory contains the ARM template that provisions the two custom Log Analytics tables used by RetailShield detection rules. + +## Custom Tables + +### RetailShield_POS_CL + +Point-of-sale transaction events from retail terminals. Used by: + +| Detection Rule | Event Types | +|---------------|-------------| +| `pos_void_refund.kql` | `pos_void_refund` | +| `gift_card_fraud.kql` | `gift_card_fraud` | +| `pos_anomaly.kql` | `pos_anomaly` (also uses `RetailShield_Logs_CL`) | + +**Key columns:** + +| Column | Type | Description | +|--------|------|-------------| +| `TerminalID` | string | POS terminal identifier | +| `StoreID` | string | Retail store identifier | +| `EmployeeID` | string | Cashier/operator who performed the transaction | +| `OperatorID` | string | Supervisor who authorised void/refund | +| `TransactionType` | string | Sale, Void, Refund, GiftCardIssue, GiftCardReload, GiftCardRedeem, NoSale | +| `TransactionAmount` | real | Value in GBP | +| `VoidCount` | int | Void transaction count in detection window | +| `RefundCount` | int | Refund transaction count in detection window | +| `TotalGiftCardValue` | real | Aggregate gift card transaction value | +| `RiskScore` | int | RetailShield risk score (0–100) | +| `PlaybookTrigger` | string | Recommended response playbook | + +### RetailShield_Logs_CL + +General RetailShield log events covering voice fraud detection and POS device anomalies. Used by: + +| Detection Rule | Event Types | +|---------------|-------------| +| `ai_voice_fraud.kql` | `voice_fraud` | +| `pos_anomaly.kql` | `pos_anomaly` | + +**Key columns:** + +| Column | Type | Description | +|--------|------|-------------| +| `EventType` | string | voice_fraud, pos_anomaly, network_anomaly | +| `CallerID` | string | Caller identifier or phone number (voice fraud) | +| `CallCount` | int | Call volume within detection window | +| `VoiceAnomalyScore` | real | AI anomaly score 0.0–1.0 | +| `ProcessName` | string | Suspicious process name (POS anomaly) | +| `DeviceName` | string | Host device name | +| `RiskScore` | int | RetailShield risk score (0–100) | +| `PlaybookTrigger` | string | Recommended response playbook | + +## Deployment + +```bash +az deployment group create \ + --resource-group \ + --template-file sentinel/data-connectors/retailshield-connector.json \ + --parameters workspaceName= +``` + +This creates both custom tables in the workspace. Tables must exist before deploying analytics rules or ingesting data. + +## Data Ingestion + +Use the [Log Analytics Data Collector API](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/data-collector-api) or the Azure Monitor Ingestion API (DCR-based) to send records to these tables from your POS middleware or SIEM feed: + +```bash +# Example ingestion via REST (simplified) +POST https://.ods.opinsights.azure.com/api/logs?api-version=2016-04-01 +Content-Type: application/json +Log-Type: RetailShield_POS +x-ms-date: +Authorization: SharedKey : + +[{"TerminalID":"POS-001","StoreID":"ST-042","EmployeeID":"EMP-123",...}] +``` + +## Standard Tables Also Used + +The following built-in Sentinel/Defender tables are referenced by RetailShield rules and require the corresponding data connectors to be enabled in your workspace: + +| Table | Data Connector | +|-------|---------------| +| `SigninLogs` | Azure Active Directory | +| `AuditLogs` | Azure Active Directory | +| `DeviceNetworkEvents` | Microsoft Defender for Endpoint | +| `DeviceFileEvents` | Microsoft Defender for Endpoint | +| `DeviceProcessEvents` | Microsoft Defender for Endpoint | +| `DeviceEvents` | Microsoft Defender for Endpoint | +| `DeviceLogonEvents` | Microsoft Defender for Endpoint | +| `EmailAttachmentInfo` | Microsoft Defender for Office 365 | +| `EmailEvents` | Microsoft Defender for Office 365 | +| `AzureDiagnostics` | Azure Diagnostics | diff --git a/sentinel/data-connectors/retailshield-connector.json b/sentinel/data-connectors/retailshield-connector.json new file mode 100644 index 0000000..774cc9b --- /dev/null +++ b/sentinel/data-connectors/retailshield-connector.json @@ -0,0 +1,231 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "workspaceName": { + "type": "string", + "metadata": { + "description": "Name of the Log Analytics workspace where Microsoft Sentinel is deployed." + } + }, + "location": { + "type": "string", + "defaultValue": "[resourceGroup().location]", + "metadata": { + "description": "Azure region for all resources." + } + } + }, + "variables": { + "connectorId": "RetailShield", + "connectorVersion": "1.0.0" + }, + "resources": [ + { + "type": "Microsoft.OperationalInsights/workspaces/tables", + "apiVersion": "2022-10-01", + "name": "[concat(parameters('workspaceName'), '/RetailShield_POS_CL')]", + "location": "[parameters('location')]", + "properties": { + "schema": { + "name": "RetailShield_POS_CL", + "columns": [ + { + "name": "TimeGenerated", + "type": "datetime", + "description": "Timestamp when the POS event was ingested into the workspace." + }, + { + "name": "TerminalID", + "type": "string", + "description": "Unique identifier of the POS terminal that generated the event." + }, + { + "name": "StoreID", + "type": "string", + "description": "Retail store identifier." + }, + { + "name": "EmployeeID", + "type": "string", + "description": "Operator or cashier identifier performing the transaction." + }, + { + "name": "OperatorID", + "type": "string", + "description": "Supervisor or override operator identifier (used for void/refund authorisation)." + }, + { + "name": "TransactionType", + "type": "string", + "description": "Type of POS transaction: Sale, Void, Refund, GiftCardIssue, GiftCardReload, GiftCardRedeem, NoSale." + }, + { + "name": "TransactionAmount", + "type": "real", + "description": "Transaction value in GBP." + }, + { + "name": "TransactionCount", + "type": "int", + "description": "Number of transactions within the aggregation window." + }, + { + "name": "PaymentMethod", + "type": "string", + "description": "Payment method: Cash, Card, GiftCard, ContactlessMobile." + }, + { + "name": "CardLast4", + "type": "string", + "description": "Last 4 digits of payment card (if applicable, for correlation only)." + }, + { + "name": "GiftCardID", + "type": "string", + "description": "Gift card identifier (for gift card transaction types)." + }, + { + "name": "TotalGiftCardValue", + "type": "real", + "description": "Aggregated value of gift card transactions within the detection window." + }, + { + "name": "VoidCount", + "type": "int", + "description": "Number of void transactions by the operator in the lookback window." + }, + { + "name": "RefundCount", + "type": "int", + "description": "Number of refund transactions by the operator in the lookback window." + }, + { + "name": "EventType", + "type": "string", + "description": "RetailShield event classification: pos_void_refund, gift_card_fraud, pos_no_sale." + }, + { + "name": "RiskScore", + "type": "int", + "description": "RetailShield risk score 0-100; higher values indicate greater anomaly confidence." + }, + { + "name": "SignalName", + "type": "string", + "description": "RetailShield detection signal that triggered this record." + }, + { + "name": "PlaybookTrigger", + "type": "string", + "description": "Recommended playbook to invoke: notify_soc, suspend_terminal, block_ip." + }, + { + "name": "SourceSystem", + "type": "string", + "description": "Source system tag — always 'RetailShield-POS' for this table." + } + ] + } + } + }, + { + "type": "Microsoft.OperationalInsights/workspaces/tables", + "apiVersion": "2022-10-01", + "name": "[concat(parameters('workspaceName'), '/RetailShield_Logs_CL')]", + "location": "[parameters('location')]", + "properties": { + "schema": { + "name": "RetailShield_Logs_CL", + "columns": [ + { + "name": "TimeGenerated", + "type": "datetime", + "description": "Timestamp when the log event was ingested into the workspace." + }, + { + "name": "EventType", + "type": "string", + "description": "RetailShield event classification: voice_fraud, pos_anomaly, network_anomaly." + }, + { + "name": "DeviceName", + "type": "string", + "description": "Hostname of the device generating the log event." + }, + { + "name": "TerminalID", + "type": "string", + "description": "POS terminal identifier (for POS-related log events)." + }, + { + "name": "StoreID", + "type": "string", + "description": "Retail store identifier." + }, + { + "name": "CallerID", + "type": "string", + "description": "Caller identifier or phone number (for voice fraud events)." + }, + { + "name": "CallCount", + "type": "int", + "description": "Number of calls from the CallerID within the detection window." + }, + { + "name": "CallDurationSeconds", + "type": "int", + "description": "Total or average call duration in seconds." + }, + { + "name": "VoiceAnomalyScore", + "type": "real", + "description": "AI-derived anomaly score for voice pattern analysis (0.0–1.0)." + }, + { + "name": "ProcessName", + "type": "string", + "description": "Process name involved in POS anomaly events." + }, + { + "name": "ProcessCommandLine", + "type": "string", + "description": "Process command line for POS anomaly events." + }, + { + "name": "NetworkDestination", + "type": "string", + "description": "Destination IP or hostname for network anomaly events." + }, + { + "name": "BytesSent", + "type": "long", + "description": "Bytes sent to network destination." + }, + { + "name": "RiskScore", + "type": "int", + "description": "RetailShield risk score 0-100." + }, + { + "name": "SignalName", + "type": "string", + "description": "RetailShield detection signal that triggered this record." + }, + { + "name": "PlaybookTrigger", + "type": "string", + "description": "Recommended playbook to invoke." + }, + { + "name": "SourceSystem", + "type": "string", + "description": "Source system tag — always 'RetailShield-Logs' for this table." + } + ] + } + } + } + ] +} diff --git a/tests/detection-rules/test_kql_rules.py b/tests/detection-rules/test_kql_rules.py index 62c6525..4b9d1e0 100644 --- a/tests/detection-rules/test_kql_rules.py +++ b/tests/detection-rules/test_kql_rules.py @@ -1 +1,1262 @@ -"""\nRetailShield — KQL Rule Validation Test Suite\nValidates syntax structure, required metadata, and logic correctness\nfor all detection-rules/*.kql files without a live Sentinel connection.\n"""\n\nimport re\nfrom pathlib import Path\n\nRULES_DIR = Path(__file__).parent.parent.parent / \"detection-rules\"\n\n\ndef load_rule(filename: str) -> str:\n path = RULES_DIR / filename\n assert path.exists(), f\"Rule file not found: {path}\"\n return path.read_text(encoding=\"utf-8\")\n\n\n# ── Helpers ─────────────────────────────────────────────────────────────────────────────\n\ndef assert_metadata(content: str, key: str, value: str):\n pattern = rf\"//\\s*{re.escape(key)}\\s*:.*{re.escape(value)}\"\n assert re.search(pattern, content, re.IGNORECASE), (\n f\"Missing or incorrect metadata '{key}: {value}'\"\n )\n\n\ndef assert_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment in content, f\"Expected '{fragment}' not found in rule\"\n\n\ndef assert_not_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment not in content, f\"Unexpected fragment '{fragment}' found in rule\"\n\n\n# ── phishing_detection.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestPhishingDetection:\n RULE = \"retail/phishing_detection.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1566.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_queries_email_attachment_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailAttachmentInfo\")\n\n def test_queries_email_events_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailEvents\")\n\n def test_filters_inbound_delivered_emails(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"Delivered\"', '\"Inbound\"')\n\n def test_suspicious_extensions_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousExtensions\",\n '\".exe\"',\n '\".ps1\"',\n '\".vbs\"',\n '\".docm\"',\n )\n\n def test_suppresses_approved_senders(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ApprovedSenderDomains\", \"leftanti\")\n\n def test_lure_classification_present(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"LureCategory\",\n \"Financial\",\n \"Credential\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"quarantine_email\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"SenderAddress\",\n \"RecipientEmailAddress\",\n \"Subject\",\n \"AttachmentFileName\",\n \"AttachmentSHA256\",\n \"NetworkMessageId\",\n \"AlertSeverity\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ransomware_indicator.kql ──────────────────────────────────────────────────────────────────\n\nclass TestRansomwareIndicator:\n RULE = \"retail/ransomware_indicator.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1486\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_mass_file_rename_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceFileEvents\", \"FileRenamed\", \"MassRenameThresh\")\n\n def test_shadow_copy_deletion_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"vssadmin delete shadows\",\n \"wmic shadowcopy delete\",\n \"bcdedit /set recoveryenabled no\",\n \"ShadowCopyDeletion\",\n )\n\n def test_known_ransomware_process_names(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"lockbit\",\n \"blackcat\",\n \"conti\",\n \"ryuk\",\n \"KnownRansomwareProcess\",\n )\n\n def test_c2_beacon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n \"BeaconThresh\",\n \"C2BeaconToRansomwareIP\",\n )\n\n def test_ioc_watchlist_used(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"127.0.0.1\"')\n\n def test_union_combines_all_signals(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"MassFileRename\",\n \"ShadowCopyDeletion\",\n \"RansomwareProcesses\",\n \"C2Beaconing\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"isolate_endpoint\")\n\n def test_risk_score_is_100(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"100\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"DeviceName\",\n \"DeviceId\",\n \"ThreatSignal\",\n \"ProcessCommandLine\",\n \"RenameCount\",\n \"SampleFiles\",\n \"AffectedFolders\",\n \"RemoteIP\",\n \"BeaconCount\",\n \"AlertSeverity\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"RiskScore\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_anomaly.kql ─────────────────────────────────────────────────────────────────────────────\n\nclass TestPosAnomaly:\n RULE = \"retail/pos_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_pos_process_names_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"POSProcessNames\", \"xstore.exe\", \"aloha.exe\", \"posready.exe\")\n\n def test_retail_terminal_prefix_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailTerminalPrefix\", '\"pos-\"', '\"till-\"', '\"kiosk-\"')\n\n def test_unknown_dll_injection_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnknownDLLInjection\", \"DeviceEvents\", \"ImageLoaded\")\n\n def test_abnormal_transaction_volume_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AbnormalTransactionVolume\",\n \"RetailShield_Logs_CL\",\n \"TransactionVolumeThreshold\",\n )\n\n def test_process_memory_dump_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"ProcessMemoryDump\",\n \"ProcessDumped\",\n \"ProcDump\",\n )\n\n def test_suspicious_network_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousNetworkFromPOS\",\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"suspend_terminal\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"DeviceId\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ai_voice_fraud.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestAiVoiceFraud:\n RULE = \"retail/ai_voice_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1598\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Reconnaissance\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_ai_confidence_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AIConfidenceThreshold\")\n\n def test_fraud_keywords_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"FraudKeywords\",\n '\"urgent\"',\n '\"override\"',\n '\"bypass\"',\n '\"transfer\"',\n )\n\n def test_business_hours_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHoursStart\", \"BusinessHoursEnd\")\n\n def test_high_confidence_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"HighConfidenceVoiceFraud\",\n \"RetailShield_Logs_CL\",\n \"AI_Voice_Fraud\",\n )\n\n def test_urgent_financial_request_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UrgentFinancialRequest\")\n\n def test_spoofed_caller_id_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SpoofedCallerID\")\n\n def test_after_hours_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoiceFraud\", \"hourofday\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"TargetEmployee\",\n \"ImpersonatingEntity\",\n \"RequestMade\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── supply_chain_anomaly.kql ──────────────────────────────────────────────────────────────────\n\nclass TestSupplyChainAnomaly:\n RULE = \"retail/supply_chain_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1195\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_admin_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AdminEndpoints\",\n '\"/admin\"',\n '\"/management\"',\n '\"/config\"',\n )\n\n def test_agreeable_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AgreeableEndpoints\",\n '\"/inventory\"',\n '\"/orders\"',\n )\n\n def test_bulk_export_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BulkExportThreshold\")\n\n def test_supplier_admin_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SupplierAdminAccess\",\n \"AzureDiagnostics\",\n \"supplier_api_key\",\n )\n\n def test_new_service_principal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewServicePrincipal\", \"AuditLogs\")\n\n def test_unauthorised_endpoint_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnauthorisedEndpointAccess\")\n\n def test_mass_data_export_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MassDataExport\", \"BulkExportThreshold\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"SupplierKey\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_void_refund.kql ──────────────────────────────────────────────────────────────────\n\nclass TestPosVoidRefund:\n RULE = \"retail/pos_void_refund.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"VoidRefundThresh\", \"HighValueThresh\")\n\n def test_after_hours_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoidRefund\")\n\n def test_high_volume_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVolumeVoidRefund\")\n\n def test_high_value_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighValueVoidNoOverride\")\n\n def test_tender_mismatch_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"TenderMismatchRefund\", \"CASH\", \"GIFT_CARD\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\", \"ThreatSignal\",\n \"TotalValue\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── gift_card_fraud.kql ──────────────────────────────────────────────────────────────────\n\nclass TestGiftCardFraud:\n RULE = \"retail/gift_card_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1657\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_activation_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardActivThresh\")\n\n def test_structuring_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuringThreshold\", \"StructuringBand\")\n\n def test_multi_terminal_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MultiTerminalThresh\")\n\n def test_high_velocity_activation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVelocityGiftCardActivation\")\n\n def test_structured_purchase_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuredGiftCardPurchase\")\n\n def test_drain_and_reload_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardDrainAndReload\")\n\n def test_multi_terminal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardMultiTerminalUse\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"GiftCardNumber_s\", \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\",\n \"ThreatSignal\", \"ActivationCount\", \"UniqueCards\", \"TotalValue\",\n \"TerminalCount\", \"TerminalList\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── mfa_fatigue.kql ──────────────────────────────────────────────────────────────────\n\nclass TestMfaFatigue:\n RULE = \"retail/mfa_fatigue.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1621\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_mfa_prompt_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptThresh\")\n\n def test_mfa_result_codes(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAResultCodes\", \"50074\", \"50076\", \"50079\")\n\n def test_prompt_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptFlood\")\n\n def test_fatigue_acceptance_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FatigueAcceptance\")\n\n def test_distributed_mfa_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedMFAFlood\")\n\n def test_risky_signin_post_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninPostMFAFlood\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"PromptCount\", \"SourceIPs\",\n \"SourceIPList\", \"Locations\", \"AppNames\", \"SuccessIP\", \"SuccessCountry\",\n \"RiskLevel\", \"RiskyIP\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── credential_stuffing.kql ──────────────────────────────────────────────────────────────────\n\nclass TestCredentialStuffing:\n RULE = \"retail/credential_stuffing.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1110.004\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_abuse_ipdb_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"AbuseIPDBWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FailThreshPerAcct\", \"MinSourceIPs\")\n\n def test_distributed_login_failure_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedLoginFailure\")\n\n def test_account_takeover_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AccountTakeoverAfterStuffing\")\n\n def test_blacklisted_ip_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StuffingFromBlacklistedIP\")\n\n def test_risky_signin_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninAfterStuffing\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"FailCount\", \"SourceIPs\",\n \"SourceIPList\", \"IPAddress\", \"TargetAccts\", \"TargetList\",\n \"SuccessIP\", \"SuccessCountry\", \"RiskLevel\", \"AppNames\",\n \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── after_hours_access.kql ──────────────────────────────────────────────────────────────────\n\nclass TestAfterHoursAccess:\n RULE = \"retail/after_hours_access.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1078\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\", \"AuditLogs\", \"DeviceLogonEvents\")\n\n def test_service_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailServiceAccounts\")', \"leftanti\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_after_hours_interactive_login_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursInteractiveLogin\")\n\n def test_after_hours_privileged_operation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursPrivilegedOperation\")\n\n def test_after_hours_device_logon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursSensitiveDeviceLogon\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"60\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"AccountName\", \"ThreatSignal\", \"LoginCount\",\n \"SourceIPs\", \"Locations\", \"AppNames\", \"Devices\",\n \"RemoteIP\", \"Operations\", \"TargetObjects\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── data_exfiltration.kql ──────────────────────────────────────────────────────────────────\n\nclass TestDataExfiltration:\n RULE = \"retail/data_exfiltration.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1048\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Exfiltration\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceNetworkEvents\", \"DeviceFileEvents\")\n\n def test_ioc_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSQueryThresh\", \"SubdomainLenThresh\",\n \"OutboundMBThresh\", \"StagingFileThresh\")\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"172.\"', '\"127.0.0.1\"')\n\n def test_dns_tunneling_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSTunneling\")\n\n def test_large_outbound_transfer_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"LargeOutboundTransfer\")\n\n def test_ioc_matched_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"IOCMatchedExfilTarget\")\n\n def test_data_staging_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DataStagingToExfil\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"data_exfil_contain\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"90\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"DeviceName\", \"DeviceId\", \"AccountName\", \"ThreatSignal\",\n \"NormalisedTarget\", \"QueryCount\", \"UniqueSubdomains\", \"TotalSentMB\",\n \"TotalSentBytes\", \"FileReadCount\", \"SampleFiles\", \"UniqueFolders\",\n \"ExfilTargets\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── supplier_impossible_travel.kql ──────────────────────────────────────────────────────────\n\nclass TestSupplierImpossibleTravel:\n RULE = \"retail/supplier_impossible_travel.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1199\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_supplier_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailSupplierAccounts\")')\n\n def test_impossible_speed_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleSpeedKmph\", \"900\")\n\n def test_high_risk_countries(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountries\", '\"RU\"', '\"CN\"', '\"KP\"', '\"IR\"')\n\n def test_geo_distance_function(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"geo_distance_2points\")\n\n def test_impossible_travel_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleTravel\", \"RequiredSpeedKmph\")\n\n def test_new_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewCountryForSupplier\")\n\n def test_high_risk_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountrySignin\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"70\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"UserPrincipalName\", \"SupplierName\", \"ThreatSignal\",\n \"IPAddress\", \"CountryCode\", \"City\", \"SecondCountry\", \"SecondCity\",\n \"RequiredSpeedKmph\", \"DistanceKm\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── privileged_role_addition.kql ──────────────────────────────────────────────────────────\n\nclass TestPrivilegedRoleAddition:\n RULE = \"retail/privileged_role_addition.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1098\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AuditLogs\")\n\n def test_sensitive_roles_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SensitiveRoles\",\n \"Global Administrator\",\n \"Privileged Role Administrator\",\n \"Security Administrator\",\n )\n\n def test_sensitive_role_assigned_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveRoleAssigned\")\n\n def test_after_hours_role_addition_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursRoleAddition\", \"HourOfDay\")\n\n def test_mfa_change_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RoleAdditionFollowedByMFAChange\")\n\n def test_sensitive_group_member_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveGroupMemberAdded\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"InitiatedByUser\", \"TargetUser\", \"RoleDisplayName\",\n \"ThreatSignal\", \"HourOfDay\", \"MFAOperation\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n \ No newline at end of file +""" +RetailShield — KQL Rule Validation Test Suite +Validates syntax structure, required metadata, and logic correctness +for all detection-rules/*.kql files without a live Sentinel connection. +""" + +import re +from pathlib import Path + +RULES_DIR = Path(__file__).parent.parent.parent / "detection-rules" + + +def load_rule(filename: str) -> str: + path = RULES_DIR / filename + assert path.exists(), f"Rule file not found: {path}" + return path.read_text(encoding="utf-8") + + +# ── Helpers ───────────────────────────────────────────────────────────────────────────── + +def assert_metadata(content: str, key: str, value: str): + pattern = rf"//\s*{re.escape(key)}\s*:.*{re.escape(value)}" + assert re.search(pattern, content, re.IGNORECASE), ( + f"Missing or incorrect metadata '{key}: {value}'" + ) + + +def assert_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment in content, f"Expected '{fragment}' not found in rule" + + +def assert_not_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment not in content, f"Unexpected fragment '{fragment}' found in rule" + + +# ── phishing_detection.kql ───────────────────────────────────────────────────────────────────── + +class TestPhishingDetection: + RULE = "retail/phishing_detection.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1566.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_queries_email_attachment_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailAttachmentInfo") + + def test_queries_email_events_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailEvents") + + def test_filters_inbound_delivered_emails(self): + content = load_rule(self.RULE) + assert_contains(content, '"Delivered"', '"Inbound"') + + def test_suspicious_extensions_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousExtensions", + '".exe"', + '".ps1"', + '".vbs"', + '".docm"', + ) + + def test_suppresses_approved_senders(self): + content = load_rule(self.RULE) + assert_contains(content, "ApprovedSenderDomains", "leftanti") + + def test_lure_classification_present(self): + content = load_rule(self.RULE) + assert_contains( + content, + "LureCategory", + "Financial", + "Credential", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "quarantine_email") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "SenderAddress", + "RecipientEmailAddress", + "Subject", + "AttachmentFileName", + "AttachmentSHA256", + "NetworkMessageId", + "AlertSeverity", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ransomware_indicator.kql ────────────────────────────────────────────────────────────────── + +class TestRansomwareIndicator: + RULE = "retail/ransomware_indicator.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1486") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_mass_file_rename_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceFileEvents", "FileRenamed", "MassRenameThresh") + + def test_shadow_copy_deletion_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "vssadmin delete shadows", + "wmic shadowcopy delete", + "bcdedit /set recoveryenabled no", + "ShadowCopyDeletion", + ) + + def test_known_ransomware_process_names(self): + content = load_rule(self.RULE) + assert_contains( + content, + "lockbit", + "blackcat", + "conti", + "ryuk", + "KnownRansomwareProcess", + ) + + def test_c2_beacon_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "DeviceNetworkEvents", + "RetailIOCWatchlist", + "BeaconThresh", + "C2BeaconToRansomwareIP", + ) + + def test_ioc_watchlist_used(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"127.0.0.1"') + + def test_union_combines_all_signals(self): + content = load_rule(self.RULE) + assert_contains( + content, + "MassFileRename", + "ShadowCopyDeletion", + "RansomwareProcesses", + "C2Beaconing", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "isolate_endpoint") + + def test_risk_score_is_100(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "100") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "DeviceName", + "DeviceId", + "ThreatSignal", + "ProcessCommandLine", + "RenameCount", + "SampleFiles", + "AffectedFolders", + "RemoteIP", + "BeaconCount", + "AlertSeverity", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "RiskScore", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_anomaly.kql ───────────────────────────────────────────────────────────────────────────── + +class TestPosAnomaly: + RULE = "retail/pos_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_pos_process_names_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "POSProcessNames", "xstore.exe", "aloha.exe", "posready.exe") + + def test_retail_terminal_prefix_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailTerminalPrefix", '"pos-"', '"till-"', '"kiosk-"') + + def test_unknown_dll_injection_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnknownDLLInjection", "DeviceEvents", "ImageLoaded") + + def test_abnormal_transaction_volume_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AbnormalTransactionVolume", + "RetailShield_Logs_CL", + "TransactionVolumeThreshold", + ) + + def test_process_memory_dump_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "ProcessMemoryDump", + "ProcessDumped", + "ProcDump", + ) + + def test_suspicious_network_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousNetworkFromPOS", + "DeviceNetworkEvents", + "RetailIOCWatchlist", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "suspend_terminal") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "DeviceId", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ai_voice_fraud.kql ───────────────────────────────────────────────────────────────────── + +class TestAiVoiceFraud: + RULE = "retail/ai_voice_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1598") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Reconnaissance") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_ai_confidence_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "AIConfidenceThreshold") + + def test_fraud_keywords_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "FraudKeywords", + '"urgent"', + '"override"', + '"bypass"', + '"transfer"', + ) + + def test_business_hours_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHoursStart", "BusinessHoursEnd") + + def test_high_confidence_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "HighConfidenceVoiceFraud", + "RetailShield_Logs_CL", + "AI_Voice_Fraud", + ) + + def test_urgent_financial_request_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UrgentFinancialRequest") + + def test_spoofed_caller_id_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SpoofedCallerID") + + def test_after_hours_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoiceFraud", "hourofday") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "TargetEmployee", + "ImpersonatingEntity", + "RequestMade", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── supply_chain_anomaly.kql ────────────────────────────────────────────────────────────────── + +class TestSupplyChainAnomaly: + RULE = "retail/supply_chain_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1195") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_admin_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AdminEndpoints", + '"/admin"', + '"/management"', + '"/config"', + ) + + def test_agreeable_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AgreeableEndpoints", + '"/inventory"', + '"/orders"', + ) + + def test_bulk_export_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BulkExportThreshold") + + def test_supplier_admin_access_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SupplierAdminAccess", + "AzureDiagnostics", + "supplier_api_key", + ) + + def test_new_service_principal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewServicePrincipal", "AuditLogs") + + def test_unauthorised_endpoint_access_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnauthorisedEndpointAccess") + + def test_mass_data_export_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MassDataExport", "BulkExportThreshold") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "SupplierKey", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_void_refund.kql ────────────────────────────────────────────────────────────────── + +class TestPosVoidRefund: + RULE = "retail/pos_void_refund.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "VoidRefundThresh", "HighValueThresh") + + def test_after_hours_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoidRefund") + + def test_high_volume_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVolumeVoidRefund") + + def test_high_value_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighValueVoidNoOverride") + + def test_tender_mismatch_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "TenderMismatchRefund", "CASH", "GIFT_CARD") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "OperatorId_s", "StoreId_s", "TerminalId_s", "ThreatSignal", + "TotalValue", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── gift_card_fraud.kql ────────────────────────────────────────────────────────────────── + +class TestGiftCardFraud: + RULE = "retail/gift_card_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1657") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_activation_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardActivThresh") + + def test_structuring_params(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuringThreshold", "StructuringBand") + + def test_multi_terminal_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MultiTerminalThresh") + + def test_high_velocity_activation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVelocityGiftCardActivation") + + def test_structured_purchase_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuredGiftCardPurchase") + + def test_drain_and_reload_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardDrainAndReload") + + def test_multi_terminal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardMultiTerminalUse") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "GiftCardNumber_s", "OperatorId_s", "StoreId_s", "TerminalId_s", + "ThreatSignal", "ActivationCount", "UniqueCards", "TotalValue", + "TerminalCount", "TerminalList", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── mfa_fatigue.kql ────────────────────────────────────────────────────────────────── + +class TestMfaFatigue: + RULE = "retail/mfa_fatigue.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1621") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_mfa_prompt_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptThresh") + + def test_mfa_result_codes(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAResultCodes", "50074", "50076", "50079") + + def test_prompt_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptFlood") + + def test_fatigue_acceptance_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "FatigueAcceptance") + + def test_distributed_mfa_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedMFAFlood") + + def test_risky_signin_post_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninPostMFAFlood") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "PromptCount", "SourceIPs", + "SourceIPList", "Locations", "AppNames", "SuccessIP", "SuccessCountry", + "RiskLevel", "RiskyIP", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── credential_stuffing.kql ────────────────────────────────────────────────────────────────── + +class TestCredentialStuffing: + RULE = "retail/credential_stuffing.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1110.004") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_abuse_ipdb_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("AbuseIPDBWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "FailThreshPerAcct", "MinSourceIPs") + + def test_distributed_login_failure_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedLoginFailure") + + def test_account_takeover_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AccountTakeoverAfterStuffing") + + def test_blacklisted_ip_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StuffingFromBlacklistedIP") + + def test_risky_signin_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninAfterStuffing") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "FailCount", "SourceIPs", + "SourceIPList", "IPAddress", "TargetAccts", "TargetList", + "SuccessIP", "SuccessCountry", "RiskLevel", "AppNames", + "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── after_hours_access.kql ────────────────────────────────────────────────────────────────── + +class TestAfterHoursAccess: + RULE = "retail/after_hours_access.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1078") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs", "AuditLogs", "DeviceLogonEvents") + + def test_service_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailServiceAccounts")', "leftanti") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_after_hours_interactive_login_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursInteractiveLogin") + + def test_after_hours_privileged_operation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursPrivilegedOperation") + + def test_after_hours_device_logon_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursSensitiveDeviceLogon") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "60") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "AccountName", "ThreatSignal", "LoginCount", + "SourceIPs", "Locations", "AppNames", "Devices", + "RemoteIP", "Operations", "TargetObjects", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── data_exfiltration.kql ────────────────────────────────────────────────────────────────── + +class TestDataExfiltration: + RULE = "retail/data_exfiltration.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1048") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Exfiltration") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceNetworkEvents", "DeviceFileEvents") + + def test_ioc_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains( + content, "DNSQueryThresh", "SubdomainLenThresh", + "OutboundMBThresh", "StagingFileThresh" + ) + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"172."', '"127.0.0.1"') + + def test_dns_tunneling_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DNSTunneling") + + def test_large_outbound_transfer_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "LargeOutboundTransfer") + + def test_ioc_matched_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "IOCMatchedExfilTarget") + + def test_data_staging_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DataStagingToExfil") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "data_exfil_contain") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "90") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "DeviceName", "DeviceId", "AccountName", "ThreatSignal", + "NormalisedTarget", "QueryCount", "UniqueSubdomains", "TotalSentMB", + "TotalSentBytes", "FileReadCount", "SampleFiles", "UniqueFolders", + "ExfilTargets", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── supplier_impossible_travel.kql ────────────────────────────────────────────────────────── + +class TestSupplierImpossibleTravel: + RULE = "retail/supplier_impossible_travel.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1199") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_supplier_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailSupplierAccounts")') + + def test_impossible_speed_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleSpeedKmph", "900") + + def test_high_risk_countries(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountries", '"RU"', '"CN"', '"KP"', '"IR"') + + def test_geo_distance_function(self): + content = load_rule(self.RULE) + assert_contains(content, "geo_distance_2points") + + def test_impossible_travel_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleTravel", "RequiredSpeedKmph") + + def test_new_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewCountryForSupplier") + + def test_high_risk_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountrySignin") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "70") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "UserPrincipalName", "SupplierName", "ThreatSignal", + "IPAddress", "CountryCode", "City", "SecondCountry", "SecondCity", + "RequiredSpeedKmph", "DistanceKm", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── privileged_role_addition.kql ────────────────────────────────────────────────────────── + +class TestPrivilegedRoleAddition: + RULE = "retail/privileged_role_addition.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1098") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "AuditLogs") + + def test_sensitive_roles_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SensitiveRoles", + "Global Administrator", + "Privileged Role Administrator", + "Security Administrator", + ) + + def test_sensitive_role_assigned_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveRoleAssigned") + + def test_after_hours_role_addition_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursRoleAddition", "HourOfDay") + + def test_mfa_change_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RoleAdditionFollowedByMFAChange") + + def test_sensitive_group_member_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveGroupMemberAdded") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "InitiatedByUser", "TargetUser", "RoleDisplayName", + "ThreatSignal", "HourOfDay", "MFAOperation", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field)