diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7fa5aad..46f174f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -328,17 +328,27 @@ jobs: print(f"All compliance controls map to existing rule files. ({len(existing_ids)} rules checked)") PYEOF + # ── CHECK 8: Rule regression tests (MockAzureClient, no Azure creds) ── + - name: Rule regression tests + id: rule_tests + env: + DATABASE_URL: "postgresql://ci:ci@localhost/ci_db" + run: | + echo "=== Running rule regression tests ===" + pytest tests/test_rules_*.py -v --tb=short + # ── Final summary — always runs, shows per-check pass/fail ──────── - name: CI Summary if: always() env: - SYNTAX: ${{ steps.syntax_check.outcome }} - STRUCTURE: ${{ steps.structure_check.outcome }} - CREDS: ${{ steps.cred_scan.outcome }} - PLAYBOOK: ${{ steps.playbook_check.outcome }} - JSON: ${{ steps.json_check.outcome }} - API: ${{ steps.api_check.outcome }} - XREF: ${{ steps.xref_check.outcome }} + SYNTAX: ${{ steps.syntax_check.outcome }} + STRUCTURE: ${{ steps.structure_check.outcome }} + CREDS: ${{ steps.cred_scan.outcome }} + PLAYBOOK: ${{ steps.playbook_check.outcome }} + JSON: ${{ steps.json_check.outcome }} + API: ${{ steps.api_check.outcome }} + XREF: ${{ steps.xref_check.outcome }} + RULE_TESTS: ${{ steps.rule_tests.outcome }} run: | python - <<'PYEOF' import os @@ -351,6 +361,7 @@ jobs: ("Compliance JSON validation", os.environ["JSON"]), ("API syntax check", os.environ["API"]), ("Compliance vs rule cross-reference", os.environ["XREF"]), + ("Rule regression tests", os.environ["RULE_TESTS"]), ] labels = { diff --git a/requirements.txt b/requirements.txt index fb9aabd..77c5511 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ azure-keyvault-keys==4.9.0 chromadb==0.4.24 sentence-transformers==2.7.0 numpy<2.0 +pytest>=7.4.0 +pytest-cov>=4.1.0 diff --git a/tests/conftest.py b/tests/conftest.py index 6f3accd..4b645b0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,16 +5,16 @@ import secrets import time -import jwt import pytest -from api.app import create_app +from tests.helpers.mock_azure import MockAzureClient _TEST_JWT_SECRET = secrets.token_urlsafe(32) @pytest.fixture def app(): + from api.app import create_app application = create_app() application.config["TESTING"] = True application.config["JWT_SECRET"] = _TEST_JWT_SECRET @@ -28,6 +28,7 @@ def client(app): @pytest.fixture def auth_headers(): + import jwt payload = { "sub": "test-user", "role": "admin", @@ -36,3 +37,15 @@ def auth_headers(): } token = jwt.encode(payload, _TEST_JWT_SECRET, algorithm="HS256") return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + + +@pytest.fixture +def mock_azure() -> MockAzureClient: + """Return a clean MockAzureClient with no resources configured.""" + return MockAzureClient() + + +@pytest.fixture +def subscription_id() -> str: + """Return a fake Azure subscription ID for use in scan() calls.""" + return "00000000-0000-0000-0000-000000000001" diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/helpers/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/helpers/mock_azure.py b/tests/helpers/mock_azure.py new file mode 100644 index 0000000..3c3c134 --- /dev/null +++ b/tests/helpers/mock_azure.py @@ -0,0 +1,98 @@ +"""Shared MockAzureClient for rule regression tests. + +Provides a configurable in-memory replacement for the real AzureClient so that +scanner rule unit tests can run fully offline with no Azure credentials. + +Usage: + from tests.helpers.mock_azure import MockAzureClient, make_resource + + client = MockAzureClient() + client.set_storage_accounts([ + make_resource(id="/sub/rg/sa", name="mystorage", allow_blob_public_access=True) + ]) + findings = az_stor_001.scan(client, "sub-id") +""" + +from types import SimpleNamespace +from typing import Any, Dict, List, Tuple + + +def make_resource(**kwargs: Any) -> SimpleNamespace: + """Build a fake Azure resource object with arbitrary attributes.""" + return SimpleNamespace(**kwargs) + + +class MockAzureClient: + """Drop-in replacement for AzureClient that returns configured fake data.""" + + def __init__(self) -> None: + self._storage_accounts: List[Any] = [] + self._network_security_groups: List[Any] = [] + self._virtual_machines: List[Any] = [] + self._key_vaults: List[Any] = [] + self._sql_servers: List[Any] = [] + self._service_principals: List[Any] = [] + self._sql_firewall_rules: Dict[Tuple[str, str], List[Any]] = {} + + def set_storage_accounts(self, accounts: List[Any]) -> "MockAzureClient": + self._storage_accounts = accounts + return self + + def set_network_security_groups(self, nsgs: List[Any]) -> "MockAzureClient": + self._network_security_groups = nsgs + return self + + def set_virtual_machines(self, vms: List[Any]) -> "MockAzureClient": + self._virtual_machines = vms + return self + + def set_key_vaults(self, vaults: List[Any]) -> "MockAzureClient": + self._key_vaults = vaults + return self + + def set_sql_servers(self, servers: List[Any]) -> "MockAzureClient": + self._sql_servers = servers + return self + + def set_service_principals(self, principals: List[Any]) -> "MockAzureClient": + self._service_principals = principals + return self + + def set_sql_server_firewall_rules( + self, resource_group: str, server_name: str, rules: List[Any] + ) -> "MockAzureClient": + self._sql_firewall_rules[(resource_group, server_name)] = rules + return self + + def get_storage_accounts(self) -> List[Any]: + return self._storage_accounts + + def get_network_security_groups(self) -> List[Any]: + return self._network_security_groups + + def get_virtual_machines(self) -> List[Any]: + return self._virtual_machines + + def get_key_vaults(self) -> List[Any]: + return self._key_vaults + + def get_sql_servers(self) -> List[Any]: + return self._sql_servers + + def get_service_principals(self) -> List[Any]: + return self._service_principals + + def get_sql_server_firewall_rules( + self, resource_group: str, server_name: str + ) -> List[Any]: + return self._sql_firewall_rules.get((resource_group, server_name), []) + + @staticmethod + def parse_resource_id(resource_id: str) -> Dict[str, str]: + """Parse an Azure resource ID into a dict with name and resource_group.""" + parts = resource_id.split("/") + result: Dict[str, str] = {"name": parts[-1] if parts else ""} + for idx, segment in enumerate(parts): + if segment.lower() == "resourcegroups" and idx + 1 < len(parts): + result["resource_group"] = parts[idx + 1] + return result diff --git a/tests/test_rules_database.py b/tests/test_rules_database.py new file mode 100644 index 0000000..e6a214f --- /dev/null +++ b/tests/test_rules_database.py @@ -0,0 +1,64 @@ +"""Rule regression tests for AZ-DB-004.""" + +import scanner.rules.az_db_004 as az_db_004 +from tests.helpers.mock_azure import make_resource + +_REQUIRED_FIELDS = { + "rule_id", "rule_name", "severity", "category", + "resource_id", "resource_name", "resource_type", + "description", "remediation", "playbook", "frameworks", "metadata", +} + +_SUB = "00000000-0000-0000-0000-000000000001" +_RG = "rg-test" + + +def _sql_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Sql/servers/{name}" + ) + + +def _firewall_rule(name, start_ip, end_ip): + return make_resource( + name=name, + start_ip_address=start_ip, + end_ip_address=end_ip, + ) + + +def test_db_004_compliant_returns_no_findings(mock_azure, subscription_id): + """A SQL Server with no AllowAzureServices rule must produce no findings.""" + server = make_resource(id=_sql_id("sql-restricted"), name="sql-restricted") + rule = _firewall_rule("AllowSpecificIP", "203.0.113.10", "203.0.113.10") + mock_azure.set_sql_servers([server]) + mock_azure.set_sql_server_firewall_rules(_RG, "sql-restricted", [rule]) + findings = az_db_004.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_db_004_noncompliant_returns_one_finding(mock_azure, subscription_id): + """A SQL Server with AllowAllWindowsAzureIps rule must produce exactly one finding.""" + server = make_resource(id=_sql_id("sql-open"), name="sql-open") + allow_azure = _firewall_rule("AllowAllWindowsAzureIps", "0.0.0.0", "0.0.0.0") + mock_azure.set_sql_servers([server]) + mock_azure.set_sql_server_firewall_rules(_RG, "sql-open", [allow_azure]) + findings = az_db_004.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-DB-004" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Database" + assert finding["resource_name"] == "sql-open" + assert finding["metadata"]["resource_group"] == _RG + + +def test_db_004_no_firewall_rules_returns_no_findings(mock_azure, subscription_id): + """A SQL Server with no firewall rules must produce no findings.""" + server = make_resource(id=_sql_id("sql-no-rules"), name="sql-no-rules") + mock_azure.set_sql_servers([server]) + mock_azure.set_sql_server_firewall_rules(_RG, "sql-no-rules", []) + findings = az_db_004.scan(mock_azure, subscription_id) + assert findings == [] diff --git a/tests/test_rules_identity.py b/tests/test_rules_identity.py new file mode 100644 index 0000000..26177bf --- /dev/null +++ b/tests/test_rules_identity.py @@ -0,0 +1,53 @@ +"""Rule regression tests for AZ-IDN-001. + +Each test configures a MockAzureClient with fake role assignment objects and +calls the rule's scan() function directly. No network calls are made. +""" + +import scanner.rules.az_idn_001 as az_idn_001 +from tests.helpers.mock_azure import make_resource + +_REQUIRED_FIELDS = { + "rule_id", "rule_name", "severity", "category", + "resource_id", "resource_name", "resource_type", + "description", "remediation", "playbook", "frameworks", "metadata", +} + +_SUB = "00000000-0000-0000-0000-000000000001" +_OWNER_ROLE_GUID = "8e3af657-a8ff-443c-a75c-2fe8c4bcb635" +_CONTRIBUTOR_ROLE_GUID = "b24988ac-6180-42a0-ab88-20f7382dd24c" +_ROLE_DEF_BASE = ( + f"/subscriptions/{_SUB}/providers/Microsoft.Authorization/roleDefinitions" +) + + +def _assignment(role_guid, principal_id, assign_id): + return make_resource( + id=f"/subscriptions/{_SUB}/providers/Microsoft.Authorization/roleAssignments/{assign_id}", + role_definition_id=f"{_ROLE_DEF_BASE}/{role_guid}", + principal_id=principal_id, + scope=f"/subscriptions/{_SUB}", + ) + + +def test_idn_001_compliant_returns_no_findings(mock_azure, subscription_id): + """A service principal with a non-Owner role must produce no findings.""" + assignment = _assignment(_CONTRIBUTOR_ROLE_GUID, "sp-contributor-abc123", "assign-001") + mock_azure.set_service_principals([assignment]) + findings = az_idn_001.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_idn_001_noncompliant_returns_one_finding(mock_azure, subscription_id): + """A service principal holding the Owner role must produce exactly one finding.""" + assignment = _assignment(_OWNER_ROLE_GUID, "sp-owner-def456", "assign-002") + mock_azure.set_service_principals([assignment]) + findings = az_idn_001.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-IDN-001" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Identity" + assert finding["resource_name"] == "sp-owner-def456" + assert finding["metadata"]["principal_id"] == "sp-owner-def456" diff --git a/tests/test_rules_keyvault.py b/tests/test_rules_keyvault.py new file mode 100644 index 0000000..2615afa --- /dev/null +++ b/tests/test_rules_keyvault.py @@ -0,0 +1,66 @@ +"""Rule regression tests for AZ-KV-002. + +Each test configures a MockAzureClient with a fake Key Vault object and calls +the rule's scan() function directly. No network calls are made. +""" + +import scanner.rules.az_kv_002 as az_kv_002 +from tests.helpers.mock_azure import make_resource + +_REQUIRED_FIELDS = { + "rule_id", "rule_name", "severity", "category", + "resource_id", "resource_name", "resource_type", + "description", "remediation", "playbook", "frameworks", "metadata", +} + +_SUB = "00000000-0000-0000-0000-000000000001" +_RG = "rg-test" + + +def _kv_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.KeyVault/vaults/{name}" + ) + + +def _vault(name, public_access, private_endpoints): + props = make_resource( + public_network_access=public_access, + private_endpoint_connections=private_endpoints, + ) + return make_resource( + id=_kv_id(name), + name=name, + location="eastus", + properties=props, + ) + + +def test_kv_002_compliant_public_access_disabled_returns_no_findings(mock_azure, subscription_id): + """A Key Vault with public access disabled must produce no findings.""" + mock_azure.set_key_vaults([_vault("kv-private", "Disabled", [])]) + findings = az_kv_002.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_kv_002_compliant_private_endpoint_present_returns_no_findings(mock_azure, subscription_id): + """A Key Vault with a private endpoint must produce no findings.""" + endpoint = make_resource(id="pe-connection-001", name="pe-kv-secure") + mock_azure.set_key_vaults([_vault("kv-with-pe", "Enabled", [endpoint])]) + findings = az_kv_002.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_kv_002_noncompliant_returns_one_finding(mock_azure, subscription_id): + """A Key Vault with public access enabled and no private endpoint must produce one finding.""" + mock_azure.set_key_vaults([_vault("kv-public", "Enabled", [])]) + findings = az_kv_002.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-KV-002" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Key Vault" + assert finding["resource_name"] == "kv-public" + assert finding["metadata"]["resource_group"] == _RG diff --git a/tests/test_rules_network.py b/tests/test_rules_network.py new file mode 100644 index 0000000..8215c4d --- /dev/null +++ b/tests/test_rules_network.py @@ -0,0 +1,105 @@ +"""Rule regression tests for AZ-NET-001 and AZ-NET-002.""" + +import scanner.rules.az_net_001 as az_net_001 +import scanner.rules.az_net_002 as az_net_002 +from tests.helpers.mock_azure import make_resource + +_REQUIRED_FIELDS = { + "rule_id", "rule_name", "severity", "category", + "resource_id", "resource_name", "resource_type", + "description", "remediation", "playbook", "frameworks", "metadata", +} + +_SUB = "00000000-0000-0000-0000-000000000001" +_RG = "rg-test" + + +def _nsg_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Network/networkSecurityGroups/{name}" + ) + + +def _allow_rule(name, port, source="10.0.0.0/24"): + return make_resource( + name=name, + direction="Inbound", + access="Allow", + source_address_prefix=source, + source_address_prefixes=[], + destination_port_range=port, + destination_port_ranges=[], + ) + + +def _open_allow_rule(name, port): + return make_resource( + name=name, + direction="Inbound", + access="Allow", + source_address_prefix="0.0.0.0/0", + source_address_prefixes=[], + destination_port_range=port, + destination_port_ranges=[], + ) + + +def test_net_001_compliant_returns_no_findings(mock_azure, subscription_id): + """An NSG restricting SSH to a trusted IP range must produce no findings.""" + nsg = make_resource( + id=_nsg_id("nsg-ssh-restricted"), + name="nsg-ssh-restricted", + security_rules=[_allow_rule("AllowSSHFromTrusted", "22", "10.0.0.0/24")], + ) + mock_azure.set_network_security_groups([nsg]) + findings = az_net_001.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_net_001_noncompliant_returns_one_finding(mock_azure, subscription_id): + """An NSG with Allow-inbound-SSH-from-any must produce exactly one finding.""" + nsg = make_resource( + id=_nsg_id("nsg-ssh-open"), + name="nsg-ssh-open", + security_rules=[_open_allow_rule("AllowSSHFromInternet", "22")], + ) + mock_azure.set_network_security_groups([nsg]) + findings = az_net_001.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-NET-001" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Network" + assert finding["resource_name"] == "nsg-ssh-open" + + +def test_net_002_compliant_returns_no_findings(mock_azure, subscription_id): + """An NSG restricting RDP to a trusted IP range must produce no findings.""" + nsg = make_resource( + id=_nsg_id("nsg-rdp-restricted"), + name="nsg-rdp-restricted", + security_rules=[_allow_rule("AllowRDPFromTrusted", "3389", "192.168.1.0/24")], + ) + mock_azure.set_network_security_groups([nsg]) + findings = az_net_002.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_net_002_noncompliant_returns_one_finding(mock_azure, subscription_id): + """An NSG with Allow-inbound-RDP-from-any must produce exactly one finding.""" + nsg = make_resource( + id=_nsg_id("nsg-rdp-open"), + name="nsg-rdp-open", + security_rules=[_open_allow_rule("AllowRDPFromInternet", "3389")], + ) + mock_azure.set_network_security_groups([nsg]) + findings = az_net_002.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-NET-002" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Network" + assert finding["resource_name"] == "nsg-rdp-open" diff --git a/tests/test_rules_storage.py b/tests/test_rules_storage.py new file mode 100644 index 0000000..4257102 --- /dev/null +++ b/tests/test_rules_storage.py @@ -0,0 +1,85 @@ +"""Rule regression tests for AZ-STOR-001 and AZ-STOR-002. + +Each test configures a MockAzureClient with a single fake storage account +and calls the rule's scan() function directly. No network calls are made. +""" + +import scanner.rules.az_stor_001 as az_stor_001 +import scanner.rules.az_stor_002 as az_stor_002 +from tests.helpers.mock_azure import make_resource + +_REQUIRED_FIELDS = { + "rule_id", "rule_name", "severity", "category", + "resource_id", "resource_name", "resource_type", + "description", "remediation", "playbook", "frameworks", +} + +_SUB = "00000000-0000-0000-0000-000000000001" +_RG = "rg-test" + + +def _storage_id(name): + return ( + f"/subscriptions/{_SUB}/resourceGroups/{_RG}" + f"/providers/Microsoft.Storage/storageAccounts/{name}" + ) + + +def test_stor_001_compliant_returns_no_findings(mock_azure, subscription_id): + """A storage account with public blob access disabled must produce no findings.""" + account = make_resource( + id=_storage_id("compliant-storage"), + name="compliant-storage", + allow_blob_public_access=False, + ) + mock_azure.set_storage_accounts([account]) + findings = az_stor_001.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_stor_001_noncompliant_returns_one_finding(mock_azure, subscription_id): + """A storage account with public blob access enabled must produce exactly one finding.""" + account = make_resource( + id=_storage_id("public-storage"), + name="public-storage", + allow_blob_public_access=True, + ) + mock_azure.set_storage_accounts([account]) + findings = az_stor_001.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-STOR-001" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Storage" + assert finding["resource_name"] == "public-storage" + + +def test_stor_002_compliant_returns_no_findings(mock_azure, subscription_id): + """A storage account with HTTPS-only enabled must produce no findings.""" + account = make_resource( + id=_storage_id("https-only-storage"), + name="https-only-storage", + enable_https_traffic_only=True, + ) + mock_azure.set_storage_accounts([account]) + findings = az_stor_002.scan(mock_azure, subscription_id) + assert findings == [] + + +def test_stor_002_noncompliant_returns_one_finding(mock_azure, subscription_id): + """A storage account that allows HTTP traffic must produce exactly one finding.""" + account = make_resource( + id=_storage_id("http-allowed-storage"), + name="http-allowed-storage", + enable_https_traffic_only=False, + ) + mock_azure.set_storage_accounts([account]) + findings = az_stor_002.scan(mock_azure, subscription_id) + assert len(findings) == 1 + finding = findings[0] + assert _REQUIRED_FIELDS.issubset(finding.keys()) + assert finding["rule_id"] == "AZ-STOR-002" + assert finding["severity"] == "HIGH" + assert finding["category"] == "Storage" + assert finding["resource_name"] == "http-allowed-storage"