Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,27 @@ jobs:
print(f"All compliance controls map to existing rule files. ({len(existing_ids)} rules checked)")
PYEOF

# ── CHECK 8: Rule regression tests (MockAzureClient, no Azure creds) ──
- name: Rule regression tests
id: rule_tests
env:
DATABASE_URL: "postgresql://ci:ci@localhost/ci_db"
run: |
echo "=== Running rule regression tests ==="
pytest tests/test_rules_*.py -v --tb=short

# ── Final summary — always runs, shows per-check pass/fail ────────
- name: CI Summary
if: always()
env:
SYNTAX: ${{ steps.syntax_check.outcome }}
STRUCTURE: ${{ steps.structure_check.outcome }}
CREDS: ${{ steps.cred_scan.outcome }}
PLAYBOOK: ${{ steps.playbook_check.outcome }}
JSON: ${{ steps.json_check.outcome }}
API: ${{ steps.api_check.outcome }}
XREF: ${{ steps.xref_check.outcome }}
SYNTAX: ${{ steps.syntax_check.outcome }}
STRUCTURE: ${{ steps.structure_check.outcome }}
CREDS: ${{ steps.cred_scan.outcome }}
PLAYBOOK: ${{ steps.playbook_check.outcome }}
JSON: ${{ steps.json_check.outcome }}
API: ${{ steps.api_check.outcome }}
XREF: ${{ steps.xref_check.outcome }}
RULE_TESTS: ${{ steps.rule_tests.outcome }}
run: |
python - <<'PYEOF'
import os
Expand All @@ -351,6 +361,7 @@ jobs:
("Compliance JSON validation", os.environ["JSON"]),
("API syntax check", os.environ["API"]),
("Compliance vs rule cross-reference", os.environ["XREF"]),
("Rule regression tests", os.environ["RULE_TESTS"]),
]

labels = {
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ azure-keyvault-keys==4.9.0
chromadb==0.4.24
sentence-transformers==2.7.0
numpy<2.0
pytest>=7.4.0
pytest-cov>=4.1.0
17 changes: 15 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
import secrets
import time

import jwt
import pytest

from api.app import create_app
from tests.helpers.mock_azure import MockAzureClient

_TEST_JWT_SECRET = secrets.token_urlsafe(32)


@pytest.fixture
def app():
from api.app import create_app
application = create_app()
application.config["TESTING"] = True
application.config["JWT_SECRET"] = _TEST_JWT_SECRET
Expand All @@ -28,6 +28,7 @@ def client(app):

@pytest.fixture
def auth_headers():
import jwt
payload = {
"sub": "test-user",
"role": "admin",
Expand All @@ -36,3 +37,15 @@ def auth_headers():
}
token = jwt.encode(payload, _TEST_JWT_SECRET, algorithm="HS256")
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


@pytest.fixture
def mock_azure() -> MockAzureClient:
"""Return a clean MockAzureClient with no resources configured."""
return MockAzureClient()


@pytest.fixture
def subscription_id() -> str:
"""Return a fake Azure subscription ID for use in scan() calls."""
return "00000000-0000-0000-0000-000000000001"
1 change: 1 addition & 0 deletions tests/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

98 changes: 98 additions & 0 deletions tests/helpers/mock_azure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Shared MockAzureClient for rule regression tests.

Provides a configurable in-memory replacement for the real AzureClient so that
scanner rule unit tests can run fully offline with no Azure credentials.

Usage:
from tests.helpers.mock_azure import MockAzureClient, make_resource

client = MockAzureClient()
client.set_storage_accounts([
make_resource(id="/sub/rg/sa", name="mystorage", allow_blob_public_access=True)
])
findings = az_stor_001.scan(client, "sub-id")
"""

from types import SimpleNamespace
from typing import Any, Dict, List, Tuple


def make_resource(**kwargs: Any) -> SimpleNamespace:
"""Build a fake Azure resource object with arbitrary attributes."""
return SimpleNamespace(**kwargs)


class MockAzureClient:
"""Drop-in replacement for AzureClient that returns configured fake data."""

def __init__(self) -> None:
self._storage_accounts: List[Any] = []
self._network_security_groups: List[Any] = []
self._virtual_machines: List[Any] = []
self._key_vaults: List[Any] = []
self._sql_servers: List[Any] = []
self._service_principals: List[Any] = []
self._sql_firewall_rules: Dict[Tuple[str, str], List[Any]] = {}

def set_storage_accounts(self, accounts: List[Any]) -> "MockAzureClient":
self._storage_accounts = accounts
return self

def set_network_security_groups(self, nsgs: List[Any]) -> "MockAzureClient":
self._network_security_groups = nsgs
return self

def set_virtual_machines(self, vms: List[Any]) -> "MockAzureClient":
self._virtual_machines = vms
return self

def set_key_vaults(self, vaults: List[Any]) -> "MockAzureClient":
self._key_vaults = vaults
return self

def set_sql_servers(self, servers: List[Any]) -> "MockAzureClient":
self._sql_servers = servers
return self

def set_service_principals(self, principals: List[Any]) -> "MockAzureClient":
self._service_principals = principals
return self

def set_sql_server_firewall_rules(
self, resource_group: str, server_name: str, rules: List[Any]
) -> "MockAzureClient":
self._sql_firewall_rules[(resource_group, server_name)] = rules
return self

def get_storage_accounts(self) -> List[Any]:
return self._storage_accounts

def get_network_security_groups(self) -> List[Any]:
return self._network_security_groups

def get_virtual_machines(self) -> List[Any]:
return self._virtual_machines

def get_key_vaults(self) -> List[Any]:
return self._key_vaults

def get_sql_servers(self) -> List[Any]:
return self._sql_servers

def get_service_principals(self) -> List[Any]:
return self._service_principals

def get_sql_server_firewall_rules(
self, resource_group: str, server_name: str
) -> List[Any]:
return self._sql_firewall_rules.get((resource_group, server_name), [])

@staticmethod
def parse_resource_id(resource_id: str) -> Dict[str, str]:
"""Parse an Azure resource ID into a dict with name and resource_group."""
parts = resource_id.split("/")
result: Dict[str, str] = {"name": parts[-1] if parts else ""}
for idx, segment in enumerate(parts):
if segment.lower() == "resourcegroups" and idx + 1 < len(parts):
result["resource_group"] = parts[idx + 1]
return result
64 changes: 64 additions & 0 deletions tests/test_rules_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Rule regression tests for AZ-DB-004."""

import scanner.rules.az_db_004 as az_db_004
from tests.helpers.mock_azure import make_resource

_REQUIRED_FIELDS = {
"rule_id", "rule_name", "severity", "category",
"resource_id", "resource_name", "resource_type",
"description", "remediation", "playbook", "frameworks", "metadata",
}

_SUB = "00000000-0000-0000-0000-000000000001"
_RG = "rg-test"


def _sql_id(name):
return (
f"/subscriptions/{_SUB}/resourceGroups/{_RG}"
f"/providers/Microsoft.Sql/servers/{name}"
)


def _firewall_rule(name, start_ip, end_ip):
return make_resource(
name=name,
start_ip_address=start_ip,
end_ip_address=end_ip,
)


def test_db_004_compliant_returns_no_findings(mock_azure, subscription_id):
"""A SQL Server with no AllowAzureServices rule must produce no findings."""
server = make_resource(id=_sql_id("sql-restricted"), name="sql-restricted")
rule = _firewall_rule("AllowSpecificIP", "203.0.113.10", "203.0.113.10")
mock_azure.set_sql_servers([server])
mock_azure.set_sql_server_firewall_rules(_RG, "sql-restricted", [rule])
findings = az_db_004.scan(mock_azure, subscription_id)
assert findings == []


def test_db_004_noncompliant_returns_one_finding(mock_azure, subscription_id):
"""A SQL Server with AllowAllWindowsAzureIps rule must produce exactly one finding."""
server = make_resource(id=_sql_id("sql-open"), name="sql-open")
allow_azure = _firewall_rule("AllowAllWindowsAzureIps", "0.0.0.0", "0.0.0.0")
mock_azure.set_sql_servers([server])
mock_azure.set_sql_server_firewall_rules(_RG, "sql-open", [allow_azure])
findings = az_db_004.scan(mock_azure, subscription_id)
assert len(findings) == 1
finding = findings[0]
assert _REQUIRED_FIELDS.issubset(finding.keys())
assert finding["rule_id"] == "AZ-DB-004"
assert finding["severity"] == "HIGH"
assert finding["category"] == "Database"
assert finding["resource_name"] == "sql-open"
assert finding["metadata"]["resource_group"] == _RG


def test_db_004_no_firewall_rules_returns_no_findings(mock_azure, subscription_id):
"""A SQL Server with no firewall rules must produce no findings."""
server = make_resource(id=_sql_id("sql-no-rules"), name="sql-no-rules")
mock_azure.set_sql_servers([server])
mock_azure.set_sql_server_firewall_rules(_RG, "sql-no-rules", [])
findings = az_db_004.scan(mock_azure, subscription_id)
assert findings == []
53 changes: 53 additions & 0 deletions tests/test_rules_identity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Rule regression tests for AZ-IDN-001.

Each test configures a MockAzureClient with fake role assignment objects and
calls the rule's scan() function directly. No network calls are made.
"""

import scanner.rules.az_idn_001 as az_idn_001
from tests.helpers.mock_azure import make_resource

_REQUIRED_FIELDS = {
"rule_id", "rule_name", "severity", "category",
"resource_id", "resource_name", "resource_type",
"description", "remediation", "playbook", "frameworks", "metadata",
}

_SUB = "00000000-0000-0000-0000-000000000001"
_OWNER_ROLE_GUID = "8e3af657-a8ff-443c-a75c-2fe8c4bcb635"
_CONTRIBUTOR_ROLE_GUID = "b24988ac-6180-42a0-ab88-20f7382dd24c"
_ROLE_DEF_BASE = (
f"/subscriptions/{_SUB}/providers/Microsoft.Authorization/roleDefinitions"
)


def _assignment(role_guid, principal_id, assign_id):
return make_resource(
id=f"/subscriptions/{_SUB}/providers/Microsoft.Authorization/roleAssignments/{assign_id}",
role_definition_id=f"{_ROLE_DEF_BASE}/{role_guid}",
principal_id=principal_id,
scope=f"/subscriptions/{_SUB}",
)


def test_idn_001_compliant_returns_no_findings(mock_azure, subscription_id):
"""A service principal with a non-Owner role must produce no findings."""
assignment = _assignment(_CONTRIBUTOR_ROLE_GUID, "sp-contributor-abc123", "assign-001")
mock_azure.set_service_principals([assignment])
findings = az_idn_001.scan(mock_azure, subscription_id)
assert findings == []


def test_idn_001_noncompliant_returns_one_finding(mock_azure, subscription_id):
"""A service principal holding the Owner role must produce exactly one finding."""
assignment = _assignment(_OWNER_ROLE_GUID, "sp-owner-def456", "assign-002")
mock_azure.set_service_principals([assignment])
findings = az_idn_001.scan(mock_azure, subscription_id)
assert len(findings) == 1
finding = findings[0]
assert _REQUIRED_FIELDS.issubset(finding.keys())
assert finding["rule_id"] == "AZ-IDN-001"
assert finding["severity"] == "HIGH"
assert finding["category"] == "Identity"
assert finding["resource_name"] == "sp-owner-def456"
assert finding["metadata"]["principal_id"] == "sp-owner-def456"
66 changes: 66 additions & 0 deletions tests/test_rules_keyvault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Rule regression tests for AZ-KV-002.

Each test configures a MockAzureClient with a fake Key Vault object and calls
the rule's scan() function directly. No network calls are made.
"""

import scanner.rules.az_kv_002 as az_kv_002
from tests.helpers.mock_azure import make_resource

_REQUIRED_FIELDS = {
"rule_id", "rule_name", "severity", "category",
"resource_id", "resource_name", "resource_type",
"description", "remediation", "playbook", "frameworks", "metadata",
}

_SUB = "00000000-0000-0000-0000-000000000001"
_RG = "rg-test"


def _kv_id(name):
return (
f"/subscriptions/{_SUB}/resourceGroups/{_RG}"
f"/providers/Microsoft.KeyVault/vaults/{name}"
)


def _vault(name, public_access, private_endpoints):
props = make_resource(
public_network_access=public_access,
private_endpoint_connections=private_endpoints,
)
return make_resource(
id=_kv_id(name),
name=name,
location="eastus",
properties=props,
)


def test_kv_002_compliant_public_access_disabled_returns_no_findings(mock_azure, subscription_id):
"""A Key Vault with public access disabled must produce no findings."""
mock_azure.set_key_vaults([_vault("kv-private", "Disabled", [])])
findings = az_kv_002.scan(mock_azure, subscription_id)
assert findings == []


def test_kv_002_compliant_private_endpoint_present_returns_no_findings(mock_azure, subscription_id):
"""A Key Vault with a private endpoint must produce no findings."""
endpoint = make_resource(id="pe-connection-001", name="pe-kv-secure")
mock_azure.set_key_vaults([_vault("kv-with-pe", "Enabled", [endpoint])])
findings = az_kv_002.scan(mock_azure, subscription_id)
assert findings == []


def test_kv_002_noncompliant_returns_one_finding(mock_azure, subscription_id):
"""A Key Vault with public access enabled and no private endpoint must produce one finding."""
mock_azure.set_key_vaults([_vault("kv-public", "Enabled", [])])
findings = az_kv_002.scan(mock_azure, subscription_id)
assert len(findings) == 1
finding = findings[0]
assert _REQUIRED_FIELDS.issubset(finding.keys())
assert finding["rule_id"] == "AZ-KV-002"
assert finding["severity"] == "HIGH"
assert finding["category"] == "Key Vault"
assert finding["resource_name"] == "kv-public"
assert finding["metadata"]["resource_group"] == _RG
Loading
Loading