diff --git a/solution/README.md b/solution/README.md new file mode 100644 index 0000000..7eb8a78 --- /dev/null +++ b/solution/README.md @@ -0,0 +1,125 @@ +# RetailShield — Microsoft Sentinel Solution Package + +This directory contains the deployment package for RetailShield as a Microsoft Sentinel Solution. + +## What's included + +| Component | Count | Location | +|-----------|-------|----------| +| Analytics rules | 13 | `sentinel/analytics-rules/` | +| Response playbooks | 5 | `logic-apps/` | +| Custom table schemas | 2 | `sentinel/data-connectors/` | +| Solution orchestration template | 1 | `solution/mainTemplate.json` | +| Azure portal UI definition | 1 | `solution/createUiDefinition.json` | + +## Quick deployment + +### Option A — Deploy everything via Azure CLI + +```bash +az deployment group create \ + --resource-group \ + --template-file solution/mainTemplate.json \ + --parameters \ + workspaceName= \ + complianceContactEmail=security@yourorganisation.com \ + organisationName="Your Organisation Ltd" +``` + +### Option B — Deploy individual components + +```bash +# 1. Create custom tables first +az deployment group create \ + --resource-group \ + --template-file sentinel/data-connectors/retailshield-connector.json \ + --parameters workspaceName= + +# 2. Deploy analytics rules +for f in sentinel/analytics-rules/*.json; do + az deployment group create --resource-group --template-file "$f" \ + --parameters workspaceName= +done + +# 3. Deploy playbooks +for f in logic-apps/*/workflow.json; do + az deployment group create --resource-group --template-file "$f" \ + --parameters workspaceName= +done +``` + +### Option C — Azure portal (Custom deployment) + +1. Navigate to **Deploy a custom template** in the Azure portal +2. Use **Build your own template in the editor** and paste `solution/mainTemplate.json` +3. Use the `createUiDefinition.json` for a guided wizard experience + +## Deployment parameters + +| Parameter | Required | Default | Description | +|-----------|----------|---------|-------------| +| `workspaceName` | Yes | — | Log Analytics workspace name | +| `location` | No | Resource group location | Azure region | +| `complianceContactEmail` | No | *(empty)* | Internal compliance contact email for incident-reporting playbook | +| `organisationName` | No | `Retail Organisation` | Organisation name in compliance emails | +| `deployAnalyticsRules` | No | `true` | Deploy all 13 analytics rules | +| `deployPlaybooks` | No | `true` | Deploy all 5 Logic App playbooks | +| `deployDataConnector` | No | `true` | Create custom Log Analytics tables | + +## Post-deployment steps + +After the ARM deployment completes: + +1. **Authorise API connections** — for each Logic App playbook in the Azure portal, open the Logic App → API Connections → Authorise each connection (azuresentinel, office365, mdfc-connection as applicable) + +2. **Grant Sentinel Responder role** — for each Logic App's system-assigned managed identity: + ```bash + az role assignment create \ + --assignee \ + --role "Microsoft Sentinel Responder" \ + --scope /subscriptions//resourceGroups//providers/Microsoft.OperationalInsights/workspaces/ + ``` + +3. **Create watchlists** — RetailShield rules depend on four watchlists. See `docs/deployment_guide.md` for schema and population instructions: + - `RetailIOCWatchlist` — threat intelligence IOCs + - `AbuseIPDBWatchlist` — known malicious IPs + - `RetailServiceAccounts` — excluded automation accounts + - `RetailSupplierAccounts` — supplier account UPNs + +4. **Create automation rules** — link each analytics rule to its recommended playbook via Sentinel → Automation → Create automation rule + +5. **Validate** — trigger a test incident in each severity tier and confirm the correct playbook fires + +## Architecture + +``` +RetailShield Solution +├── Data Ingestion +│ ├── RetailShield_POS_CL (POS terminal events via Data Collector API) +│ └── RetailShield_Logs_CL (Voice fraud and device events) +│ +├── Detection Layer (13 Scheduled Analytics Rules) +│ ├── Critical: Data Exfiltration, Ransomware Indicator +│ ├── High: Phishing, POS Void/Refund, MFA Fatigue, AI Voice Fraud, +│ │ Credential Stuffing, Gift Card Fraud, POS Anomaly, +│ │ Privileged Role Addition, Supply Chain Anomaly +│ └── Medium: After-Hours Access, Supplier Impossible Travel +│ +└── Response Layer (5 Logic App Playbooks) + ├── block-ip — Block attacker IP via Defender for Endpoint + ├── isolate-endpoint — Device isolation via Defender for Endpoint + ├── quarantine-email — Email quarantine via Defender for Office 365 + ├── suspend-terminal — Disable POS terminal + └── incident-reporting — UK compliance assistant (24h/72h deadlines) +``` + +## Data connector dependencies + +These Sentinel data connectors must be configured separately: + +| Connector | Tables | Used by rules | +|-----------|--------|--------------| +| Azure Active Directory | SigninLogs, AuditLogs | mfa_fatigue, credential_stuffing, after_hours_access, supplier_impossible_travel, privileged_role_addition | +| Microsoft Defender for Endpoint | DeviceNetworkEvents, DeviceFileEvents, DeviceProcessEvents, DeviceEvents, DeviceLogonEvents | data_exfiltration, pos_anomaly, ransomware_indicator, after_hours_access | +| Microsoft Defender for Office 365 | EmailAttachmentInfo, EmailEvents | phishing_detection | +| Azure Diagnostics | AzureDiagnostics | supply_chain_anomaly | diff --git a/solution/createUiDefinition.json b/solution/createUiDefinition.json new file mode 100644 index 0000000..380e101 --- /dev/null +++ b/solution/createUiDefinition.json @@ -0,0 +1,144 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/0.1.2-preview/CreateUIDefinition.MultiVm.json#", + "handler": "Microsoft.Azure.CreateUIDef", + "version": "0.1.2-preview", + "parameters": { + "config": { + "isWizard": false, + "basics": { + "description": "**RetailShield** is a Microsoft Sentinel Solution providing retail-specific threat detection rules, automated response playbooks, and UK compliance assistance. It covers Point-of-Sale fraud, phishing, ransomware, credential attacks, supply chain threats, and more.\n\n**Version:** 1.0.0 \n**Publisher:** ShieldTech Ltd \n**Category:** Security – Retail\n\n> After deployment, authorise the API connections for each Logic App playbook and assign the **Sentinel Responder** role to each playbook's managed identity." + } + }, + "basics": [ + { + "name": "workspaceName", + "type": "Microsoft.Common.TextBox", + "label": "Sentinel workspace name", + "placeholder": "my-sentinel-workspace", + "toolTip": "Name of the existing Log Analytics workspace where Microsoft Sentinel is enabled.", + "constraints": { + "required": true, + "regex": "^[a-zA-Z0-9][a-zA-Z0-9-]{1,61}[a-zA-Z0-9]$", + "validationMessage": "Workspace name must be 3–63 characters, letters, numbers, and hyphens only." + } + }, + { + "name": "location", + "type": "Microsoft.Common.TextBox", + "label": "Azure region", + "defaultValue": "uksouth", + "toolTip": "Azure region where the solution resources will be deployed. Must match the region of your Sentinel workspace.", + "constraints": { + "required": true + } + } + ], + "steps": [ + { + "name": "playbooks", + "label": "Playbooks", + "elements": [ + { + "name": "deployPlaybooks", + "type": "Microsoft.Common.CheckBox", + "label": "Deploy response playbooks (Logic Apps)", + "toolTip": "Deploy the five Logic App playbooks: block-ip, isolate-endpoint, quarantine-email, suspend-terminal, and incident-reporting.", + "defaultValue": true + }, + { + "name": "complianceSection", + "type": "Microsoft.Common.Section", + "label": "UK Compliance Assistant", + "elements": [ + { + "name": "complianceInfo", + "type": "Microsoft.Common.InfoBox", + "visible": "[steps('playbooks').deployPlaybooks]", + "options": { + "icon": "Warning", + "text": "The incident-reporting playbook sends draft compliance notifications to your INTERNAL security/compliance contact when a Critical incident is detected. It does NOT auto-submit reports to regulators. A human must review and submit via the ICO and NCSC portals." + } + }, + { + "name": "complianceContactEmail", + "type": "Microsoft.Common.TextBox", + "label": "Compliance contact email", + "placeholder": "security@yourorganisation.com", + "toolTip": "Internal email address for compliance notifications. NOT an ICO or government address.", + "visible": "[steps('playbooks').deployPlaybooks]", + "constraints": { + "required": false, + "regex": "^[^@]+@[^@]+\\.[^@]+$", + "validationMessage": "Enter a valid email address." + } + }, + { + "name": "organisationName", + "type": "Microsoft.Common.TextBox", + "label": "Organisation name", + "defaultValue": "Retail Organisation", + "toolTip": "Your organisation name, used in compliance notification emails.", + "visible": "[steps('playbooks').deployPlaybooks]", + "constraints": { + "required": false + } + } + ] + } + ] + }, + { + "name": "detection", + "label": "Detection Rules", + "elements": [ + { + "name": "deployAnalyticsRules", + "type": "Microsoft.Common.CheckBox", + "label": "Deploy analytics rules (13 retail detection rules)", + "toolTip": "Deploy all 13 scheduled analytics rules. Requires the custom tables to be created first (handled automatically if 'Deploy data connector' is enabled).", + "defaultValue": true + }, + { + "name": "rulesInfo", + "type": "Microsoft.Common.InfoBox", + "visible": "[steps('detection').deployAnalyticsRules]", + "options": { + "icon": "Info", + "text": "Rules deployed: Phishing Detection, POS Void/Refund Fraud, MFA Fatigue, Data Exfiltration, AI Voice Fraud, Credential Stuffing, After-Hours Access, Gift Card Fraud, POS Anomaly, Ransomware Indicator, Supply Chain Anomaly, Supplier Impossible Travel, Privileged Role Addition." + } + }, + { + "name": "deployDataConnector", + "type": "Microsoft.Common.CheckBox", + "label": "Create custom Log Analytics tables (RetailShield_POS_CL and RetailShield_Logs_CL)", + "toolTip": "Creates the custom table schemas required by POS and voice fraud detection rules. Disable if tables already exist.", + "defaultValue": true + } + ] + }, + { + "name": "prerequisites", + "label": "Pre-deployment checklist", + "elements": [ + { + "name": "prereqInfo", + "type": "Microsoft.Common.InfoBox", + "options": { + "icon": "Warning", + "text": "Before deploying, ensure the following are in place:\n\n1. Microsoft Sentinel is enabled on the target workspace\n2. The following data connectors are configured: Azure Active Directory, Microsoft Defender for Endpoint, Microsoft Defender for Office 365\n3. The following watchlists exist (or will be created manually): RetailIOCWatchlist, AbuseIPDBWatchlist, RetailServiceAccounts, RetailSupplierAccounts, RetailApprovedSenders\n4. After deployment, grant 'Sentinel Responder' role to each Logic App's managed identity\n5. After deployment, authorise the azuresentinel and office365 API connections in each Logic App" + } + } + ] + } + ], + "outputs": { + "workspaceName": "[basics('workspaceName')]", + "location": "[basics('location')]", + "deployPlaybooks": "[steps('playbooks').deployPlaybooks]", + "complianceContactEmail": "[steps('playbooks').complianceSection.complianceContactEmail]", + "organisationName": "[steps('playbooks').complianceSection.organisationName]", + "deployAnalyticsRules": "[steps('detection').deployAnalyticsRules]", + "deployDataConnector": "[steps('detection').deployDataConnector]" + } + } +} diff --git a/solution/mainTemplate.json b/solution/mainTemplate.json new file mode 100644 index 0000000..03147b5 --- /dev/null +++ b/solution/mainTemplate.json @@ -0,0 +1,445 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "metadata": { + "author": "ShieldTech Ltd", + "comments": "RetailShield Microsoft Sentinel Solution — main deployment template" + }, + "parameters": { + "workspaceName": { + "type": "string", + "minLength": 1, + "metadata": { + "description": "Name of the Log Analytics workspace where Microsoft Sentinel is deployed." + } + }, + "workspaceId": { + "type": "string", + "minLength": 1, + "metadata": { + "description": "Resource ID of the Log Analytics workspace." + } + }, + "location": { + "type": "string", + "defaultValue": "[resourceGroup().location]", + "metadata": { + "description": "Azure region for all resources. Defaults to the resource group location." + } + }, + "complianceContactEmail": { + "type": "string", + "defaultValue": "", + "metadata": { + "description": "Email address of the retailer's designated compliance/security contact. Required for the incident-reporting playbook." + } + }, + "organisationName": { + "type": "string", + "defaultValue": "Retail Organisation", + "metadata": { + "description": "Organisation name used in compliance notification emails." + } + }, + "deployAnalyticsRules": { + "type": "bool", + "defaultValue": true, + "metadata": { + "description": "Set to false to skip deploying analytics rules (e.g., if deploying rules separately)." + } + }, + "deployPlaybooks": { + "type": "bool", + "defaultValue": true, + "metadata": { + "description": "Set to false to skip deploying Logic App playbooks." + } + }, + "deployDataConnector": { + "type": "bool", + "defaultValue": true, + "metadata": { + "description": "Set to false to skip creating custom Log Analytics tables (if they already exist)." + } + } + }, + "variables": { + "solutionId": "retailshield", + "solutionVersion": "1.0.0", + "contentSchemaVersion": "3.0.0", + "playbooks": [ + "block-ip", + "isolate-endpoint", + "quarantine-email", + "suspend-terminal", + "incident-reporting" + ], + "analyticsRuleNames": [ + "phishing_detection", + "pos_void_refund", + "mfa_fatigue", + "data_exfiltration", + "ai_voice_fraud", + "credential_stuffing", + "after_hours_access", + "gift_card_fraud", + "pos_anomaly", + "ransomware_indicator", + "supply_chain_anomaly", + "supplier_impossible_travel", + "privileged_role_addition" + ] + }, + "resources": [ + { + "condition": "[parameters('deployDataConnector')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-data-connector", + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/data-connectors/retailshield-connector.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" }, + "location": { "value": "[parameters('location')]" } + } + } + }, + { + "condition": "[parameters('deployPlaybooks')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-playbook-block-ip", + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../logic-apps/block-ip/workflow.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" }, + "location": { "value": "[parameters('location')]" } + } + } + }, + { + "condition": "[parameters('deployPlaybooks')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-playbook-isolate-endpoint", + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../logic-apps/isolate-endpoint/workflow.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" }, + "location": { "value": "[parameters('location')]" } + } + } + }, + { + "condition": "[parameters('deployPlaybooks')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-playbook-quarantine-email", + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../logic-apps/quarantine-email/workflow.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" }, + "location": { "value": "[parameters('location')]" } + } + } + }, + { + "condition": "[parameters('deployPlaybooks')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-playbook-suspend-terminal", + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../logic-apps/suspend-terminal/workflow.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" }, + "location": { "value": "[parameters('location')]" } + } + } + }, + { + "condition": "[parameters('deployPlaybooks')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-playbook-incident-reporting", + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../logic-apps/incident-reporting/workflow.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" }, + "location": { "value": "[parameters('location')]" }, + "complianceContactEmail": { "value": "[parameters('complianceContactEmail')]" }, + "organisationName": { "value": "[parameters('organisationName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-phishing-detection", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/phishing_detection.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-pos-void-refund", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/pos_void_refund.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-mfa-fatigue", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/mfa_fatigue.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-data-exfiltration", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/data_exfiltration.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-ai-voice-fraud", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/ai_voice_fraud.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-credential-stuffing", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/credential_stuffing.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-after-hours-access", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/after_hours_access.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-gift-card-fraud", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/gift_card_fraud.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-pos-anomaly", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/pos_anomaly.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-ransomware-indicator", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/ransomware_indicator.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-supply-chain-anomaly", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/supply_chain_anomaly.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-supplier-impossible-travel", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/supplier_impossible_travel.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + }, + { + "condition": "[parameters('deployAnalyticsRules')]", + "type": "Microsoft.Resources/deployments", + "apiVersion": "2022-09-01", + "name": "retailshield-rule-privileged-role-addition", + "dependsOn": [ + "[resourceId('Microsoft.Resources/deployments', 'retailshield-data-connector')]" + ], + "properties": { + "mode": "Incremental", + "templateLink": { + "relativePath": "../sentinel/analytics-rules/privileged_role_addition.json" + }, + "parameters": { + "workspaceName": { "value": "[parameters('workspaceName')]" } + } + } + } + ], + "outputs": { + "solutionVersion": { + "type": "string", + "value": "[variables('solutionVersion')]" + }, + "workspaceName": { + "type": "string", + "value": "[parameters('workspaceName')]" + }, + "playbacksDeployed": { + "type": "bool", + "value": "[parameters('deployPlaybooks')]" + }, + "analyticsRulesDeployed": { + "type": "bool", + "value": "[parameters('deployAnalyticsRules')]" + } + } +} diff --git a/tests/detection-rules/test_kql_rules.py b/tests/detection-rules/test_kql_rules.py index 62c6525..4b9d1e0 100644 --- a/tests/detection-rules/test_kql_rules.py +++ b/tests/detection-rules/test_kql_rules.py @@ -1 +1,1262 @@ -"""\nRetailShield — KQL Rule Validation Test Suite\nValidates syntax structure, required metadata, and logic correctness\nfor all detection-rules/*.kql files without a live Sentinel connection.\n"""\n\nimport re\nfrom pathlib import Path\n\nRULES_DIR = Path(__file__).parent.parent.parent / \"detection-rules\"\n\n\ndef load_rule(filename: str) -> str:\n path = RULES_DIR / filename\n assert path.exists(), f\"Rule file not found: {path}\"\n return path.read_text(encoding=\"utf-8\")\n\n\n# ── Helpers ─────────────────────────────────────────────────────────────────────────────\n\ndef assert_metadata(content: str, key: str, value: str):\n pattern = rf\"//\\s*{re.escape(key)}\\s*:.*{re.escape(value)}\"\n assert re.search(pattern, content, re.IGNORECASE), (\n f\"Missing or incorrect metadata '{key}: {value}'\"\n )\n\n\ndef assert_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment in content, f\"Expected '{fragment}' not found in rule\"\n\n\ndef assert_not_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment not in content, f\"Unexpected fragment '{fragment}' found in rule\"\n\n\n# ── phishing_detection.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestPhishingDetection:\n RULE = \"retail/phishing_detection.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1566.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_queries_email_attachment_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailAttachmentInfo\")\n\n def test_queries_email_events_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailEvents\")\n\n def test_filters_inbound_delivered_emails(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"Delivered\"', '\"Inbound\"')\n\n def test_suspicious_extensions_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousExtensions\",\n '\".exe\"',\n '\".ps1\"',\n '\".vbs\"',\n '\".docm\"',\n )\n\n def test_suppresses_approved_senders(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ApprovedSenderDomains\", \"leftanti\")\n\n def test_lure_classification_present(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"LureCategory\",\n \"Financial\",\n \"Credential\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"quarantine_email\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"SenderAddress\",\n \"RecipientEmailAddress\",\n \"Subject\",\n \"AttachmentFileName\",\n \"AttachmentSHA256\",\n \"NetworkMessageId\",\n \"AlertSeverity\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ransomware_indicator.kql ──────────────────────────────────────────────────────────────────\n\nclass TestRansomwareIndicator:\n RULE = \"retail/ransomware_indicator.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1486\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_mass_file_rename_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceFileEvents\", \"FileRenamed\", \"MassRenameThresh\")\n\n def test_shadow_copy_deletion_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"vssadmin delete shadows\",\n \"wmic shadowcopy delete\",\n \"bcdedit /set recoveryenabled no\",\n \"ShadowCopyDeletion\",\n )\n\n def test_known_ransomware_process_names(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"lockbit\",\n \"blackcat\",\n \"conti\",\n \"ryuk\",\n \"KnownRansomwareProcess\",\n )\n\n def test_c2_beacon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n \"BeaconThresh\",\n \"C2BeaconToRansomwareIP\",\n )\n\n def test_ioc_watchlist_used(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"127.0.0.1\"')\n\n def test_union_combines_all_signals(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"MassFileRename\",\n \"ShadowCopyDeletion\",\n \"RansomwareProcesses\",\n \"C2Beaconing\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"isolate_endpoint\")\n\n def test_risk_score_is_100(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"100\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"DeviceName\",\n \"DeviceId\",\n \"ThreatSignal\",\n \"ProcessCommandLine\",\n \"RenameCount\",\n \"SampleFiles\",\n \"AffectedFolders\",\n \"RemoteIP\",\n \"BeaconCount\",\n \"AlertSeverity\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"RiskScore\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_anomaly.kql ─────────────────────────────────────────────────────────────────────────────\n\nclass TestPosAnomaly:\n RULE = \"retail/pos_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_pos_process_names_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"POSProcessNames\", \"xstore.exe\", \"aloha.exe\", \"posready.exe\")\n\n def test_retail_terminal_prefix_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailTerminalPrefix\", '\"pos-\"', '\"till-\"', '\"kiosk-\"')\n\n def test_unknown_dll_injection_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnknownDLLInjection\", \"DeviceEvents\", \"ImageLoaded\")\n\n def test_abnormal_transaction_volume_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AbnormalTransactionVolume\",\n \"RetailShield_Logs_CL\",\n \"TransactionVolumeThreshold\",\n )\n\n def test_process_memory_dump_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"ProcessMemoryDump\",\n \"ProcessDumped\",\n \"ProcDump\",\n )\n\n def test_suspicious_network_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousNetworkFromPOS\",\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"suspend_terminal\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"DeviceId\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ai_voice_fraud.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestAiVoiceFraud:\n RULE = \"retail/ai_voice_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1598\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Reconnaissance\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_ai_confidence_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AIConfidenceThreshold\")\n\n def test_fraud_keywords_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"FraudKeywords\",\n '\"urgent\"',\n '\"override\"',\n '\"bypass\"',\n '\"transfer\"',\n )\n\n def test_business_hours_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHoursStart\", \"BusinessHoursEnd\")\n\n def test_high_confidence_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"HighConfidenceVoiceFraud\",\n \"RetailShield_Logs_CL\",\n \"AI_Voice_Fraud\",\n )\n\n def test_urgent_financial_request_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UrgentFinancialRequest\")\n\n def test_spoofed_caller_id_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SpoofedCallerID\")\n\n def test_after_hours_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoiceFraud\", \"hourofday\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"TargetEmployee\",\n \"ImpersonatingEntity\",\n \"RequestMade\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── supply_chain_anomaly.kql ──────────────────────────────────────────────────────────────────\n\nclass TestSupplyChainAnomaly:\n RULE = \"retail/supply_chain_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1195\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_admin_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AdminEndpoints\",\n '\"/admin\"',\n '\"/management\"',\n '\"/config\"',\n )\n\n def test_agreeable_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AgreeableEndpoints\",\n '\"/inventory\"',\n '\"/orders\"',\n )\n\n def test_bulk_export_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BulkExportThreshold\")\n\n def test_supplier_admin_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SupplierAdminAccess\",\n \"AzureDiagnostics\",\n \"supplier_api_key\",\n )\n\n def test_new_service_principal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewServicePrincipal\", \"AuditLogs\")\n\n def test_unauthorised_endpoint_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnauthorisedEndpointAccess\")\n\n def test_mass_data_export_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MassDataExport\", \"BulkExportThreshold\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"SupplierKey\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_void_refund.kql ──────────────────────────────────────────────────────────────────\n\nclass TestPosVoidRefund:\n RULE = \"retail/pos_void_refund.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"VoidRefundThresh\", \"HighValueThresh\")\n\n def test_after_hours_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoidRefund\")\n\n def test_high_volume_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVolumeVoidRefund\")\n\n def test_high_value_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighValueVoidNoOverride\")\n\n def test_tender_mismatch_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"TenderMismatchRefund\", \"CASH\", \"GIFT_CARD\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\", \"ThreatSignal\",\n \"TotalValue\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── gift_card_fraud.kql ──────────────────────────────────────────────────────────────────\n\nclass TestGiftCardFraud:\n RULE = \"retail/gift_card_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1657\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_activation_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardActivThresh\")\n\n def test_structuring_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuringThreshold\", \"StructuringBand\")\n\n def test_multi_terminal_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MultiTerminalThresh\")\n\n def test_high_velocity_activation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVelocityGiftCardActivation\")\n\n def test_structured_purchase_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuredGiftCardPurchase\")\n\n def test_drain_and_reload_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardDrainAndReload\")\n\n def test_multi_terminal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardMultiTerminalUse\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"GiftCardNumber_s\", \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\",\n \"ThreatSignal\", \"ActivationCount\", \"UniqueCards\", \"TotalValue\",\n \"TerminalCount\", \"TerminalList\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── mfa_fatigue.kql ──────────────────────────────────────────────────────────────────\n\nclass TestMfaFatigue:\n RULE = \"retail/mfa_fatigue.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1621\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_mfa_prompt_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptThresh\")\n\n def test_mfa_result_codes(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAResultCodes\", \"50074\", \"50076\", \"50079\")\n\n def test_prompt_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptFlood\")\n\n def test_fatigue_acceptance_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FatigueAcceptance\")\n\n def test_distributed_mfa_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedMFAFlood\")\n\n def test_risky_signin_post_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninPostMFAFlood\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"PromptCount\", \"SourceIPs\",\n \"SourceIPList\", \"Locations\", \"AppNames\", \"SuccessIP\", \"SuccessCountry\",\n \"RiskLevel\", \"RiskyIP\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── credential_stuffing.kql ──────────────────────────────────────────────────────────────────\n\nclass TestCredentialStuffing:\n RULE = \"retail/credential_stuffing.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1110.004\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_abuse_ipdb_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"AbuseIPDBWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FailThreshPerAcct\", \"MinSourceIPs\")\n\n def test_distributed_login_failure_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedLoginFailure\")\n\n def test_account_takeover_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AccountTakeoverAfterStuffing\")\n\n def test_blacklisted_ip_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StuffingFromBlacklistedIP\")\n\n def test_risky_signin_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninAfterStuffing\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"FailCount\", \"SourceIPs\",\n \"SourceIPList\", \"IPAddress\", \"TargetAccts\", \"TargetList\",\n \"SuccessIP\", \"SuccessCountry\", \"RiskLevel\", \"AppNames\",\n \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── after_hours_access.kql ──────────────────────────────────────────────────────────────────\n\nclass TestAfterHoursAccess:\n RULE = \"retail/after_hours_access.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1078\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\", \"AuditLogs\", \"DeviceLogonEvents\")\n\n def test_service_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailServiceAccounts\")', \"leftanti\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_after_hours_interactive_login_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursInteractiveLogin\")\n\n def test_after_hours_privileged_operation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursPrivilegedOperation\")\n\n def test_after_hours_device_logon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursSensitiveDeviceLogon\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"60\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"AccountName\", \"ThreatSignal\", \"LoginCount\",\n \"SourceIPs\", \"Locations\", \"AppNames\", \"Devices\",\n \"RemoteIP\", \"Operations\", \"TargetObjects\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── data_exfiltration.kql ──────────────────────────────────────────────────────────────────\n\nclass TestDataExfiltration:\n RULE = \"retail/data_exfiltration.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1048\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Exfiltration\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceNetworkEvents\", \"DeviceFileEvents\")\n\n def test_ioc_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSQueryThresh\", \"SubdomainLenThresh\",\n \"OutboundMBThresh\", \"StagingFileThresh\")\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"172.\"', '\"127.0.0.1\"')\n\n def test_dns_tunneling_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSTunneling\")\n\n def test_large_outbound_transfer_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"LargeOutboundTransfer\")\n\n def test_ioc_matched_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"IOCMatchedExfilTarget\")\n\n def test_data_staging_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DataStagingToExfil\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"data_exfil_contain\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"90\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"DeviceName\", \"DeviceId\", \"AccountName\", \"ThreatSignal\",\n \"NormalisedTarget\", \"QueryCount\", \"UniqueSubdomains\", \"TotalSentMB\",\n \"TotalSentBytes\", \"FileReadCount\", \"SampleFiles\", \"UniqueFolders\",\n \"ExfilTargets\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── supplier_impossible_travel.kql ──────────────────────────────────────────────────────────\n\nclass TestSupplierImpossibleTravel:\n RULE = \"retail/supplier_impossible_travel.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1199\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_supplier_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailSupplierAccounts\")')\n\n def test_impossible_speed_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleSpeedKmph\", \"900\")\n\n def test_high_risk_countries(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountries\", '\"RU\"', '\"CN\"', '\"KP\"', '\"IR\"')\n\n def test_geo_distance_function(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"geo_distance_2points\")\n\n def test_impossible_travel_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleTravel\", \"RequiredSpeedKmph\")\n\n def test_new_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewCountryForSupplier\")\n\n def test_high_risk_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountrySignin\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"70\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"UserPrincipalName\", \"SupplierName\", \"ThreatSignal\",\n \"IPAddress\", \"CountryCode\", \"City\", \"SecondCountry\", \"SecondCity\",\n \"RequiredSpeedKmph\", \"DistanceKm\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── privileged_role_addition.kql ──────────────────────────────────────────────────────────\n\nclass TestPrivilegedRoleAddition:\n RULE = \"retail/privileged_role_addition.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1098\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AuditLogs\")\n\n def test_sensitive_roles_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SensitiveRoles\",\n \"Global Administrator\",\n \"Privileged Role Administrator\",\n \"Security Administrator\",\n )\n\n def test_sensitive_role_assigned_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveRoleAssigned\")\n\n def test_after_hours_role_addition_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursRoleAddition\", \"HourOfDay\")\n\n def test_mfa_change_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RoleAdditionFollowedByMFAChange\")\n\n def test_sensitive_group_member_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveGroupMemberAdded\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"InitiatedByUser\", \"TargetUser\", \"RoleDisplayName\",\n \"ThreatSignal\", \"HourOfDay\", \"MFAOperation\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n \ No newline at end of file +""" +RetailShield — KQL Rule Validation Test Suite +Validates syntax structure, required metadata, and logic correctness +for all detection-rules/*.kql files without a live Sentinel connection. +""" + +import re +from pathlib import Path + +RULES_DIR = Path(__file__).parent.parent.parent / "detection-rules" + + +def load_rule(filename: str) -> str: + path = RULES_DIR / filename + assert path.exists(), f"Rule file not found: {path}" + return path.read_text(encoding="utf-8") + + +# ── Helpers ───────────────────────────────────────────────────────────────────────────── + +def assert_metadata(content: str, key: str, value: str): + pattern = rf"//\s*{re.escape(key)}\s*:.*{re.escape(value)}" + assert re.search(pattern, content, re.IGNORECASE), ( + f"Missing or incorrect metadata '{key}: {value}'" + ) + + +def assert_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment in content, f"Expected '{fragment}' not found in rule" + + +def assert_not_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment not in content, f"Unexpected fragment '{fragment}' found in rule" + + +# ── phishing_detection.kql ───────────────────────────────────────────────────────────────────── + +class TestPhishingDetection: + RULE = "retail/phishing_detection.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1566.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_queries_email_attachment_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailAttachmentInfo") + + def test_queries_email_events_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailEvents") + + def test_filters_inbound_delivered_emails(self): + content = load_rule(self.RULE) + assert_contains(content, '"Delivered"', '"Inbound"') + + def test_suspicious_extensions_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousExtensions", + '".exe"', + '".ps1"', + '".vbs"', + '".docm"', + ) + + def test_suppresses_approved_senders(self): + content = load_rule(self.RULE) + assert_contains(content, "ApprovedSenderDomains", "leftanti") + + def test_lure_classification_present(self): + content = load_rule(self.RULE) + assert_contains( + content, + "LureCategory", + "Financial", + "Credential", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "quarantine_email") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "SenderAddress", + "RecipientEmailAddress", + "Subject", + "AttachmentFileName", + "AttachmentSHA256", + "NetworkMessageId", + "AlertSeverity", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ransomware_indicator.kql ────────────────────────────────────────────────────────────────── + +class TestRansomwareIndicator: + RULE = "retail/ransomware_indicator.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1486") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_mass_file_rename_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceFileEvents", "FileRenamed", "MassRenameThresh") + + def test_shadow_copy_deletion_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "vssadmin delete shadows", + "wmic shadowcopy delete", + "bcdedit /set recoveryenabled no", + "ShadowCopyDeletion", + ) + + def test_known_ransomware_process_names(self): + content = load_rule(self.RULE) + assert_contains( + content, + "lockbit", + "blackcat", + "conti", + "ryuk", + "KnownRansomwareProcess", + ) + + def test_c2_beacon_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "DeviceNetworkEvents", + "RetailIOCWatchlist", + "BeaconThresh", + "C2BeaconToRansomwareIP", + ) + + def test_ioc_watchlist_used(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"127.0.0.1"') + + def test_union_combines_all_signals(self): + content = load_rule(self.RULE) + assert_contains( + content, + "MassFileRename", + "ShadowCopyDeletion", + "RansomwareProcesses", + "C2Beaconing", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "isolate_endpoint") + + def test_risk_score_is_100(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "100") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "DeviceName", + "DeviceId", + "ThreatSignal", + "ProcessCommandLine", + "RenameCount", + "SampleFiles", + "AffectedFolders", + "RemoteIP", + "BeaconCount", + "AlertSeverity", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "RiskScore", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_anomaly.kql ───────────────────────────────────────────────────────────────────────────── + +class TestPosAnomaly: + RULE = "retail/pos_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_pos_process_names_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "POSProcessNames", "xstore.exe", "aloha.exe", "posready.exe") + + def test_retail_terminal_prefix_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailTerminalPrefix", '"pos-"', '"till-"', '"kiosk-"') + + def test_unknown_dll_injection_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnknownDLLInjection", "DeviceEvents", "ImageLoaded") + + def test_abnormal_transaction_volume_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AbnormalTransactionVolume", + "RetailShield_Logs_CL", + "TransactionVolumeThreshold", + ) + + def test_process_memory_dump_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "ProcessMemoryDump", + "ProcessDumped", + "ProcDump", + ) + + def test_suspicious_network_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousNetworkFromPOS", + "DeviceNetworkEvents", + "RetailIOCWatchlist", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "suspend_terminal") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "DeviceId", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ai_voice_fraud.kql ───────────────────────────────────────────────────────────────────── + +class TestAiVoiceFraud: + RULE = "retail/ai_voice_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1598") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Reconnaissance") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_ai_confidence_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "AIConfidenceThreshold") + + def test_fraud_keywords_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "FraudKeywords", + '"urgent"', + '"override"', + '"bypass"', + '"transfer"', + ) + + def test_business_hours_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHoursStart", "BusinessHoursEnd") + + def test_high_confidence_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "HighConfidenceVoiceFraud", + "RetailShield_Logs_CL", + "AI_Voice_Fraud", + ) + + def test_urgent_financial_request_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UrgentFinancialRequest") + + def test_spoofed_caller_id_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SpoofedCallerID") + + def test_after_hours_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoiceFraud", "hourofday") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "TargetEmployee", + "ImpersonatingEntity", + "RequestMade", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── supply_chain_anomaly.kql ────────────────────────────────────────────────────────────────── + +class TestSupplyChainAnomaly: + RULE = "retail/supply_chain_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1195") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_admin_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AdminEndpoints", + '"/admin"', + '"/management"', + '"/config"', + ) + + def test_agreeable_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AgreeableEndpoints", + '"/inventory"', + '"/orders"', + ) + + def test_bulk_export_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BulkExportThreshold") + + def test_supplier_admin_access_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SupplierAdminAccess", + "AzureDiagnostics", + "supplier_api_key", + ) + + def test_new_service_principal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewServicePrincipal", "AuditLogs") + + def test_unauthorised_endpoint_access_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnauthorisedEndpointAccess") + + def test_mass_data_export_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MassDataExport", "BulkExportThreshold") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "SupplierKey", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_void_refund.kql ────────────────────────────────────────────────────────────────── + +class TestPosVoidRefund: + RULE = "retail/pos_void_refund.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "VoidRefundThresh", "HighValueThresh") + + def test_after_hours_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoidRefund") + + def test_high_volume_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVolumeVoidRefund") + + def test_high_value_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighValueVoidNoOverride") + + def test_tender_mismatch_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "TenderMismatchRefund", "CASH", "GIFT_CARD") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "OperatorId_s", "StoreId_s", "TerminalId_s", "ThreatSignal", + "TotalValue", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── gift_card_fraud.kql ────────────────────────────────────────────────────────────────── + +class TestGiftCardFraud: + RULE = "retail/gift_card_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1657") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_activation_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardActivThresh") + + def test_structuring_params(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuringThreshold", "StructuringBand") + + def test_multi_terminal_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MultiTerminalThresh") + + def test_high_velocity_activation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVelocityGiftCardActivation") + + def test_structured_purchase_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuredGiftCardPurchase") + + def test_drain_and_reload_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardDrainAndReload") + + def test_multi_terminal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardMultiTerminalUse") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "GiftCardNumber_s", "OperatorId_s", "StoreId_s", "TerminalId_s", + "ThreatSignal", "ActivationCount", "UniqueCards", "TotalValue", + "TerminalCount", "TerminalList", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── mfa_fatigue.kql ────────────────────────────────────────────────────────────────── + +class TestMfaFatigue: + RULE = "retail/mfa_fatigue.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1621") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_mfa_prompt_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptThresh") + + def test_mfa_result_codes(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAResultCodes", "50074", "50076", "50079") + + def test_prompt_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptFlood") + + def test_fatigue_acceptance_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "FatigueAcceptance") + + def test_distributed_mfa_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedMFAFlood") + + def test_risky_signin_post_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninPostMFAFlood") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "PromptCount", "SourceIPs", + "SourceIPList", "Locations", "AppNames", "SuccessIP", "SuccessCountry", + "RiskLevel", "RiskyIP", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── credential_stuffing.kql ────────────────────────────────────────────────────────────────── + +class TestCredentialStuffing: + RULE = "retail/credential_stuffing.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1110.004") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_abuse_ipdb_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("AbuseIPDBWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "FailThreshPerAcct", "MinSourceIPs") + + def test_distributed_login_failure_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedLoginFailure") + + def test_account_takeover_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AccountTakeoverAfterStuffing") + + def test_blacklisted_ip_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StuffingFromBlacklistedIP") + + def test_risky_signin_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninAfterStuffing") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "FailCount", "SourceIPs", + "SourceIPList", "IPAddress", "TargetAccts", "TargetList", + "SuccessIP", "SuccessCountry", "RiskLevel", "AppNames", + "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── after_hours_access.kql ────────────────────────────────────────────────────────────────── + +class TestAfterHoursAccess: + RULE = "retail/after_hours_access.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1078") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs", "AuditLogs", "DeviceLogonEvents") + + def test_service_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailServiceAccounts")', "leftanti") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_after_hours_interactive_login_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursInteractiveLogin") + + def test_after_hours_privileged_operation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursPrivilegedOperation") + + def test_after_hours_device_logon_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursSensitiveDeviceLogon") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "60") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "AccountName", "ThreatSignal", "LoginCount", + "SourceIPs", "Locations", "AppNames", "Devices", + "RemoteIP", "Operations", "TargetObjects", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── data_exfiltration.kql ────────────────────────────────────────────────────────────────── + +class TestDataExfiltration: + RULE = "retail/data_exfiltration.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1048") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Exfiltration") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceNetworkEvents", "DeviceFileEvents") + + def test_ioc_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains( + content, "DNSQueryThresh", "SubdomainLenThresh", + "OutboundMBThresh", "StagingFileThresh" + ) + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"172."', '"127.0.0.1"') + + def test_dns_tunneling_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DNSTunneling") + + def test_large_outbound_transfer_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "LargeOutboundTransfer") + + def test_ioc_matched_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "IOCMatchedExfilTarget") + + def test_data_staging_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DataStagingToExfil") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "data_exfil_contain") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "90") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "DeviceName", "DeviceId", "AccountName", "ThreatSignal", + "NormalisedTarget", "QueryCount", "UniqueSubdomains", "TotalSentMB", + "TotalSentBytes", "FileReadCount", "SampleFiles", "UniqueFolders", + "ExfilTargets", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── supplier_impossible_travel.kql ────────────────────────────────────────────────────────── + +class TestSupplierImpossibleTravel: + RULE = "retail/supplier_impossible_travel.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1199") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_supplier_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailSupplierAccounts")') + + def test_impossible_speed_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleSpeedKmph", "900") + + def test_high_risk_countries(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountries", '"RU"', '"CN"', '"KP"', '"IR"') + + def test_geo_distance_function(self): + content = load_rule(self.RULE) + assert_contains(content, "geo_distance_2points") + + def test_impossible_travel_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleTravel", "RequiredSpeedKmph") + + def test_new_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewCountryForSupplier") + + def test_high_risk_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountrySignin") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "70") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "UserPrincipalName", "SupplierName", "ThreatSignal", + "IPAddress", "CountryCode", "City", "SecondCountry", "SecondCity", + "RequiredSpeedKmph", "DistanceKm", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── privileged_role_addition.kql ────────────────────────────────────────────────────────── + +class TestPrivilegedRoleAddition: + RULE = "retail/privileged_role_addition.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1098") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "AuditLogs") + + def test_sensitive_roles_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SensitiveRoles", + "Global Administrator", + "Privileged Role Administrator", + "Security Administrator", + ) + + def test_sensitive_role_assigned_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveRoleAssigned") + + def test_after_hours_role_addition_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursRoleAddition", "HourOfDay") + + def test_mfa_change_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RoleAdditionFollowedByMFAChange") + + def test_sensitive_group_member_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveGroupMemberAdded") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "InitiatedByUser", "TargetUser", "RoleDisplayName", + "ThreatSignal", "HourOfDay", "MFAOperation", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field)