diff --git a/logic-apps/README.md b/logic-apps/README.md index 0ab597f..a793c32 100644 --- a/logic-apps/README.md +++ b/logic-apps/README.md @@ -1,9 +1,41 @@ # Logic Apps (SOAR Playbooks) -Azure Logic App workflow definitions for automated incident response. +Azure Logic App workflow definitions for automated incident response and UK regulatory compliance assistance. + +## Containment Playbooks + +Triggered automatically via Sentinel Automation Rules when the `PlaybookTrigger` custom detail in an alert matches the trigger value. + +| Workflow | Trigger Value | Action | +|---|---|---| +| `block-ip/` | `block_ip` | Adds the offending IP to an Entra ID Conditional Access named location to block sign-ins | +| `isolate-endpoint/` | `isolate_endpoint` | Isolates the device via the Microsoft Defender for Endpoint API | +| `quarantine-email/` | `quarantine_email` | Notifies the affected mailbox user and submits the mail message to Defender for Office 365 quarantine review | +| `suspend-terminal/` | `suspend_terminal` | Calls the POS management REST API to suspend the identified terminal (API key retrieved from Key Vault) | + +## Compliance Playbooks + +| Workflow | Trigger | Action | +|---|---|---| +| `incident-reporting/` | High or Critical severity incident created | Calculates UK 24h / 72h regulatory deadlines, drafts an ICO/NCSC-format incident report, and emails the designated compliance contact. **Never auto-files with any government body.** See [`incident-reporting/README.md`](incident-reporting/README.md). | + +## Other Playbooks (In Development) | Workflow | Purpose | |---|---| | `triage-classify/` | Auto-triage incoming Sentinel incidents and assign severity | -| `threat-intel-enrich/` | Enrich IOCs via VirusTotal, AbuseIPDB, and Microsoft TI | -| `containment/` | Block IP, disable AD account, or isolate host via Defender | +| `threat-intel-enrich/` | Enrich IOCs via VirusTotal, AbuseIPDB, and Microsoft Threat Intelligence | +| `containment/` | Generalised containment actions (block IP, disable AD account, isolate host) | + +## Deployment + +See [`DEPLOYMENT.md`](DEPLOYMENT.md) for full step-by-step deployment instructions, including role assignments and API connection authorisation steps. + +Quick deploy example: + +```bash +az deployment group create \ + --resource-group \ + --template-file logic-apps/block-ip/workflow.json \ + --parameters workspaceName= +``` diff --git a/logic-apps/incident-reporting/README.md b/logic-apps/incident-reporting/README.md new file mode 100644 index 0000000..648aaaf --- /dev/null +++ b/logic-apps/incident-reporting/README.md @@ -0,0 +1,130 @@ +# UK Incident Reporting / Compliance Assistant + +## ⚠️ Critical Design Note + +**This playbook ASSISTS compliance by drafting and alerting. It never auto-files official government reports.** + +Automated submission to regulators requires human judgement. A false or premature report filed with the ICO or NCSC can itself constitute a regulatory failure and carry independent legal consequences. This playbook generates a pre-filled draft and emails it to your designated compliance contact. A qualified person must review, verify, and submit via the official portals. + +--- + +## UK Regulatory Requirements + +### Cyber Security and Resilience Bill (2025) + +The UK Government's Cyber Security and Resilience Bill imposes a **24-hour early-warning** duty on organisations operating in-scope sectors (operators of essential services, digital service providers, and supply chain organisations). The early warning must be filed with the relevant supervisory authority within 24 hours of becoming aware of a significant incident. A full report follows. + +### UK GDPR / Data Protection Act 2018 — ICO Notification + +Under UK GDPR Article 33 and the Data Protection Act 2018, organisations must notify the Information Commissioner's Office (ICO) within **72 hours** of becoming aware of a personal data breach, where that breach is likely to result in a risk to individuals’ rights and freedoms. Notification to affected individuals may also be required under Article 34. + +### NIS Regulations 2018 + +Operators of Essential Services and Relevant Digital Service Providers must report incidents with a significant impact on service continuity to their Competent Authority within timelines that vary by sector (typically 72 hours). + +### Why it matters — Fines + +| Regulation | Maximum penalty | +|---|---| +| UK GDPR / DPA 2018 | **£17.5 million** or **4% of global annual turnover** (whichever is higher) | +| Cyber Security and Resilience Bill | Expected to align with NIS2 enforcement levels | +| NIS Regulations 2018 | Up to **£17 million** | + +Missing the notification window without justification is itself an infringement that regulators treat as a separate failure, compounding the original incident. + +--- + +## What This Playbook Does + +| Step | Action | +|---|---| +| 1 | Triggers when a **High or Critical** severity Microsoft Sentinel incident is created | +| 2 | Extracts incident details: title, detection timestamp, severity, status, affected accounts / hosts / IPs, MITRE technique | +| 3 | Calculates the **24-hour early-warning deadline** (Cyber Security and Resilience Bill) and the **72-hour full-report deadline** (UK GDPR / ICO) from the incident creation timestamp | +| 4 | Composes a pre-filled draft incident report structured to meet ICO/NCSC reporting requirements | +| 5 | Emails the draft report to your organisation’s **designated compliance contact** (configurable parameter — never a government address) | +| 6 | Posts a comment to the originating Sentinel incident recording that the compliance notification was generated, with both deadline timestamps | + +## What This Playbook Does NOT Do + +- Does **not** send anything to the ICO, NCSC, or any government body or system +- Does **not** determine whether an incident is legally reportable — that is a DPO/legal determination +- Does **not** replace your incident response plan, legal counsel, or DPO +- Does **not** transmit data outside your Azure tenant (email is sent via your own Office 365 connection) +- Does **not** file on a schedule or retry — it fires once per qualifying incident + +--- + +## Configuration + +Deploy using the Azure CLI: + +```bash +az deployment group create \ + --resource-group \ + --template-file logic-apps/incident-reporting/workflow.json \ + --parameters workspaceName= \ + complianceContactEmail=security@yourorganisation.com \ + organisationName="Your Organisation Ltd" +``` + +### Parameters + +| Parameter | Required | Default | Description | +|---|---|---|---| +| `workspaceName` | Yes | — | Microsoft Sentinel Log Analytics workspace name | +| `complianceContactEmail` | Yes | — | Email address of the designated compliance or security contact who receives draft reports | +| `organisationName` | No | `RetailShield Operator` | Your organisation name, included in the draft report header | +| `logicAppName` | No | `RetailShield-UKIncidentReporting` | Name for the deployed Logic App resource | +| `location` | No | Resource group location | Azure region | + +--- + +## Required Permissions + +The Logic App uses a **system-assigned managed identity**. After deployment: + +1. Open the Logic App in the Azure portal +2. Go to **Identity → System assigned** and copy the **Object ID** +3. Assign **Microsoft Sentinel Responder** to the managed identity on your Sentinel workspace (required to read incident entities and post comments) + +The Office 365 connection requires user-delegated consent. After deployment, open the Logic App in the portal and authorise the Office 365 API connection under **API connections**. + +--- + +## Official Reporting Channels + +The compliance email includes direct links to both portals. Your compliance contact must submit through these — no other method is accepted: + +| Regulator | Portal | Use for | +|---|---|---| +| Information Commissioner’s Office (ICO) | https://ico.org.uk/for-organisations/report-a-breach/ | Personal data breaches (UK GDPR Article 33) | +| National Cyber Security Centre (NCSC) | https://www.ncsc.gov.uk/section/about-this-website/incident-management | Significant cyber incidents | + +--- + +## Draft Report Structure + +The playbook pre-fills the following sections based on incident data from Sentinel. Fields marked `[REVIEWER]` in the email require human input before any submission: + +1. Nature of the incident +2. Date and time of detection +3. Systems and data potentially affected *(requires human verification)* +4. Estimated number of individuals affected *(requires DPO input — do not guess)* +5. Likely consequences *(requires human assessment)* +6. Measures taken or proposed *(auto-populated from Sentinel status; must be expanded)* +7. Current investigation status + +--- + +## Automation Rule Setup + +To trigger this playbook automatically, create a Sentinel Automation Rule: + +1. Go to **Sentinel → Automation → Create → Automation rule** +2. Trigger: **When incident is created** +3. Condition: **Incident severity** → **Equals** → **High** +4. Action: **Run playbook** → select `RetailShield-UKIncidentReporting` +5. Save + +If your environment uses the Defender XDR unified portal and has "Critical" as a severity level, add a second condition for Critical. diff --git a/logic-apps/incident-reporting/workflow.json b/logic-apps/incident-reporting/workflow.json new file mode 100644 index 0000000..c8f1a6b --- /dev/null +++ b/logic-apps/incident-reporting/workflow.json @@ -0,0 +1,263 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "workspaceName": { + "type": "string", + "metadata": { "description": "Microsoft Sentinel Log Analytics workspace name" } + }, + "logicAppName": { + "type": "string", + "defaultValue": "RetailShield-UKIncidentReporting", + "metadata": { "description": "Name of the Logic App" } + }, + "location": { + "type": "string", + "defaultValue": "[resourceGroup().location]", + "metadata": { "description": "Azure region for the Logic App" } + }, + "complianceContactEmail": { + "type": "string", + "metadata": { + "description": "Email address of your organisation's designated compliance or security contact. This is the ONLY address this playbook sends to — never a government address." + } + }, + "organisationName": { + "type": "string", + "defaultValue": "RetailShield Operator", + "metadata": { "description": "Your organisation name, included in the draft report header." } + } + }, + "variables": { + "sentinelConnectionName": "azuresentinel", + "office365ConnectionName": "office365" + }, + "resources": [ + { + "type": "Microsoft.Web/connections", + "apiVersion": "2016-06-01", + "name": "[variables('sentinelConnectionName')]", + "location": "[parameters('location')]", + "properties": { + "displayName": "Microsoft Sentinel", + "api": { + "id": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Web/locations/', parameters('location'), '/managedApis/azuresentinel')]" + } + } + }, + { + "type": "Microsoft.Web/connections", + "apiVersion": "2016-06-01", + "name": "[variables('office365ConnectionName')]", + "location": "[parameters('location')]", + "properties": { + "displayName": "Office 365 — Compliance Notifications", + "api": { + "id": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Web/locations/', parameters('location'), '/managedApis/office365')]" + } + } + }, + { + "type": "Microsoft.Logic/workflows", + "apiVersion": "2019-05-01", + "name": "[parameters('logicAppName')]", + "location": "[parameters('location')]", + "identity": { "type": "SystemAssigned" }, + "dependsOn": [ + "[resourceId('Microsoft.Web/connections', variables('sentinelConnectionName'))]", + "[resourceId('Microsoft.Web/connections', variables('office365ConnectionName'))]" + ], + "properties": { + "state": "Enabled", + "definition": { + "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "$connections": { "defaultValue": {}, "type": "Object" }, + "complianceContactEmail": { "defaultValue": "", "type": "String" }, + "organisationName": { "defaultValue": "Retail Organisation", "type": "String" } + }, + "triggers": { + "Microsoft_Sentinel_incident": { + "type": "ApiConnectionWebhook", + "inputs": { + "body": { "callback_url": "@{listCallbackUrl()}" }, + "host": { + "connection": { "name": "@parameters('$connections')['azuresentinel']['connectionId']" } + }, + "path": "/incident-creation" + } + } + }, + "actions": { + "Check_severity": { + "type": "If", + "expression": { + "or": [ + { "equals": ["@triggerBody()?['object']?['properties']?['severity']", "High"] }, + { "equals": ["@triggerBody()?['object']?['properties']?['severity']", "Critical"] } + ] + }, + "actions": { + "Get_incident_entities": { + "type": "ApiConnection", + "inputs": { + "body": "@triggerBody()", + "host": { + "connection": { "name": "@parameters('$connections')['azuresentinel']['connectionId']" } + }, + "method": "post", + "path": "/Incidents/subscriptions/@{encodeURIComponent(triggerBody()?['workspaceInfo']?['SubscriptionId'])}/resourceGroups/@{encodeURIComponent(triggerBody()?['workspaceInfo']?['ResourceGroupName'])}/workspaces/@{encodeURIComponent(triggerBody()?['workspaceInfo']?['WorkspaceName'])}/incidents/@{encodeURIComponent(triggerBody()?['object']?['properties']?['incidentNumber'])}/entities" + }, + "runAfter": {} + }, + "Filter_account_entities": { + "type": "Query", + "inputs": { + "from": "@body('Get_incident_entities')?['entities']", + "where": "@equals(item()?['kind'], 'Account')" + }, + "runAfter": { "Get_incident_entities": ["Succeeded"] } + }, + "Filter_host_entities": { + "type": "Query", + "inputs": { + "from": "@body('Get_incident_entities')?['entities']", + "where": "@equals(item()?['kind'], 'Host')" + }, + "runAfter": { "Get_incident_entities": ["Succeeded"] } + }, + "Filter_ip_entities": { + "type": "Query", + "inputs": { + "from": "@body('Get_incident_entities')?['entities']", + "where": "@equals(item()?['kind'], 'Ip')" + }, + "runAfter": { "Get_incident_entities": ["Succeeded"] } + }, + "Select_account_names": { + "type": "Select", + "inputs": { + "from": "@body('Filter_account_entities')", + "select": "@coalesce(item()?['properties']?['upn'], item()?['properties']?['accountName'], 'Unknown Account')" + }, + "runAfter": { "Filter_account_entities": ["Succeeded"] } + }, + "Select_host_names": { + "type": "Select", + "inputs": { + "from": "@body('Filter_host_entities')", + "select": "@coalesce(item()?['properties']?['hostName'], item()?['properties']?['dnsDomain'], 'Unknown Host')" + }, + "runAfter": { "Filter_host_entities": ["Succeeded"] } + }, + "Select_ip_addresses": { + "type": "Select", + "inputs": { + "from": "@body('Filter_ip_entities')", + "select": "@coalesce(item()?['properties']?['address'], 'Unknown IP')" + }, + "runAfter": { "Filter_ip_entities": ["Succeeded"] } + }, + "Compose_detection_time": { + "type": "Compose", + "inputs": "@formatDateTime(triggerBody()?['object']?['properties']?['createdTimeUtc'], 'dd/MM/yyyy HH:mm')", + "runAfter": { "Get_incident_entities": ["Succeeded"] } + }, + "Compose_24h_deadline": { + "type": "Compose", + "inputs": "@formatDateTime(addHours(triggerBody()?['object']?['properties']?['createdTimeUtc'], 24), 'dd/MM/yyyy HH:mm')", + "runAfter": { "Get_incident_entities": ["Succeeded"] } + }, + "Compose_72h_deadline": { + "type": "Compose", + "inputs": "@formatDateTime(addHours(triggerBody()?['object']?['properties']?['createdTimeUtc'], 72), 'dd/MM/yyyy HH:mm')", + "runAfter": { "Get_incident_entities": ["Succeeded"] } + }, + "Compose_email_subject": { + "type": "Compose", + "inputs": "[ACTION REQUIRED] Critical Security Incident — UK Regulatory Notification Due by @{outputs('Compose_24h_deadline')} UTC — @{triggerBody()?['object']?['properties']?['title']}", + "runAfter": { + "Select_account_names": ["Succeeded"], + "Select_host_names": ["Succeeded"], + "Select_ip_addresses": ["Succeeded"], + "Compose_detection_time": ["Succeeded"], + "Compose_24h_deadline": ["Succeeded"], + "Compose_72h_deadline": ["Succeeded"] + } + }, + "Compose_email_body": { + "type": "Compose", + "inputs": "

⚠ CRITICAL SECURITY INCIDENT — UK REGULATORY NOTIFICATION REQUIRED


⚠ IMPORTANT — HUMAN REVIEW REQUIRED: This draft was generated automatically by the RetailShield UK Incident Reporting playbook. A qualified person must review, verify, and submit via the official channels below. This system does NOT auto-submit to any government body. Submitting inaccurate reports to regulators carries independent legal risk.

Incident Summary

Organisation@{parameters('organisationName')}
Incident Title@{triggerBody()?['object']?['properties']?['title']}
Incident Number#@{triggerBody()?['object']?['properties']?['incidentNumber']}
Severity@{triggerBody()?['object']?['properties']?['severity']}
Detection Time (UTC)@{outputs('Compose_detection_time')} UTC
Current Status@{triggerBody()?['object']?['properties']?['status']}
MITRE Technique@{coalesce(triggerBody()?['object']?['properties']?['additionalData']?['customDetails']?['MitreTechnique'], 'See incident details in Sentinel')}
Affected Accounts@{if(equals(length(body('Select_account_names')), 0), 'None identified at this time', join(body('Select_account_names'), ', '))}
Affected Hosts / Devices@{if(equals(length(body('Select_host_names')), 0), 'None identified at this time', join(body('Select_host_names'), ', '))}
Source IP Addresses@{if(equals(length(body('Select_ip_addresses')), 0), 'None identified at this time', join(body('Select_ip_addresses'), ', '))}

⏳ Regulatory Deadlines

24-Hour Early WarningCyber Security and Resilience Bill (early notification duty)@{outputs('Compose_24h_deadline')} UTC
72-Hour Full ReportUK GDPR Article 33 / ICO breach notification@{outputs('Compose_72h_deadline')} UTC


Draft Incident Report — Pre-filled for Review

Fields marked [REVIEWER] require human input before submission. Do not submit with placeholders.

1. Nature of the incident
A @{triggerBody()?['object']?['properties']?['severity']}-severity cyber security incident was detected on @{outputs('Compose_detection_time')} UTC by the RetailShield automated threat detection system operating within Microsoft Sentinel. The incident is classified as: @{triggerBody()?['object']?['properties']?['title']}.

2. Date and time the incident was detected
@{outputs('Compose_detection_time')} UTC (Sentinel incident #@{triggerBody()?['object']?['properties']?['incidentNumber']} created).

3. Systems and/or data potentially affected
The following assets were identified: Accounts — @{if(equals(length(body('Select_account_names')), 0), 'none identified at this time', join(body('Select_account_names'), ', '))}. Hosts/Devices — @{if(equals(length(body('Select_host_names')), 0), 'none identified at this time', join(body('Select_host_names'), ', '))}. [REVIEWER: confirm which systems and categories of personal data may be affected. Do not submit until verified.]

4. Estimated number of individuals affected
[REVIEWER: to be determined — do not estimate. Consult your Data Protection Officer.]

5. Likely consequences of the breach
[REVIEWER: describe the likely impact based on the incident type, data categories involved, and current evidence. Do not speculate beyond what evidence supports.]

6. Measures taken or proposed to address the breach
Automated detection and initial containment actions have been triggered by the RetailShield SOAR platform. Current incident status in Microsoft Sentinel: @{triggerBody()?['object']?['properties']?['status']}. [REVIEWER: describe specific containment steps taken — system isolation, credential resets, customer notification status, forensic investigation steps initiated, etc.]

7. Current status of the investigation
Investigation ongoing as of @{outputs('Compose_detection_time')} UTC. Microsoft Sentinel incident #@{triggerBody()?['object']?['properties']?['incidentNumber']} remains active. [REVIEWER: update with current status at time of actual submission.]



Official Reporting Channels

You must submit via these official portals. Do not reply to this email.

ICO — Personal data / GDPR breacheshttps://ico.org.uk/for-organisations/report-a-breach/
NCSC — Cyber incidentshttps://www.ncsc.gov.uk/section/about-this-website/incident-management


This notification was generated automatically by the RetailShield UK Incident Reporting playbook in response to Microsoft Sentinel incident #@{triggerBody()?['object']?['properties']?['incidentNumber']}. It is an advisory and drafting tool only. Always apply human judgement and legal advice before filing regulatory reports. Incorrect or premature submissions may have independent legal consequences.

", + "runAfter": { "Compose_email_subject": ["Succeeded"] } + }, + "Send_compliance_email": { + "type": "ApiConnection", + "inputs": { + "host": { + "connection": { "name": "@parameters('$connections')['office365']['connectionId']" } + }, + "method": "post", + "path": "/Mail", + "body": { + "To": "@parameters('complianceContactEmail')", + "Subject": "@{outputs('Compose_email_subject')}", + "Body": "@{outputs('Compose_email_body')}", + "Importance": "High", + "IsHtml": true + } + }, + "runAfter": { "Compose_email_body": ["Succeeded"] } + }, + "Add_comment_to_incident": { + "type": "ApiConnection", + "inputs": { + "body": { + "message": "

📋 UK Compliance Notification Generated

The RetailShield UK Incident Reporting playbook has been triggered for this High/Critical severity incident.

  • Draft report emailed to: designated compliance contact
  • 24-hour early-warning deadline (Cyber Security and Resilience Bill): @{outputs('Compose_24h_deadline')} UTC
  • 72-hour full-report deadline (UK GDPR / ICO): @{outputs('Compose_72h_deadline')} UTC

⚠ Action required: the compliance contact must review the draft report and submit via official channels if the incident is legally reportable. This playbook does NOT auto-file with any regulator.

Official channels: ICO (https://ico.org.uk/for-organisations/report-a-breach/) | NCSC (https://www.ncsc.gov.uk/section/about-this-website/incident-management)

" + }, + "host": { + "connection": { "name": "@parameters('$connections')['azuresentinel']['connectionId']" } + }, + "method": "post", + "path": "/Incidents/subscriptions/@{encodeURIComponent(triggerBody()?['workspaceInfo']?['SubscriptionId'])}/resourceGroups/@{encodeURIComponent(triggerBody()?['workspaceInfo']?['ResourceGroupName'])}/workspaces/@{encodeURIComponent(triggerBody()?['workspaceInfo']?['WorkspaceName'])}/incidents/@{encodeURIComponent(triggerBody()?['object']?['properties']?['incidentNumber'])}/comments" + }, + "runAfter": { "Send_compliance_email": ["Succeeded", "Failed"] } + } + }, + "else": { "actions": {} }, + "runAfter": {} + } + } + }, + "parameters": { + "$connections": { + "value": { + "azuresentinel": { + "connectionId": "[resourceId('Microsoft.Web/connections', variables('sentinelConnectionName'))]", + "connectionName": "[variables('sentinelConnectionName')]", + "id": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Web/locations/', parameters('location'), '/managedApis/azuresentinel')]" + }, + "office365": { + "connectionId": "[resourceId('Microsoft.Web/connections', variables('office365ConnectionName'))]", + "connectionName": "[variables('office365ConnectionName')]", + "id": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Web/locations/', parameters('location'), '/managedApis/office365')]" + } + } + }, + "complianceContactEmail": { "value": "[parameters('complianceContactEmail')]" }, + "organisationName": { "value": "[parameters('organisationName')]" } + } + } + } + ], + "outputs": { + "logicAppResourceId": { + "type": "string", + "value": "[resourceId('Microsoft.Logic/workflows', parameters('logicAppName'))]" + }, + "principalId": { + "type": "string", + "value": "[reference(resourceId('Microsoft.Logic/workflows', parameters('logicAppName')), '2019-05-01', 'Full').identity.principalId]" + } + } +} diff --git a/tests/detection-rules/test_kql_rules.py b/tests/detection-rules/test_kql_rules.py index 62c6525..4b9d1e0 100644 --- a/tests/detection-rules/test_kql_rules.py +++ b/tests/detection-rules/test_kql_rules.py @@ -1 +1,1262 @@ -"""\nRetailShield — KQL Rule Validation Test Suite\nValidates syntax structure, required metadata, and logic correctness\nfor all detection-rules/*.kql files without a live Sentinel connection.\n"""\n\nimport re\nfrom pathlib import Path\n\nRULES_DIR = Path(__file__).parent.parent.parent / \"detection-rules\"\n\n\ndef load_rule(filename: str) -> str:\n path = RULES_DIR / filename\n assert path.exists(), f\"Rule file not found: {path}\"\n return path.read_text(encoding=\"utf-8\")\n\n\n# ── Helpers ─────────────────────────────────────────────────────────────────────────────\n\ndef assert_metadata(content: str, key: str, value: str):\n pattern = rf\"//\\s*{re.escape(key)}\\s*:.*{re.escape(value)}\"\n assert re.search(pattern, content, re.IGNORECASE), (\n f\"Missing or incorrect metadata '{key}: {value}'\"\n )\n\n\ndef assert_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment in content, f\"Expected '{fragment}' not found in rule\"\n\n\ndef assert_not_contains(content: str, *fragments: str):\n for fragment in fragments:\n assert fragment not in content, f\"Unexpected fragment '{fragment}' found in rule\"\n\n\n# ── phishing_detection.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestPhishingDetection:\n RULE = \"retail/phishing_detection.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1566.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_queries_email_attachment_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailAttachmentInfo\")\n\n def test_queries_email_events_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"EmailEvents\")\n\n def test_filters_inbound_delivered_emails(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"Delivered\"', '\"Inbound\"')\n\n def test_suspicious_extensions_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousExtensions\",\n '\".exe\"',\n '\".ps1\"',\n '\".vbs\"',\n '\".docm\"',\n )\n\n def test_suppresses_approved_senders(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ApprovedSenderDomains\", \"leftanti\")\n\n def test_lure_classification_present(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"LureCategory\",\n \"Financial\",\n \"Credential\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"quarantine_email\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"SenderAddress\",\n \"RecipientEmailAddress\",\n \"Subject\",\n \"AttachmentFileName\",\n \"AttachmentSHA256\",\n \"NetworkMessageId\",\n \"AlertSeverity\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ransomware_indicator.kql ──────────────────────────────────────────────────────────────────\n\nclass TestRansomwareIndicator:\n RULE = \"retail/ransomware_indicator.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1486\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_mass_file_rename_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceFileEvents\", \"FileRenamed\", \"MassRenameThresh\")\n\n def test_shadow_copy_deletion_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"vssadmin delete shadows\",\n \"wmic shadowcopy delete\",\n \"bcdedit /set recoveryenabled no\",\n \"ShadowCopyDeletion\",\n )\n\n def test_known_ransomware_process_names(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"lockbit\",\n \"blackcat\",\n \"conti\",\n \"ryuk\",\n \"KnownRansomwareProcess\",\n )\n\n def test_c2_beacon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n \"BeaconThresh\",\n \"C2BeaconToRansomwareIP\",\n )\n\n def test_ioc_watchlist_used(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"127.0.0.1\"')\n\n def test_union_combines_all_signals(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"MassFileRename\",\n \"ShadowCopyDeletion\",\n \"RansomwareProcesses\",\n \"C2Beaconing\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"isolate_endpoint\")\n\n def test_risk_score_is_100(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"100\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"DeviceName\",\n \"DeviceId\",\n \"ThreatSignal\",\n \"ProcessCommandLine\",\n \"RenameCount\",\n \"SampleFiles\",\n \"AffectedFolders\",\n \"RemoteIP\",\n \"BeaconCount\",\n \"AlertSeverity\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"RiskScore\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_anomaly.kql ─────────────────────────────────────────────────────────────────────────────\n\nclass TestPosAnomaly:\n RULE = \"retail/pos_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_pos_process_names_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"POSProcessNames\", \"xstore.exe\", \"aloha.exe\", \"posready.exe\")\n\n def test_retail_terminal_prefix_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailTerminalPrefix\", '\"pos-\"', '\"till-\"', '\"kiosk-\"')\n\n def test_unknown_dll_injection_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnknownDLLInjection\", \"DeviceEvents\", \"ImageLoaded\")\n\n def test_abnormal_transaction_volume_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AbnormalTransactionVolume\",\n \"RetailShield_Logs_CL\",\n \"TransactionVolumeThreshold\",\n )\n\n def test_process_memory_dump_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"ProcessMemoryDump\",\n \"ProcessDumped\",\n \"ProcDump\",\n )\n\n def test_suspicious_network_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SuspiciousNetworkFromPOS\",\n \"DeviceNetworkEvents\",\n \"RetailIOCWatchlist\",\n )\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"suspend_terminal\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"DeviceId\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── ai_voice_fraud.kql ─────────────────────────────────────────────────────────────────────\n\nclass TestAiVoiceFraud:\n RULE = \"retail/ai_voice_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1598\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Reconnaissance\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_ai_confidence_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AIConfidenceThreshold\")\n\n def test_fraud_keywords_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"FraudKeywords\",\n '\"urgent\"',\n '\"override\"',\n '\"bypass\"',\n '\"transfer\"',\n )\n\n def test_business_hours_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHoursStart\", \"BusinessHoursEnd\")\n\n def test_high_confidence_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"HighConfidenceVoiceFraud\",\n \"RetailShield_Logs_CL\",\n \"AI_Voice_Fraud\",\n )\n\n def test_urgent_financial_request_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UrgentFinancialRequest\")\n\n def test_spoofed_caller_id_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SpoofedCallerID\")\n\n def test_after_hours_voice_fraud_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoiceFraud\", \"hourofday\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"TargetEmployee\",\n \"ImpersonatingEntity\",\n \"RequestMade\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── supply_chain_anomaly.kql ──────────────────────────────────────────────────────────────────\n\nclass TestSupplyChainAnomaly:\n RULE = \"retail/supply_chain_anomaly.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1195\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"30 minutes\")\n\n def test_admin_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AdminEndpoints\",\n '\"/admin\"',\n '\"/management\"',\n '\"/config\"',\n )\n\n def test_agreeable_endpoints_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"AgreeableEndpoints\",\n '\"/inventory\"',\n '\"/orders\"',\n )\n\n def test_bulk_export_threshold_defined(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BulkExportThreshold\")\n\n def test_supplier_admin_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SupplierAdminAccess\",\n \"AzureDiagnostics\",\n \"supplier_api_key\",\n )\n\n def test_new_service_principal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewServicePrincipal\", \"AuditLogs\")\n\n def test_unauthorised_endpoint_access_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"UnauthorisedEndpointAccess\")\n\n def test_mass_data_export_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MassDataExport\", \"BulkExportThreshold\")\n\n def test_playbook_trigger_field_exposed(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_uses_ingestion_time_not_static_timestamp(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ingestion_time()\")\n assert_not_contains(content, \"datetime(2\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content), (\n \"Rule contains a hardcoded GUID — use a watchlist or parameter instead\"\n )\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n required_fields = [\n \"AlertTitle\",\n \"DeviceName\",\n \"SupplierKey\",\n \"AlertSeverity\",\n \"RiskScore\",\n \"SignalCount\",\n \"Signals\",\n \"MitreTechnique\",\n \"MitreTactic\",\n \"PlaybookTrigger\",\n \"FirstSeen\",\n \"LastSeen\",\n ]\n for field in required_fields:\n assert_contains(content, field)\n\n\n# ── pos_void_refund.kql ──────────────────────────────────────────────────────────────────\n\nclass TestPosVoidRefund:\n RULE = \"retail/pos_void_refund.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1056.001\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Collection\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"VoidRefundThresh\", \"HighValueThresh\")\n\n def test_after_hours_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursVoidRefund\")\n\n def test_high_volume_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVolumeVoidRefund\")\n\n def test_high_value_void_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighValueVoidNoOverride\")\n\n def test_tender_mismatch_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"TenderMismatchRefund\", \"CASH\", \"GIFT_CARD\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\", \"ThreatSignal\",\n \"TotalValue\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── gift_card_fraud.kql ──────────────────────────────────────────────────────────────────\n\nclass TestGiftCardFraud:\n RULE = \"retail/gift_card_fraud.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1657\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Impact\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RetailShield_POS_CL\")\n\n def test_activation_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardActivThresh\")\n\n def test_structuring_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuringThreshold\", \"StructuringBand\")\n\n def test_multi_terminal_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MultiTerminalThresh\")\n\n def test_high_velocity_activation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighVelocityGiftCardActivation\")\n\n def test_structured_purchase_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StructuredGiftCardPurchase\")\n\n def test_drain_and_reload_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardDrainAndReload\")\n\n def test_multi_terminal_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"GiftCardMultiTerminalUse\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"GiftCardNumber_s\", \"OperatorId_s\", \"StoreId_s\", \"TerminalId_s\",\n \"ThreatSignal\", \"ActivationCount\", \"UniqueCards\", \"TotalValue\",\n \"TerminalCount\", \"TerminalList\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── mfa_fatigue.kql ──────────────────────────────────────────────────────────────────\n\nclass TestMfaFatigue:\n RULE = \"retail/mfa_fatigue.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1621\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_mfa_prompt_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptThresh\")\n\n def test_mfa_result_codes(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAResultCodes\", \"50074\", \"50076\", \"50079\")\n\n def test_prompt_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MFAPromptFlood\")\n\n def test_fatigue_acceptance_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FatigueAcceptance\")\n\n def test_distributed_mfa_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedMFAFlood\")\n\n def test_risky_signin_post_flood_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninPostMFAFlood\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"PromptCount\", \"SourceIPs\",\n \"SourceIPList\", \"Locations\", \"AppNames\", \"SuccessIP\", \"SuccessCountry\",\n \"RiskLevel\", \"RiskyIP\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── credential_stuffing.kql ──────────────────────────────────────────────────────────────────\n\nclass TestCredentialStuffing:\n RULE = \"retail/credential_stuffing.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1110.004\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Credential Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_abuse_ipdb_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"AbuseIPDBWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"FailThreshPerAcct\", \"MinSourceIPs\")\n\n def test_distributed_login_failure_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DistributedLoginFailure\")\n\n def test_account_takeover_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AccountTakeoverAfterStuffing\")\n\n def test_blacklisted_ip_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"StuffingFromBlacklistedIP\")\n\n def test_risky_signin_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskySigninAfterStuffing\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"block_ip\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"80\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"UserPrincipalName\", \"ThreatSignal\", \"FailCount\", \"SourceIPs\",\n \"SourceIPList\", \"IPAddress\", \"TargetAccts\", \"TargetList\",\n \"SuccessIP\", \"SuccessCountry\", \"RiskLevel\", \"AppNames\",\n \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── after_hours_access.kql ──────────────────────────────────────────────────────────────────\n\nclass TestAfterHoursAccess:\n RULE = \"retail/after_hours_access.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1078\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\", \"AuditLogs\", \"DeviceLogonEvents\")\n\n def test_service_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailServiceAccounts\")', \"leftanti\")\n\n def test_business_hours_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"BusinessHourStart\", \"BusinessHourEnd\")\n\n def test_after_hours_interactive_login_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursInteractiveLogin\")\n\n def test_after_hours_privileged_operation_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursPrivilegedOperation\")\n\n def test_after_hours_device_logon_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursSensitiveDeviceLogon\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"60\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"AccountName\", \"ThreatSignal\", \"LoginCount\",\n \"SourceIPs\", \"Locations\", \"AppNames\", \"Devices\",\n \"RemoteIP\", \"Operations\", \"TargetObjects\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── data_exfiltration.kql ──────────────────────────────────────────────────────────────────\n\nclass TestDataExfiltration:\n RULE = \"retail/data_exfiltration.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1048\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Exfiltration\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Critical\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_tables(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DeviceNetworkEvents\", \"DeviceFileEvents\")\n\n def test_ioc_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailIOCWatchlist\")')\n\n def test_threshold_params(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSQueryThresh\", \"SubdomainLenThresh\",\n \"OutboundMBThresh\", \"StagingFileThresh\")\n\n def test_private_ip_exclusions(self):\n content = load_rule(self.RULE)\n assert_contains(content, '\"10.\"', '\"192.168.\"', '\"172.\"', '\"127.0.0.1\"')\n\n def test_dns_tunneling_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DNSTunneling\")\n\n def test_large_outbound_transfer_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"LargeOutboundTransfer\")\n\n def test_ioc_matched_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"IOCMatchedExfilTarget\")\n\n def test_data_staging_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"DataStagingToExfil\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"data_exfil_contain\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"90\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"DeviceName\", \"DeviceId\", \"AccountName\", \"ThreatSignal\",\n \"NormalisedTarget\", \"QueryCount\", \"UniqueSubdomains\", \"TotalSentMB\",\n \"TotalSentBytes\", \"FileReadCount\", \"SampleFiles\", \"UniqueFolders\",\n \"ExfilTargets\", \"AlertSeverity\", \"MitreTechnique\", \"MitreTactic\",\n \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── supplier_impossible_travel.kql ──────────────────────────────────────────────────────────\n\nclass TestSupplierImpossibleTravel:\n RULE = \"retail/supplier_impossible_travel.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1199\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Initial Access\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"Medium\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"15 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SigninLogs\")\n\n def test_supplier_accounts_watchlist(self):\n content = load_rule(self.RULE)\n assert_contains(content, '_GetWatchlist(\"RetailSupplierAccounts\")')\n\n def test_impossible_speed_threshold(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleSpeedKmph\", \"900\")\n\n def test_high_risk_countries(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountries\", '\"RU\"', '\"CN\"', '\"KP\"', '\"IR\"')\n\n def test_geo_distance_function(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"geo_distance_2points\")\n\n def test_impossible_travel_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"ImpossibleTravel\", \"RequiredSpeedKmph\")\n\n def test_new_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"NewCountryForSupplier\")\n\n def test_high_risk_country_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"HighRiskCountrySignin\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"70\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"UserPrincipalName\", \"SupplierName\", \"ThreatSignal\",\n \"IPAddress\", \"CountryCode\", \"City\", \"SecondCountry\", \"SecondCity\",\n \"RequiredSpeedKmph\", \"DistanceKm\", \"AlertSeverity\", \"MitreTechnique\",\n \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n\n\n# ── privileged_role_addition.kql ──────────────────────────────────────────────────────────\n\nclass TestPrivilegedRoleAddition:\n RULE = \"retail/privileged_role_addition.kql\"\n\n def test_file_exists(self):\n assert (RULES_DIR / self.RULE).exists()\n\n def test_mitre_technique_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"MITRE ATT&CK\", \"T1098\")\n\n def test_mitre_tactic_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Tactic\", \"Persistence\")\n\n def test_severity_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Severity\", \"High\")\n\n def test_frequency_metadata(self):\n content = load_rule(self.RULE)\n assert_metadata(content, \"Frequency\", \"5 minutes\")\n\n def test_source_table(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AuditLogs\")\n\n def test_sensitive_roles_defined(self):\n content = load_rule(self.RULE)\n assert_contains(\n content,\n \"SensitiveRoles\",\n \"Global Administrator\",\n \"Privileged Role Administrator\",\n \"Security Administrator\",\n )\n\n def test_sensitive_role_assigned_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveRoleAssigned\")\n\n def test_after_hours_role_addition_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"AfterHoursRoleAddition\", \"HourOfDay\")\n\n def test_mfa_change_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RoleAdditionFollowedByMFAChange\")\n\n def test_sensitive_group_member_signal(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"SensitiveGroupMemberAdded\")\n\n def test_playbook_trigger(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"PlaybookTrigger\", \"notify_soc\")\n\n def test_risk_score(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"RiskScore\", \"85\")\n\n def test_mitre_fields_in_output(self):\n content = load_rule(self.RULE)\n assert_contains(content, \"MitreTechnique\", \"MitreTactic\")\n\n def test_no_hardcoded_tenant_ids(self):\n content = load_rule(self.RULE)\n tenant_pattern = re.compile(\n r\"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\",\n re.IGNORECASE,\n )\n assert not tenant_pattern.search(content)\n\n def test_output_contains_required_fields(self):\n content = load_rule(self.RULE)\n for field in [\n \"Timestamp\", \"InitiatedByUser\", \"TargetUser\", \"RoleDisplayName\",\n \"ThreatSignal\", \"HourOfDay\", \"MFAOperation\", \"AlertSeverity\",\n \"MitreTechnique\", \"MitreTactic\", \"PlaybookTrigger\", \"RiskScore\",\n ]:\n assert_contains(content, field)\n \ No newline at end of file +""" +RetailShield — KQL Rule Validation Test Suite +Validates syntax structure, required metadata, and logic correctness +for all detection-rules/*.kql files without a live Sentinel connection. +""" + +import re +from pathlib import Path + +RULES_DIR = Path(__file__).parent.parent.parent / "detection-rules" + + +def load_rule(filename: str) -> str: + path = RULES_DIR / filename + assert path.exists(), f"Rule file not found: {path}" + return path.read_text(encoding="utf-8") + + +# ── Helpers ───────────────────────────────────────────────────────────────────────────── + +def assert_metadata(content: str, key: str, value: str): + pattern = rf"//\s*{re.escape(key)}\s*:.*{re.escape(value)}" + assert re.search(pattern, content, re.IGNORECASE), ( + f"Missing or incorrect metadata '{key}: {value}'" + ) + + +def assert_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment in content, f"Expected '{fragment}' not found in rule" + + +def assert_not_contains(content: str, *fragments: str): + for fragment in fragments: + assert fragment not in content, f"Unexpected fragment '{fragment}' found in rule" + + +# ── phishing_detection.kql ───────────────────────────────────────────────────────────────────── + +class TestPhishingDetection: + RULE = "retail/phishing_detection.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1566.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_queries_email_attachment_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailAttachmentInfo") + + def test_queries_email_events_table(self): + content = load_rule(self.RULE) + assert_contains(content, "EmailEvents") + + def test_filters_inbound_delivered_emails(self): + content = load_rule(self.RULE) + assert_contains(content, '"Delivered"', '"Inbound"') + + def test_suspicious_extensions_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousExtensions", + '".exe"', + '".ps1"', + '".vbs"', + '".docm"', + ) + + def test_suppresses_approved_senders(self): + content = load_rule(self.RULE) + assert_contains(content, "ApprovedSenderDomains", "leftanti") + + def test_lure_classification_present(self): + content = load_rule(self.RULE) + assert_contains( + content, + "LureCategory", + "Financial", + "Credential", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "quarantine_email") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "SenderAddress", + "RecipientEmailAddress", + "Subject", + "AttachmentFileName", + "AttachmentSHA256", + "NetworkMessageId", + "AlertSeverity", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ransomware_indicator.kql ────────────────────────────────────────────────────────────────── + +class TestRansomwareIndicator: + RULE = "retail/ransomware_indicator.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1486") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_mass_file_rename_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceFileEvents", "FileRenamed", "MassRenameThresh") + + def test_shadow_copy_deletion_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "vssadmin delete shadows", + "wmic shadowcopy delete", + "bcdedit /set recoveryenabled no", + "ShadowCopyDeletion", + ) + + def test_known_ransomware_process_names(self): + content = load_rule(self.RULE) + assert_contains( + content, + "lockbit", + "blackcat", + "conti", + "ryuk", + "KnownRansomwareProcess", + ) + + def test_c2_beacon_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "DeviceNetworkEvents", + "RetailIOCWatchlist", + "BeaconThresh", + "C2BeaconToRansomwareIP", + ) + + def test_ioc_watchlist_used(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"127.0.0.1"') + + def test_union_combines_all_signals(self): + content = load_rule(self.RULE) + assert_contains( + content, + "MassFileRename", + "ShadowCopyDeletion", + "RansomwareProcesses", + "C2Beaconing", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "isolate_endpoint") + + def test_risk_score_is_100(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "100") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "DeviceName", + "DeviceId", + "ThreatSignal", + "ProcessCommandLine", + "RenameCount", + "SampleFiles", + "AffectedFolders", + "RemoteIP", + "BeaconCount", + "AlertSeverity", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "RiskScore", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_anomaly.kql ───────────────────────────────────────────────────────────────────────────── + +class TestPosAnomaly: + RULE = "retail/pos_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_pos_process_names_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "POSProcessNames", "xstore.exe", "aloha.exe", "posready.exe") + + def test_retail_terminal_prefix_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailTerminalPrefix", '"pos-"', '"till-"', '"kiosk-"') + + def test_unknown_dll_injection_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnknownDLLInjection", "DeviceEvents", "ImageLoaded") + + def test_abnormal_transaction_volume_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AbnormalTransactionVolume", + "RetailShield_Logs_CL", + "TransactionVolumeThreshold", + ) + + def test_process_memory_dump_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "ProcessMemoryDump", + "ProcessDumped", + "ProcDump", + ) + + def test_suspicious_network_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SuspiciousNetworkFromPOS", + "DeviceNetworkEvents", + "RetailIOCWatchlist", + ) + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "suspend_terminal") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "DeviceId", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── ai_voice_fraud.kql ───────────────────────────────────────────────────────────────────── + +class TestAiVoiceFraud: + RULE = "retail/ai_voice_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1598") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Reconnaissance") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_ai_confidence_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "AIConfidenceThreshold") + + def test_fraud_keywords_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "FraudKeywords", + '"urgent"', + '"override"', + '"bypass"', + '"transfer"', + ) + + def test_business_hours_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHoursStart", "BusinessHoursEnd") + + def test_high_confidence_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "HighConfidenceVoiceFraud", + "RetailShield_Logs_CL", + "AI_Voice_Fraud", + ) + + def test_urgent_financial_request_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UrgentFinancialRequest") + + def test_spoofed_caller_id_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SpoofedCallerID") + + def test_after_hours_voice_fraud_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoiceFraud", "hourofday") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "TargetEmployee", + "ImpersonatingEntity", + "RequestMade", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── supply_chain_anomaly.kql ────────────────────────────────────────────────────────────────── + +class TestSupplyChainAnomaly: + RULE = "retail/supply_chain_anomaly.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1195") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "30 minutes") + + def test_admin_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AdminEndpoints", + '"/admin"', + '"/management"', + '"/config"', + ) + + def test_agreeable_endpoints_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "AgreeableEndpoints", + '"/inventory"', + '"/orders"', + ) + + def test_bulk_export_threshold_defined(self): + content = load_rule(self.RULE) + assert_contains(content, "BulkExportThreshold") + + def test_supplier_admin_access_signal(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SupplierAdminAccess", + "AzureDiagnostics", + "supplier_api_key", + ) + + def test_new_service_principal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewServicePrincipal", "AuditLogs") + + def test_unauthorised_endpoint_access_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "UnauthorisedEndpointAccess") + + def test_mass_data_export_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MassDataExport", "BulkExportThreshold") + + def test_playbook_trigger_field_exposed(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_uses_ingestion_time_not_static_timestamp(self): + content = load_rule(self.RULE) + assert_contains(content, "ingestion_time()") + assert_not_contains(content, "datetime(2") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content), ( + "Rule contains a hardcoded GUID — use a watchlist or parameter instead" + ) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + required_fields = [ + "AlertTitle", + "DeviceName", + "SupplierKey", + "AlertSeverity", + "RiskScore", + "SignalCount", + "Signals", + "MitreTechnique", + "MitreTactic", + "PlaybookTrigger", + "FirstSeen", + "LastSeen", + ] + for field in required_fields: + assert_contains(content, field) + + +# ── pos_void_refund.kql ────────────────────────────────────────────────────────────────── + +class TestPosVoidRefund: + RULE = "retail/pos_void_refund.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1056.001") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Collection") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "VoidRefundThresh", "HighValueThresh") + + def test_after_hours_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursVoidRefund") + + def test_high_volume_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVolumeVoidRefund") + + def test_high_value_void_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighValueVoidNoOverride") + + def test_tender_mismatch_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "TenderMismatchRefund", "CASH", "GIFT_CARD") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "OperatorId_s", "StoreId_s", "TerminalId_s", "ThreatSignal", + "TotalValue", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── gift_card_fraud.kql ────────────────────────────────────────────────────────────────── + +class TestGiftCardFraud: + RULE = "retail/gift_card_fraud.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1657") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Impact") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "RetailShield_POS_CL") + + def test_activation_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardActivThresh") + + def test_structuring_params(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuringThreshold", "StructuringBand") + + def test_multi_terminal_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MultiTerminalThresh") + + def test_high_velocity_activation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighVelocityGiftCardActivation") + + def test_structured_purchase_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StructuredGiftCardPurchase") + + def test_drain_and_reload_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardDrainAndReload") + + def test_multi_terminal_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "GiftCardMultiTerminalUse") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "GiftCardNumber_s", "OperatorId_s", "StoreId_s", "TerminalId_s", + "ThreatSignal", "ActivationCount", "UniqueCards", "TotalValue", + "TerminalCount", "TerminalList", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── mfa_fatigue.kql ────────────────────────────────────────────────────────────────── + +class TestMfaFatigue: + RULE = "retail/mfa_fatigue.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1621") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_mfa_prompt_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptThresh") + + def test_mfa_result_codes(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAResultCodes", "50074", "50076", "50079") + + def test_prompt_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "MFAPromptFlood") + + def test_fatigue_acceptance_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "FatigueAcceptance") + + def test_distributed_mfa_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedMFAFlood") + + def test_risky_signin_post_flood_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninPostMFAFlood") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "PromptCount", "SourceIPs", + "SourceIPList", "Locations", "AppNames", "SuccessIP", "SuccessCountry", + "RiskLevel", "RiskyIP", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── credential_stuffing.kql ────────────────────────────────────────────────────────────────── + +class TestCredentialStuffing: + RULE = "retail/credential_stuffing.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1110.004") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Credential Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_abuse_ipdb_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("AbuseIPDBWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains(content, "FailThreshPerAcct", "MinSourceIPs") + + def test_distributed_login_failure_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DistributedLoginFailure") + + def test_account_takeover_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AccountTakeoverAfterStuffing") + + def test_blacklisted_ip_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "StuffingFromBlacklistedIP") + + def test_risky_signin_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskySigninAfterStuffing") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "block_ip") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "80") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "UserPrincipalName", "ThreatSignal", "FailCount", "SourceIPs", + "SourceIPList", "IPAddress", "TargetAccts", "TargetList", + "SuccessIP", "SuccessCountry", "RiskLevel", "AppNames", + "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── after_hours_access.kql ────────────────────────────────────────────────────────────────── + +class TestAfterHoursAccess: + RULE = "retail/after_hours_access.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1078") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs", "AuditLogs", "DeviceLogonEvents") + + def test_service_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailServiceAccounts")', "leftanti") + + def test_business_hours_params(self): + content = load_rule(self.RULE) + assert_contains(content, "BusinessHourStart", "BusinessHourEnd") + + def test_after_hours_interactive_login_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursInteractiveLogin") + + def test_after_hours_privileged_operation_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursPrivilegedOperation") + + def test_after_hours_device_logon_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursSensitiveDeviceLogon") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "60") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "AccountName", "ThreatSignal", "LoginCount", + "SourceIPs", "Locations", "AppNames", "Devices", + "RemoteIP", "Operations", "TargetObjects", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── data_exfiltration.kql ────────────────────────────────────────────────────────────────── + +class TestDataExfiltration: + RULE = "retail/data_exfiltration.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1048") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Exfiltration") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Critical") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_tables(self): + content = load_rule(self.RULE) + assert_contains(content, "DeviceNetworkEvents", "DeviceFileEvents") + + def test_ioc_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailIOCWatchlist")') + + def test_threshold_params(self): + content = load_rule(self.RULE) + assert_contains( + content, "DNSQueryThresh", "SubdomainLenThresh", + "OutboundMBThresh", "StagingFileThresh" + ) + + def test_private_ip_exclusions(self): + content = load_rule(self.RULE) + assert_contains(content, '"10."', '"192.168."', '"172."', '"127.0.0.1"') + + def test_dns_tunneling_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DNSTunneling") + + def test_large_outbound_transfer_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "LargeOutboundTransfer") + + def test_ioc_matched_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "IOCMatchedExfilTarget") + + def test_data_staging_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "DataStagingToExfil") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "data_exfil_contain") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "90") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "DeviceName", "DeviceId", "AccountName", "ThreatSignal", + "NormalisedTarget", "QueryCount", "UniqueSubdomains", "TotalSentMB", + "TotalSentBytes", "FileReadCount", "SampleFiles", "UniqueFolders", + "ExfilTargets", "AlertSeverity", "MitreTechnique", "MitreTactic", + "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── supplier_impossible_travel.kql ────────────────────────────────────────────────────────── + +class TestSupplierImpossibleTravel: + RULE = "retail/supplier_impossible_travel.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1199") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Initial Access") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "Medium") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "15 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "SigninLogs") + + def test_supplier_accounts_watchlist(self): + content = load_rule(self.RULE) + assert_contains(content, '_GetWatchlist("RetailSupplierAccounts")') + + def test_impossible_speed_threshold(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleSpeedKmph", "900") + + def test_high_risk_countries(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountries", '"RU"', '"CN"', '"KP"', '"IR"') + + def test_geo_distance_function(self): + content = load_rule(self.RULE) + assert_contains(content, "geo_distance_2points") + + def test_impossible_travel_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "ImpossibleTravel", "RequiredSpeedKmph") + + def test_new_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "NewCountryForSupplier") + + def test_high_risk_country_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "HighRiskCountrySignin") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "70") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "UserPrincipalName", "SupplierName", "ThreatSignal", + "IPAddress", "CountryCode", "City", "SecondCountry", "SecondCity", + "RequiredSpeedKmph", "DistanceKm", "AlertSeverity", "MitreTechnique", + "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field) + + +# ── privileged_role_addition.kql ────────────────────────────────────────────────────────── + +class TestPrivilegedRoleAddition: + RULE = "retail/privileged_role_addition.kql" + + def test_file_exists(self): + assert (RULES_DIR / self.RULE).exists() + + def test_mitre_technique_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "MITRE ATT&CK", "T1098") + + def test_mitre_tactic_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Tactic", "Persistence") + + def test_severity_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Severity", "High") + + def test_frequency_metadata(self): + content = load_rule(self.RULE) + assert_metadata(content, "Frequency", "5 minutes") + + def test_source_table(self): + content = load_rule(self.RULE) + assert_contains(content, "AuditLogs") + + def test_sensitive_roles_defined(self): + content = load_rule(self.RULE) + assert_contains( + content, + "SensitiveRoles", + "Global Administrator", + "Privileged Role Administrator", + "Security Administrator", + ) + + def test_sensitive_role_assigned_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveRoleAssigned") + + def test_after_hours_role_addition_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "AfterHoursRoleAddition", "HourOfDay") + + def test_mfa_change_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "RoleAdditionFollowedByMFAChange") + + def test_sensitive_group_member_signal(self): + content = load_rule(self.RULE) + assert_contains(content, "SensitiveGroupMemberAdded") + + def test_playbook_trigger(self): + content = load_rule(self.RULE) + assert_contains(content, "PlaybookTrigger", "notify_soc") + + def test_risk_score(self): + content = load_rule(self.RULE) + assert_contains(content, "RiskScore", "85") + + def test_mitre_fields_in_output(self): + content = load_rule(self.RULE) + assert_contains(content, "MitreTechnique", "MitreTactic") + + def test_no_hardcoded_tenant_ids(self): + content = load_rule(self.RULE) + tenant_pattern = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + re.IGNORECASE, + ) + assert not tenant_pattern.search(content) + + def test_output_contains_required_fields(self): + content = load_rule(self.RULE) + for field in [ + "Timestamp", "InitiatedByUser", "TargetUser", "RoleDisplayName", + "ThreatSignal", "HourOfDay", "MFAOperation", "AlertSeverity", + "MitreTechnique", "MitreTactic", "PlaybookTrigger", "RiskScore", + ]: + assert_contains(content, field)