diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d90871c..3573c3b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -71,6 +71,30 @@ jobs: - working-directory: skills/discover-environment run: pytest tests/ -v -o "testpaths=tests" + test-k8s-security: + runs-on: ubuntu-latest + needs: lint + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - run: pip install pytest + - working-directory: skills/k8s-security-benchmark + run: pytest tests/ -v -o "testpaths=tests" + + test-container-security: + runs-on: ubuntu-latest + needs: lint + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - run: pip install pytest + - working-directory: skills/container-security + run: pytest tests/ -v -o "testpaths=tests" + agent-bom: runs-on: ubuntu-latest needs: lint diff --git a/CLAUDE.md b/CLAUDE.md index f61a943..54a44c5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -12,6 +12,8 @@ skills/ cspm-azure-cis-benchmark/ — CIS Azure Foundations v2.1 (19 checks + 5 AI Foundry) model-serving-security/ — Model serving security benchmark (16 checks) gpu-cluster-security/ — GPU cluster security benchmark (13 checks) + k8s-security-benchmark/ — Kubernetes security benchmark (10 checks) + container-security/ — Container image + runtime security (8 checks) discover-environment/ — Cloud environment discovery with MITRE ATT&CK/ATLAS overlay vuln-remediation-pipeline/ — Auto-remediate supply chain vulnerabilities ``` diff --git a/README.md b/README.md index 4a04d16..8556a2b 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,7 @@ [![CI](https://github.com/msaad00/cloud-security/actions/workflows/ci.yml/badge.svg)](https://github.com/msaad00/cloud-security/actions/workflows/ci.yml) [![License: Apache 2.0](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](LICENSE) [![Python 
3.11+](https://img.shields.io/badge/python-3.11+-blue.svg)](https://www.python.org/downloads/) -[![Scanned by agent-bom](https://img.shields.io/badge/scanned%20by-agent--bom-10b981)](https://github.com/msaad00/agent-bom) - -Production-grade cloud security benchmarks and automation — CIS checks for AWS/GCP/Azure, model serving security, GPU cluster hardening, IAM remediation, and vulnerability response pipelines. Each skill is compliance-mapped, tested, and ready to deploy. +Production-grade cloud security benchmarks and automation — CIS checks for AWS/GCP/Azure, Kubernetes and container hardening, model serving security, GPU cluster security, IAM remediation, and vulnerability response pipelines. Each skill is compliance-mapped, tested, and ready to deploy. ## Skills @@ -16,6 +14,8 @@ Production-grade cloud security benchmarks and automation — CIS checks for AWS | [cspm-azure-cis-benchmark](skills/cspm-azure-cis-benchmark/) | Azure | 24 | CIS Azure Foundations v2.1 + AI Foundry security | | [model-serving-security](skills/model-serving-security/) | Any | 16 | Model endpoint auth, rate limiting, data egress, safety layers | | [gpu-cluster-security](skills/gpu-cluster-security/) | Any | 13 | GPU runtime isolation, driver CVEs, InfiniBand, tenant isolation | +| [k8s-security-benchmark](skills/k8s-security-benchmark/) | Any | 10 | Pod security, RBAC, network policies, secrets, image pinning | +| [container-security](skills/container-security/) | Any | 8 | Dockerfile best practices, image security, runtime isolation | | [discover-environment](skills/discover-environment/) | Multi-cloud | — | Map cloud resources to security graph with MITRE ATT&CK/ATLAS overlays | | [iam-departures-remediation](skills/iam-departures-remediation/) | Multi-cloud | — | Auto-remediate IAM for departed employees across 5 clouds | | [vuln-remediation-pipeline](skills/vuln-remediation-pipeline/) | AWS | — | Auto-remediate supply chain vulns with EPSS triage | @@ -155,16 +155,10 @@ flowchart LR 
## CI/CD Pipeline -This repo is scanned by [agent-bom](https://github.com/msaad00/agent-bom) in CI — dogfooding the scanner against its own security skills. - | CI Job | What | |--------|------| | Lint | ruff check + format | -| Test (IAM) | pytest — parser + worker Lambdas | -| Test (Model Serving) | pytest — 31 checks | -| Test (GPU Cluster) | pytest — 31 checks | -| **agent-bom scan** | **SAST + secret detection → SARIF → GitHub Security tab** | -| **agent-bom skills audit** | **SKILL.md security review → SARIF → GitHub Security tab** | +| Tests | pytest per skill (IAM, model-serving, GPU, K8s, container, discover) | | CloudFormation | cfn-lint validation | | Terraform | terraform validate | | Security | bandit + hardcoded secret grep | @@ -179,35 +173,26 @@ cd cloud-security pip install boto3 python skills/cspm-aws-cis-benchmark/src/checks.py --region us-east-1 -# Model serving security audit -python skills/model-serving-security/src/checks.py serving-config.json +# Model serving security (with example config) +python skills/model-serving-security/src/checks.py skills/model-serving-security/examples/insecure-serving.json + +# GPU cluster security (with example config) +python skills/gpu-cluster-security/src/checks.py skills/gpu-cluster-security/examples/insecure-cluster.json + +# K8s security benchmark +python skills/k8s-security-benchmark/src/checks.py skills/k8s-security-benchmark/examples/secure-cluster.json -# GPU cluster security audit -python skills/gpu-cluster-security/src/checks.py cluster-config.json +# Container security +python skills/container-security/src/checks.py skills/container-security/examples/secure-image.json # Run tests pip install pytest boto3 moto -cd skills/iam-departures-remediation && pytest tests/test_parser_lambda.py tests/test_worker_lambda.py -v - -# Scan with agent-bom -pip install agent-bom -agent-bom skills scan skills/ -agent-bom code skills/ +pytest skills/model-serving-security/tests/ -v -o "testpaths=tests" +pytest 
skills/gpu-cluster-security/tests/ -v -o "testpaths=tests" +pytest skills/k8s-security-benchmark/tests/ -v -o "testpaths=tests" +pytest skills/container-security/tests/ -v -o "testpaths=tests" ``` -## Integration with agent-bom - -This repo provides the automations. [agent-bom](https://github.com/msaad00/agent-bom) provides continuous scanning: - -| agent-bom Feature | Use Case | -|--------------------|----------| -| `cis_benchmark` | Built-in CIS for AWS/GCP/Azure/Snowflake | -| `code` | SAST scan of Lambda/skill source code | -| `skills scan` | Audit SKILL.md for security risks | -| `blast_radius` | Map impact of orphaned credentials | -| `compliance` | 15-framework compliance posture | -| `graph` | Visualize dependencies + attack paths | - ## Contributing See [CONTRIBUTING.md](CONTRIBUTING.md). diff --git a/skills/container-security/SKILL.md b/skills/container-security/SKILL.md new file mode 100644 index 0000000..dbb48c4 --- /dev/null +++ b/skills/container-security/SKILL.md @@ -0,0 +1,76 @@ +--- +name: container-security +description: >- + Audit container image and runtime security. Checks Dockerfile best practices, + image configuration, secrets exposure, base image selection, and runtime + isolation. Works with Dockerfile analysis, image config JSON, or runtime + dumps. Use when the user mentions container security, Docker hardening, + image scanning, Dockerfile audit, or CIS Docker benchmark. +license: Apache-2.0 +compatibility: >- + Requires Python 3.11+. No Docker daemon needed — works with config files. + Optional: PyYAML for YAML parsing. Read-only — no image pulls or execution. 
+metadata: + author: msaad00 + homepage: https://github.com/msaad00/cloud-security + source: https://github.com/msaad00/cloud-security/tree/main/skills/container-security + version: 0.1.0 + frameworks: + - CIS Docker Benchmark + - NIST CSF 2.0 + cloud: any +--- + +# Container Security Benchmark + +8 automated checks across 3 domains — Dockerfile best practices, image +security, and runtime isolation. Each check mapped to CIS Docker Benchmark +and NIST CSF 2.0. + +## Architecture + +```mermaid +flowchart LR + IMG["Container Config\nDockerfile · Image JSON\nRuntime dumps"] + BENCH["checks.py\n8 checks · 3 domains"] + OUT["JSON / Console"] + + IMG --> BENCH --> OUT + + style IMG fill:#1e293b,stroke:#475569,color:#e2e8f0 + style BENCH fill:#164e63,stroke:#22d3ee,color:#e2e8f0 +``` + +## Controls + +| # | Check | Severity | CIS Docker | +|---|-------|----------|-----------| +| CTR-1.1 | No root user | HIGH | 4.1 | +| CTR-1.2 | No :latest base image | MEDIUM | 4.2 | +| CTR-1.3 | HEALTHCHECK defined | LOW | 4.6 | +| CTR-2.1 | No secrets in env vars | CRITICAL | 4.5 | +| CTR-2.2 | Minimal base image | MEDIUM | 4.3 | +| CTR-2.3 | COPY instead of ADD | LOW | 4.9 | +| CTR-3.1 | Read-only root filesystem | MEDIUM | 5.12 | +| CTR-3.2 | Resource limits set | MEDIUM | 5.14 | + +## Usage + +```bash +python src/checks.py container-config.json +python src/checks.py config.yaml --section dockerfile +python src/checks.py config.json --output json +``` + +## Security Guardrails + +- **Read-only**: Analyzes config files. No Docker daemon interaction. +- **No image pulls**: Does not pull, build, or execute container images. +- **Human-in-the-loop**: Assessment automated, Dockerfile changes require human. 
+ +## Tests + +```bash +cd skills/container-security +pytest tests/ -v -o "testpaths=tests" +``` diff --git a/skills/container-security/examples/secure-image.json b/skills/container-security/examples/secure-image.json new file mode 100644 index 0000000..184b435 --- /dev/null +++ b/skills/container-security/examples/secure-image.json @@ -0,0 +1,19 @@ +{ + "_comment": "Example: hardened container — all 8 checks pass", + "images": [ + { + "name": "myapp", + "base_image": "python:3.11-alpine", + "user": "1000", + "healthcheck": {"test": ["CMD", "curl", "-f", "http://localhost:8080/health"]}, + "env": ["NODE_ENV=production", "LOG_LEVEL=info"] + } + ], + "containers": [ + { + "name": "myapp", + "security_context": {"readOnlyRootFilesystem": true}, + "resources": {"limits": {"cpu": "500m", "memory": "256Mi"}} + } + ] +} diff --git a/skills/container-security/src/checks.py b/skills/container-security/src/checks.py new file mode 100644 index 0000000..19e805a --- /dev/null +++ b/skills/container-security/src/checks.py @@ -0,0 +1,293 @@ +"""Container Security Benchmark — audit container image and runtime security. + +Checks Dockerfile best practices, image configuration, runtime security, +and supply chain integrity. Works with Dockerfile analysis, image config +JSON, or container runtime dumps. + +Read-only — analyzes configs only, does not pull or execute images. 
+""" + +from __future__ import annotations + +import argparse +import json +import re +import sys +from dataclasses import asdict, dataclass, field +from pathlib import Path + + +@dataclass +class Finding: + check_id: str + title: str + section: str + severity: str + status: str + detail: str = "" + remediation: str = "" + cis_docker: str = "" + nist_csf: str = "" + resources: list[str] = field(default_factory=list) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 1 — Dockerfile Best Practices +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_1_1_no_root_user(config: dict) -> Finding: + """CTR-1.1 — Container does not run as root.""" + images = config.get("images", config.get("containers", [])) + root_images = [] + for img in images: + user = img.get("user", img.get("User", "")) + if not user or user == "root" or user == "0": + root_images.append(img.get("name", img.get("image", "unknown"))) + return Finding( + check_id="CTR-1.1", + title="No root user", + section="dockerfile", + severity="HIGH", + status="FAIL" if root_images else "PASS", + detail=f"{len(root_images)} images run as root" if root_images else "All images use non-root user", + remediation="Add USER directive with non-root UID in Dockerfile. 
Never run as root.", + cis_docker="4.1", + nist_csf="PR.AC-4", + resources=root_images, + ) + + +def check_1_2_no_latest_base(config: dict) -> Finding: + """CTR-1.2 — Base image uses specific tag, not :latest.""" + images = config.get("images", config.get("containers", [])) + latest = [] + for img in images: + base = img.get("base_image", img.get("from", "")) + if base and (base.endswith(":latest") or ":" not in base): + latest.append(f"{img.get('name', 'unknown')}: FROM {base}") + return Finding( + check_id="CTR-1.2", + title="No :latest base images", + section="dockerfile", + severity="MEDIUM", + status="FAIL" if latest else "PASS", + detail=f"{len(latest)} images use :latest base" if latest else "All base images pinned", + remediation="Pin base images to SHA digest or specific version tag.", + cis_docker="4.2", + nist_csf="PR.DS-6", + resources=latest, + ) + + +def check_1_3_healthcheck_defined(config: dict) -> Finding: + """CTR-1.3 — HEALTHCHECK instruction defined.""" + images = config.get("images", config.get("containers", [])) + no_health = [] + for img in images: + healthcheck = img.get("healthcheck", img.get("Healthcheck")) + if not healthcheck: + no_health.append(img.get("name", "unknown")) + return Finding( + check_id="CTR-1.3", + title="HEALTHCHECK defined", + section="dockerfile", + severity="LOW", + status="FAIL" if no_health else "PASS", + detail=f"{len(no_health)} images without healthcheck" if no_health else "All images have HEALTHCHECK", + remediation="Add HEALTHCHECK instruction to Dockerfile for orchestrator liveness probes.", + cis_docker="4.6", + nist_csf="DE.CM-1", + resources=no_health, + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 2 — Image Security +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_2_1_no_secrets_in_env(config: dict) -> Finding: + """CTR-2.1 — No secrets in environment variables.""" + secret_patterns = 
re.compile(r"(?i)(password|secret|token|api_key|private_key|credentials)") + images = config.get("images", config.get("containers", [])) + exposed = [] + for img in images: + for env in img.get("env", img.get("Env", [])): + key = env.split("=")[0] if isinstance(env, str) else env.get("name", "") + if secret_patterns.search(key): + exposed.append(f"{img.get('name', 'unknown')}: {key}") + return Finding( + check_id="CTR-2.1", + title="No secrets in env vars", + section="image_security", + severity="CRITICAL", + status="FAIL" if exposed else "PASS", + detail=f"{len(exposed)} potential secrets in env" if exposed else "No secrets in environment variables", + remediation="Use mounted secrets or external secret managers. Never bake secrets into images.", + cis_docker="4.5", + nist_csf="PR.DS-5", + resources=exposed[:10], + ) + + +def check_2_2_minimal_packages(config: dict) -> Finding: + """CTR-2.2 — Image uses minimal base (alpine, slim, distroless).""" + images = config.get("images", config.get("containers", [])) + bloated = [] + minimal_indicators = ("alpine", "slim", "distroless", "scratch", "busybox", "ubi-minimal") + for img in images: + base = img.get("base_image", img.get("from", "")).lower() + if base and not any(m in base for m in minimal_indicators): + bloated.append(f"{img.get('name', 'unknown')}: {base}") + return Finding( + check_id="CTR-2.2", + title="Minimal base image", + section="image_security", + severity="MEDIUM", + status="FAIL" if bloated else "PASS", + detail=f"{len(bloated)} images not using minimal base" if bloated else "All images use minimal bases", + remediation="Use alpine, slim, or distroless base images to reduce attack surface.", + cis_docker="4.3", + nist_csf="PR.IP-1", + resources=bloated, + ) + + +def check_2_3_no_add_instruction(config: dict) -> Finding: + """CTR-2.3 — COPY used instead of ADD.""" + images = config.get("images", config.get("containers", [])) + uses_add = [] + for img in images: + instructions = img.get("instructions", 
img.get("history", [])) + for inst in instructions: + cmd = inst if isinstance(inst, str) else inst.get("created_by", "") + if cmd.strip().upper().startswith("ADD ") and not cmd.strip().upper().startswith("ADD --CHOWN"): + uses_add.append(img.get("name", "unknown")) + break + return Finding( + check_id="CTR-2.3", + title="COPY instead of ADD", + section="image_security", + severity="LOW", + status="FAIL" if uses_add else "PASS", + detail=f"{len(uses_add)} images using ADD" if uses_add else "No images use ADD instruction", + remediation="Replace ADD with COPY. ADD has implicit tar extraction and URL fetching — use explicit commands.", + cis_docker="4.9", + nist_csf="PR.IP-1", + resources=uses_add, + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 3 — Runtime Security +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_3_1_read_only_rootfs(config: dict) -> Finding: + """CTR-3.1 — Read-only root filesystem.""" + containers = config.get("containers", config.get("images", [])) + writable = [] + for c in containers: + sec = c.get("security_context", c.get("securityContext", {})) + if not sec.get("readOnlyRootFilesystem", sec.get("read_only_rootfs", False)): + writable.append(c.get("name", "unknown")) + return Finding( + check_id="CTR-3.1", + title="Read-only root filesystem", + section="runtime", + severity="MEDIUM", + status="FAIL" if writable else "PASS", + detail=f"{len(writable)} containers with writable rootfs" if writable else "All containers read-only", + remediation="Set readOnlyRootFilesystem: true. 
Use emptyDir for temp data.", + cis_docker="5.12", + nist_csf="PR.DS-6", + resources=writable, + ) + + +def check_3_2_resource_limits(config: dict) -> Finding: + """CTR-3.2 — CPU and memory limits set.""" + containers = config.get("containers", config.get("images", [])) + no_limits = [] + for c in containers: + res = c.get("resources", {}) + limits = res.get("limits", {}) + if not limits.get("cpu") and not limits.get("memory"): + no_limits.append(c.get("name", "unknown")) + return Finding( + check_id="CTR-3.2", + title="Resource limits set", + section="runtime", + severity="MEDIUM", + status="FAIL" if no_limits else "PASS", + detail=f"{len(no_limits)} containers without resource limits" if no_limits else "All containers have limits", + remediation="Set resources.limits.cpu and resources.limits.memory on every container.", + cis_docker="5.14", + nist_csf="PR.DS-4", + resources=no_limits, + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Orchestrator +# ═══════════════════════════════════════════════════════════════════════════ + +ALL_CHECKS = { + "dockerfile": [check_1_1_no_root_user, check_1_2_no_latest_base, check_1_3_healthcheck_defined], + "image_security": [check_2_1_no_secrets_in_env, check_2_2_minimal_packages, check_2_3_no_add_instruction], + "runtime": [check_3_1_read_only_rootfs, check_3_2_resource_limits], +} + + +def run_benchmark(config: dict, *, section: str | None = None) -> list[Finding]: + findings: list[Finding] = [] + sections = {section: ALL_CHECKS[section]} if section and section in ALL_CHECKS else ALL_CHECKS + for checks in sections.values(): + for check_fn in checks: + findings.append(check_fn(config)) + return findings + + +def print_summary(findings: list[Finding]) -> None: + total = len(findings) + passed = sum(1 for f in findings if f.status == "PASS") + failed = sum(1 for f in findings if f.status == "FAIL") + print(f"\n{'=' * 60}") + print(" Container Security Benchmark — Results") + print(f"{'=' * 
60}\n") + current = "" + for f in findings: + if f.section != current: + current = f.section + print(f"\n [{current.upper()}]") + icon = {"PASS": "+", "FAIL": "x"}[f.status] + print(f" [{icon}] {f.check_id} [{f.severity:8s}] {f.title}") + if f.status == "FAIL": + print(f" {f.detail}") + if f.remediation: + print(f" FIX: {f.remediation}") + print(f"\n {'─' * 56}") + print(f" Total: {total} | Passed: {passed} | Failed: {failed}") + print(f" Pass rate: {passed / total * 100:.0f}%\n" if total else "") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Container Security Benchmark") + parser.add_argument("config", help="Path to container config (JSON/YAML)") + parser.add_argument("--section", choices=list(ALL_CHECKS.keys())) + parser.add_argument("--output", choices=["console", "json"], default="console") + args = parser.parse_args() + p = Path(args.config) + content = p.read_text() + config = json.loads(content) if p.suffix == ".json" else __import__("yaml").safe_load(content) + findings = run_benchmark(config, section=args.section) + if args.output == "json": + print(json.dumps([asdict(f) for f in findings], indent=2)) + else: + print_summary(findings) + sys.exit(1 if any(f.status == "FAIL" and f.severity in ("CRITICAL", "HIGH") for f in findings) else 0) + + +if __name__ == "__main__": + main() diff --git a/skills/container-security/tests/test_checks.py b/skills/container-security/tests/test_checks.py new file mode 100644 index 0000000..b91f977 --- /dev/null +++ b/skills/container-security/tests/test_checks.py @@ -0,0 +1,101 @@ +"""Tests for container security benchmark.""" + +from __future__ import annotations + +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from checks import Finding, run_benchmark + + +class TestDockerfile: + def test_root_user_fails(self): + config = {"images": [{"name": "app", "user": "root"}]} + findings = run_benchmark(config, section="dockerfile") + assert 
findings[0].status == "FAIL" + + def test_non_root_passes(self): + config = {"images": [{"name": "app", "user": "1000"}]} + findings = run_benchmark(config, section="dockerfile") + assert findings[0].status == "PASS" + + def test_latest_base_fails(self): + config = {"images": [{"name": "app", "base_image": "python:latest"}]} + findings = run_benchmark(config, section="dockerfile") + tag = next(f for f in findings if f.check_id == "CTR-1.2") + assert tag.status == "FAIL" + + def test_pinned_base_passes(self): + config = {"images": [{"name": "app", "base_image": "python:3.11-alpine"}]} + findings = run_benchmark(config, section="dockerfile") + tag = next(f for f in findings if f.check_id == "CTR-1.2") + assert tag.status == "PASS" + + def test_no_healthcheck_fails(self): + config = {"images": [{"name": "app"}]} + findings = run_benchmark(config, section="dockerfile") + hc = next(f for f in findings if f.check_id == "CTR-1.3") + assert hc.status == "FAIL" + + +class TestImageSecurity: + def test_secret_in_env_fails(self): + config = {"images": [{"name": "app", "env": ["DATABASE_PASSWORD=secret123"]}]} + findings = run_benchmark(config, section="image_security") + sec = next(f for f in findings if f.check_id == "CTR-2.1") + assert sec.status == "FAIL" + assert sec.severity == "CRITICAL" + + def test_clean_env_passes(self): + config = {"images": [{"name": "app", "env": ["NODE_ENV=production"]}]} + findings = run_benchmark(config, section="image_security") + sec = next(f for f in findings if f.check_id == "CTR-2.1") + assert sec.status == "PASS" + + def test_bloated_base_fails(self): + config = {"images": [{"name": "app", "base_image": "ubuntu:22.04"}]} + findings = run_benchmark(config, section="image_security") + base = next(f for f in findings if f.check_id == "CTR-2.2") + assert base.status == "FAIL" + + def test_alpine_base_passes(self): + config = {"images": [{"name": "app", "base_image": "python:3.11-alpine"}]} + findings = run_benchmark(config, 
section="image_security") + base = next(f for f in findings if f.check_id == "CTR-2.2") + assert base.status == "PASS" + + +class TestRuntime: + def test_writable_rootfs_fails(self): + config = {"containers": [{"name": "app", "security_context": {}}]} + findings = run_benchmark(config, section="runtime") + ro = next(f for f in findings if f.check_id == "CTR-3.1") + assert ro.status == "FAIL" + + def test_no_resource_limits_fails(self): + config = {"containers": [{"name": "app", "resources": {}}]} + findings = run_benchmark(config, section="runtime") + lim = next(f for f in findings if f.check_id == "CTR-3.2") + assert lim.status == "FAIL" + + def test_with_limits_passes(self): + config = {"containers": [{"name": "app", "resources": {"limits": {"cpu": "1", "memory": "512Mi"}}}]} + findings = run_benchmark(config, section="runtime") + lim = next(f for f in findings if f.check_id == "CTR-3.2") + assert lim.status == "PASS" + + +class TestRunner: + def test_run_all(self): + config = {"images": [{"name": "app", "user": "1000", "base_image": "python:3.11-alpine"}], "containers": []} + findings = run_benchmark(config) + assert len(findings) == 8 + assert all(isinstance(f, Finding) for f in findings) + + def test_all_have_cis_mapping(self): + config = {"images": [{"name": "test"}]} + findings = run_benchmark(config) + for f in findings: + assert f.cis_docker, f"{f.check_id} missing CIS Docker mapping" diff --git a/skills/cspm-aws-cis-benchmark/tests/test_checks.py b/skills/cspm-aws-cis-benchmark/tests/test_checks.py new file mode 100644 index 0000000..bebf279 --- /dev/null +++ b/skills/cspm-aws-cis-benchmark/tests/test_checks.py @@ -0,0 +1,154 @@ +"""Tests for CIS AWS Foundations Benchmark v3.0 checks. + +Uses moto to mock AWS services — no real AWS credentials needed. 
+""" + +from __future__ import annotations + +import os +import sys + +import boto3 +from moto import mock_aws + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from checks import ( + Finding, + check_1_1_root_mfa, + check_1_2_user_mfa, + check_1_5_password_policy, + check_1_6_no_root_keys, + check_1_7_no_inline_policies, + check_2_1_s3_encryption, + check_2_3_s3_public_access, + check_2_4_s3_versioning, + check_4_1_no_unrestricted_ssh, + check_4_2_no_unrestricted_rdp, + check_4_3_vpc_flow_logs, +) + + +@mock_aws +class TestIAMChecks: + def test_1_1_root_mfa_pass(self): + iam = boto3.client("iam", region_name="us-east-1") + f = check_1_1_root_mfa(iam) + assert isinstance(f, Finding) + assert f.control_id == "1.1" + assert f.severity == "CRITICAL" + assert f.nist_csf == "PR.AC-1" + + def test_1_2_no_users_passes(self): + iam = boto3.client("iam", region_name="us-east-1") + f = check_1_2_user_mfa(iam) + assert f.status == "PASS" + + def test_1_5_password_policy(self): + iam = boto3.client("iam", region_name="us-east-1") + iam.update_account_password_policy( + MinimumPasswordLength=14, + RequireSymbols=True, + RequireNumbers=True, + RequireUppercaseCharacters=True, + RequireLowercaseCharacters=True, + MaxPasswordAge=90, + PasswordReusePrevention=24, + ) + f = check_1_5_password_policy(iam) + assert f.control_id == "1.5" + assert f.status == "PASS" + + def test_1_6_no_root_keys(self): + iam = boto3.client("iam", region_name="us-east-1") + f = check_1_6_no_root_keys(iam) + assert f.control_id == "1.6" + assert f.severity == "CRITICAL" + + def test_1_7_no_inline_policies_pass(self): + iam = boto3.client("iam", region_name="us-east-1") + f = check_1_7_no_inline_policies(iam) + assert f.status == "PASS" + + def test_1_7_inline_policy_fails(self): + iam = boto3.client("iam", region_name="us-east-1") + iam.create_user(UserName="testuser") + iam.put_user_policy( + UserName="testuser", + PolicyName="inline-policy", + 
PolicyDocument='{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"s3:*","Resource":"*"}]}', + ) + f = check_1_7_no_inline_policies(iam) + assert f.status == "FAIL" + assert "testuser" in f.resources + + +@mock_aws +class TestStorageChecks: + def test_2_1_s3_encryption(self): + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="test-bucket") + f = check_2_1_s3_encryption(s3) + assert f.control_id == "2.1" + + def test_2_3_public_access_blocked(self): + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="test-bucket") + s3.put_public_access_block( + Bucket="test-bucket", + PublicAccessBlockConfiguration={ + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": True, + }, + ) + f = check_2_3_s3_public_access(s3) + assert f.control_id == "2.3" + + def test_2_4_versioning(self): + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket="test-bucket") + f = check_2_4_s3_versioning(s3) + assert f.control_id == "2.4" + + +@mock_aws +class TestNetworkChecks: + def test_4_1_ssh_open_fails(self): + ec2 = boto3.client("ec2", region_name="us-east-1") + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + sg = ec2.create_security_group(GroupName="open-ssh", Description="test", VpcId=vpc["Vpc"]["VpcId"]) + ec2.authorize_security_group_ingress( + GroupId=sg["GroupId"], + IpPermissions=[{"FromPort": 22, "ToPort": 22, "IpProtocol": "tcp", "IpRanges": [{"CidrIp": "0.0.0.0/0"}]}], + ) + f = check_4_1_no_unrestricted_ssh(ec2) + assert f.status == "FAIL" + + def test_4_2_rdp_closed_passes(self): + ec2 = boto3.client("ec2", region_name="us-east-1") + f = check_4_2_no_unrestricted_rdp(ec2) + # Default SGs don't have RDP open + assert f.control_id == "4.2" + + def test_4_3_vpc_flow_logs(self): + ec2 = boto3.client("ec2", region_name="us-east-1") + f = check_4_3_vpc_flow_logs(ec2) + assert f.control_id == "4.3" + + +@mock_aws +class TestFindingCompliance: + 
def test_all_checks_have_nist_mapping(self): + iam = boto3.client("iam", region_name="us-east-1") + checks = [ + check_1_1_root_mfa(iam), + check_1_2_user_mfa(iam), + check_1_5_password_policy(iam), + check_1_6_no_root_keys(iam), + check_1_7_no_inline_policies(iam), + ] + for f in checks: + assert f.nist_csf, f"Check {f.control_id} missing NIST CSF mapping" + assert f.iso_27001, f"Check {f.control_id} missing ISO 27001 mapping" diff --git a/skills/cspm-azure-cis-benchmark/tests/test_checks.py b/skills/cspm-azure-cis-benchmark/tests/test_checks.py new file mode 100644 index 0000000..1524976 --- /dev/null +++ b/skills/cspm-azure-cis-benchmark/tests/test_checks.py @@ -0,0 +1,94 @@ +"""Tests for CIS Azure Foundations Benchmark v2.1 checks. + +Uses unittest.mock to simulate Azure SDK responses. +""" + +from __future__ import annotations + +import os +import sys +from unittest.mock import MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from checks import check_2_1_storage_encryption, check_4_1_no_open_ssh + + +class TestStorageChecks: + def test_2_1_encryption_enabled_passes(self): + mock_client = MagicMock() + account = MagicMock() + account.name = "teststorage" + account.encryption = MagicMock() + account.encryption.services = MagicMock() + account.encryption.services.blob = MagicMock() + account.encryption.services.blob.enabled = True + mock_client.storage_accounts = MagicMock() + mock_client.storage_accounts.list.return_value = [account] + + f = check_2_1_storage_encryption(mock_client) + assert f.control_id == "2.1" + assert f.status == "PASS" + + def test_2_1_encryption_disabled_fails(self): + mock_client = MagicMock() + account = MagicMock() + account.name = "unencrypted" + account.encryption = MagicMock() + account.encryption.services = MagicMock() + account.encryption.services.blob = MagicMock() + account.encryption.services.blob.enabled = False + mock_client.storage_accounts = MagicMock() + 
from unittest.mock import MagicMock


def _nsg_with_ssh_rule(name: str, source_prefix: str) -> MagicMock:
    """Mocked NSG carrying a single inbound TCP/22 Allow rule from *source_prefix*."""
    nsg = MagicMock()
    nsg.name = name
    rule = MagicMock()
    rule.direction = "Inbound"
    rule.access = "Allow"
    rule.destination_port_range = "22"
    rule.source_address_prefix = source_prefix
    rule.protocol = "Tcp"
    nsg.security_rules = [rule]
    return nsg


class TestNetworkChecks:
    def test_4_1_open_ssh_fails(self):
        client = MagicMock()
        nsg = _nsg_with_ssh_rule("open-nsg", "*")
        nsg.id = (
            "/subscriptions/sub/resourceGroups/rg/providers"
            "/Microsoft.Network/networkSecurityGroups/open-nsg"
        )
        client.network_security_groups = MagicMock()
        client.network_security_groups.list_all.return_value = [nsg]
        assert check_4_1_no_open_ssh(client).status == "FAIL"

    def test_4_1_restricted_ssh_passes(self):
        client = MagicMock()
        client.network_security_groups = MagicMock()
        client.network_security_groups.list_all.return_value = [
            _nsg_with_ssh_rule("restricted-nsg", "10.0.0.0/8")
        ]
        assert check_4_1_no_open_ssh(client).status == "PASS"


class TestFindingStructure:
    def test_finding_has_compliance(self):
        client = MagicMock()
        client.storage_accounts = MagicMock()
        client.storage_accounts.list.return_value = []
        finding = check_2_1_storage_encryption(client)
        assert finding.nist_csf
        assert finding.control_id == "2.1"
+""" + +from __future__ import annotations + +import os +import sys +from unittest.mock import MagicMock + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from checks import check_1_1_no_gmail_accounts, check_1_3_no_sa_keys, check_2_3_no_public_buckets + + +class TestIAMChecks: + def test_1_1_gmail_found_fails(self): + mock_crm = MagicMock() + binding = MagicMock() + binding.role = "roles/editor" + binding.members = ["user:someone@gmail.com"] + policy = MagicMock() + policy.bindings = [binding] + mock_crm.get_iam_policy.return_value = policy + + f = check_1_1_no_gmail_accounts(mock_crm, "test-project") + assert f.status == "FAIL" + assert f.severity == "HIGH" + assert len(f.resources) == 1 + + def test_1_1_no_gmail_passes(self): + mock_crm = MagicMock() + binding = MagicMock() + binding.role = "roles/viewer" + binding.members = ["user:admin@company.com"] + policy = MagicMock() + policy.bindings = [binding] + mock_crm.get_iam_policy.return_value = policy + + f = check_1_1_no_gmail_accounts(mock_crm, "test-project") + assert f.status == "PASS" + + def test_1_3_sa_keys_found_fails(self): + mock_iam = MagicMock() + sa = MagicMock() + sa.name = "projects/test/serviceAccounts/sa@test.iam.gserviceaccount.com" + sa.email = "sa@test.iam.gserviceaccount.com" + mock_iam.list_service_accounts.return_value = [sa] + key = MagicMock() + mock_iam.list_service_account_keys.return_value = [key] + + f = check_1_3_no_sa_keys(mock_iam, "test-project") + assert f.status == "FAIL" + + def test_1_3_no_sa_keys_passes(self): + mock_iam = MagicMock() + sa = MagicMock() + sa.name = "projects/test/serviceAccounts/sa@test.iam.gserviceaccount.com" + sa.email = "sa@test.iam.gserviceaccount.com" + mock_iam.list_service_accounts.return_value = [sa] + mock_iam.list_service_account_keys.return_value = [] + + f = check_1_3_no_sa_keys(mock_iam, "test-project") + assert f.status == "PASS" + + +class TestStorageChecks: + def test_2_3_public_bucket_fails(self): + mock_storage 
= MagicMock() + bucket = MagicMock() + bucket.name = "public-bucket" + policy = MagicMock() + policy.bindings = [{"role": "roles/storage.objectViewer", "members": ["allUsers"]}] + bucket.get_iam_policy.return_value = policy + mock_storage.list_buckets.return_value = [bucket] + + f = check_2_3_no_public_buckets(mock_storage, "test-project") + assert f.status == "FAIL" + assert "public-bucket" in f.resources[0] + + def test_2_3_private_bucket_passes(self): + mock_storage = MagicMock() + bucket = MagicMock() + bucket.name = "private-bucket" + policy = MagicMock() + policy.bindings = [{"role": "roles/storage.objectViewer", "members": ["user:admin@company.com"]}] + bucket.get_iam_policy.return_value = policy + mock_storage.list_buckets.return_value = [bucket] + + f = check_2_3_no_public_buckets(mock_storage, "test-project") + assert f.status == "PASS" + + +class TestFindingStructure: + def test_finding_has_compliance_fields(self): + mock_crm = MagicMock() + policy = MagicMock() + policy.bindings = [] + mock_crm.get_iam_policy.return_value = policy + + f = check_1_1_no_gmail_accounts(mock_crm, "test-project") + assert f.nist_csf == "PR.AC-1" + assert f.control_id == "1.1" diff --git a/skills/gpu-cluster-security/examples/insecure-cluster.json b/skills/gpu-cluster-security/examples/insecure-cluster.json new file mode 100644 index 0000000..2b181f5 --- /dev/null +++ b/skills/gpu-cluster-security/examples/insecure-cluster.json @@ -0,0 +1,49 @@ +{ + "_comment": "Example: misconfigured GPU cluster — multiple checks will fail", + "pods": [ + { + "name": "training-gpu", + "security_context": { + "privileged": true, + "runAsUser": 0 + }, + "spec": { + "hostIPC": true + }, + "resources": { + "limits": { + "nvidia.com/gpu": 8 + } + }, + "volumes": [ + { + "hostPath": {"path": "/dev/nvidia0"} + }, + { + "name": "dshm", + "emptyDir": {"medium": "Memory"} + } + ] + } + ], + "nodes": [ + { + "name": "gpu-node-old", + "driver_version": "535.129.03", + "cuda_version": "11.8" + } + ], + 
"namespaces": [ + { + "name": "shared-gpu", + "shared": true, + "network_policies": [], + "resource_quota": {} + } + ], + "storage": { + "volumes": [ + {"name": "model-weights", "encrypted": false} + ] + } +} diff --git a/skills/gpu-cluster-security/examples/secure-cluster.json b/skills/gpu-cluster-security/examples/secure-cluster.json new file mode 100644 index 0000000..77cec19 --- /dev/null +++ b/skills/gpu-cluster-security/examples/secure-cluster.json @@ -0,0 +1,93 @@ +{ + "_comment": "Example: well-configured GPU cluster — all 13 checks pass", + "pods": [ + { + "name": "training-a100-8gpu", + "security_context": { + "privileged": false, + "runAsNonRoot": true, + "runAsUser": 1000, + "readOnlyRootFilesystem": true + }, + "resources": { + "limits": { + "nvidia.com/gpu": 8, + "memory": "512Gi", + "cpu": "64" + } + }, + "volumes": [ + { + "name": "dshm", + "emptyDir": { + "medium": "Memory", + "sizeLimit": "16Gi" + } + }, + { + "name": "model-weights", + "persistentVolumeClaim": { + "claimName": "model-pvc" + } + } + ] + } + ], + "nodes": [ + { + "name": "gpu-node-01", + "driver_version": "550.90.07", + "cuda_version": "12.6" + }, + { + "name": "gpu-node-02", + "driver_version": "550.90.07", + "cuda_version": "12.6" + } + ], + "network": { + "infiniband": { + "partitions": ["tenant-a-pkey", "tenant-b-pkey", "management-pkey"], + "tenant_isolation": true + } + }, + "namespaces": [ + { + "name": "tenant-a-training", + "network_policies": [ + {"name": "default-deny-ingress"}, + {"name": "allow-nccl-intra-namespace"} + ], + "resource_quota": { + "nvidia.com/gpu": 16, + "memory": "1Ti" + } + }, + { + "name": "tenant-b-inference", + "network_policies": [ + {"name": "default-deny-ingress"}, + {"name": "allow-inference-gateway"} + ], + "resource_quota": { + "nvidia.com/gpu": 4, + "memory": "128Gi" + } + } + ], + "storage": { + "encryption_at_rest": true, + "volumes": [ + {"name": "model-weights-a", "encrypted": true}, + {"name": "model-weights-b", "encrypted": true} + ] + 
}, + "monitoring": { + "dcgm": true, + "prometheus_gpu_exporter": true + }, + "logging": { + "gpu_workloads": true, + "audit_level": "RequestResponse" + } +} diff --git a/skills/k8s-security-benchmark/SKILL.md b/skills/k8s-security-benchmark/SKILL.md new file mode 100644 index 0000000..d71f42b --- /dev/null +++ b/skills/k8s-security-benchmark/SKILL.md @@ -0,0 +1,80 @@ +--- +name: k8s-security-benchmark +description: >- + Audit Kubernetes cluster and workload security. Checks pod security standards, + RBAC hygiene, network policies, secrets management, and image pinning. Works + with exported K8s resource JSON or live kubectl output. Use when the user + mentions Kubernetes security, pod security, RBAC audit, network policy check, + K8s hardening, or CIS Kubernetes benchmark. +license: Apache-2.0 +compatibility: >- + Requires Python 3.11+. No cloud SDKs needed — works with exported JSON/YAML. + Optional: kubectl for live cluster dumps. Read-only — no write permissions. +metadata: + author: msaad00 + homepage: https://github.com/msaad00/cloud-security + source: https://github.com/msaad00/cloud-security/tree/main/skills/k8s-security-benchmark + version: 0.1.0 + frameworks: + - CIS Kubernetes Benchmark + - NIST CSF 2.0 + cloud: any + optional_bins: + - kubectl +--- + +# Kubernetes Security Benchmark + +10 automated checks across 5 domains — pod security, RBAC, network policies, +secrets, and image management. Each check mapped to CIS Kubernetes Benchmark +and NIST CSF 2.0. 
+ +## Architecture + +```mermaid +flowchart LR + K8S["K8s Resources\nPods · RBAC · NetworkPolicy\nSecrets · Images"] + BENCH["checks.py\n10 checks · 5 domains"] + OUT["JSON / Console"] + + K8S --> BENCH --> OUT + + style K8S fill:#1e293b,stroke:#475569,color:#e2e8f0 + style BENCH fill:#164e63,stroke:#22d3ee,color:#e2e8f0 +``` + +## Controls + +| # | Check | Severity | CIS K8s | +|---|-------|----------|---------| +| K8S-1.1 | No privileged pods | CRITICAL | 5.2.1 | +| K8S-1.2 | No host PID namespace | HIGH | 5.2.2 | +| K8S-1.3 | No host network | HIGH | 5.2.4 | +| K8S-1.4 | Drop ALL capabilities | MEDIUM | 5.2.7 | +| K8S-2.1 | No cluster-admin on default SA | CRITICAL | 5.1.1 | +| K8S-2.2 | No wildcard RBAC permissions | HIGH | 5.1.3 | +| K8S-3.1 | Default deny NetworkPolicy | HIGH | 5.3.2 | +| K8S-4.1 | Secrets not via env vars | MEDIUM | 5.4.1 | +| K8S-4.2 | Secrets encrypted at rest | HIGH | 5.4.2 | +| K8S-5.1 | No :latest image tags | MEDIUM | 5.5.1 | + +## Usage + +```bash +python src/checks.py cluster-config.json +python src/checks.py config.yaml --section pod_security +python src/checks.py config.json --output json +``` + +## Security Guardrails + +- **Read-only**: Analyzes exported configs. No kubectl write commands. +- **No cluster access required**: Works with JSON/YAML dumps. +- **Human-in-the-loop**: Assessment automated, remediation requires human. 
+ +## Tests + +```bash +cd skills/k8s-security-benchmark +pytest tests/ -v -o "testpaths=tests" +``` diff --git a/skills/k8s-security-benchmark/examples/secure-cluster.json b/skills/k8s-security-benchmark/examples/secure-cluster.json new file mode 100644 index 0000000..fdbe999 --- /dev/null +++ b/skills/k8s-security-benchmark/examples/secure-cluster.json @@ -0,0 +1,30 @@ +{ + "_comment": "Example: hardened K8s cluster — all 10 checks pass", + "pods": [ + { + "name": "api-server", + "containers": [ + { + "name": "api", + "image": "myapp:v2.3.1-alpine", + "securityContext": { + "privileged": false, + "runAsNonRoot": true, + "readOnlyRootFilesystem": true, + "capabilities": {"drop": ["ALL"]} + }, + "resources": {"limits": {"cpu": "500m", "memory": "256Mi"}} + } + ] + } + ], + "namespaces": [ + {"name": "production", "network_policies": [{"name": "default-deny-ingress"}]}, + {"name": "staging", "network_policies": [{"name": "default-deny-all"}]} + ], + "cluster_role_bindings": [], + "cluster_roles": [ + {"name": "app-reader", "rules": [{"verbs": ["get", "list"], "resources": ["pods", "services"]}]} + ], + "api_server": {"encryption_config": "/etc/kubernetes/enc/enc.yaml"} +} diff --git a/skills/k8s-security-benchmark/src/checks.py b/skills/k8s-security-benchmark/src/checks.py new file mode 100644 index 0000000..0b49460 --- /dev/null +++ b/skills/k8s-security-benchmark/src/checks.py @@ -0,0 +1,343 @@ +"""Kubernetes Security Benchmark — audit K8s cluster and workload security. + +Checks pod security, RBAC hygiene, network policies, secrets management, +admission control, and API server configuration. Works with exported +Kubernetes resource JSON/YAML or live kubectl access. + +Read-only — no write permissions. Safe to run in production. 
+""" + +from __future__ import annotations + +import argparse +import json +import sys +from dataclasses import asdict, dataclass, field +from pathlib import Path + + +@dataclass +class Finding: + check_id: str + title: str + section: str + severity: str + status: str + detail: str = "" + remediation: str = "" + cis_k8s: str = "" + nist_csf: str = "" + resources: list[str] = field(default_factory=list) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 1 — Pod Security +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_1_1_no_privileged_pods(config: dict) -> Finding: + """K8S-1.1 — No pods running in privileged mode.""" + pods = config.get("pods", []) + privileged = [] + for pod in pods: + containers = pod.get("containers", pod.get("spec", {}).get("containers", [])) + for c in containers: + sec = c.get("securityContext", c.get("security_context", {})) + if sec.get("privileged", False): + privileged.append(f"{pod.get('name', 'unknown')}:{c.get('name', 'unknown')}") + return Finding( + check_id="K8S-1.1", + title="No privileged pods", + section="pod_security", + severity="CRITICAL", + status="FAIL" if privileged else "PASS", + detail=f"{len(privileged)} privileged containers" if privileged else "No privileged containers", + remediation="Remove privileged: true. 
Use specific capabilities instead.", + cis_k8s="5.2.1", + nist_csf="PR.AC-4", + resources=privileged, + ) + + +def check_1_2_no_host_pid(config: dict) -> Finding: + """K8S-1.2 — No pods sharing host PID namespace.""" + pods = config.get("pods", []) + host_pid = [p.get("name", "unknown") for p in pods if p.get("spec", p).get("hostPID", False)] + return Finding( + check_id="K8S-1.2", + title="No host PID namespace", + section="pod_security", + severity="HIGH", + status="FAIL" if host_pid else "PASS", + detail=f"{len(host_pid)} pods with hostPID" if host_pid else "No pods share host PID", + remediation="Set hostPID: false on all pod specs.", + cis_k8s="5.2.2", + nist_csf="PR.AC-4", + resources=host_pid, + ) + + +def check_1_3_no_host_network(config: dict) -> Finding: + """K8S-1.3 — No pods using host network.""" + pods = config.get("pods", []) + host_net = [p.get("name", "unknown") for p in pods if p.get("spec", p).get("hostNetwork", False)] + return Finding( + check_id="K8S-1.3", + title="No host network", + section="pod_security", + severity="HIGH", + status="FAIL" if host_net else "PASS", + detail=f"{len(host_net)} pods with hostNetwork" if host_net else "No pods use host network", + remediation="Set hostNetwork: false. 
Use Services and Ingress instead.", + cis_k8s="5.2.4", + nist_csf="PR.AC-5", + resources=host_net, + ) + + +def check_1_4_drop_all_capabilities(config: dict) -> Finding: + """K8S-1.4 — Containers drop ALL capabilities.""" + pods = config.get("pods", []) + no_drop = [] + for pod in pods: + containers = pod.get("containers", pod.get("spec", {}).get("containers", [])) + for c in containers: + sec = c.get("securityContext", c.get("security_context", {})) + caps = sec.get("capabilities", {}) + drop = caps.get("drop", []) + if "ALL" not in [d.upper() for d in drop]: + no_drop.append(f"{pod.get('name', 'unknown')}:{c.get('name', 'unknown')}") + return Finding( + check_id="K8S-1.4", + title="Drop ALL capabilities", + section="pod_security", + severity="MEDIUM", + status="FAIL" if no_drop else "PASS", + detail=f"{len(no_drop)} containers not dropping ALL" if no_drop else "All containers drop ALL capabilities", + remediation="Add securityContext.capabilities.drop: ['ALL'] to every container.", + cis_k8s="5.2.7", + nist_csf="PR.AC-4", + resources=no_drop, + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 2 — RBAC +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_2_1_no_cluster_admin_default(config: dict) -> Finding: + """K8S-2.1 — No ClusterRoleBinding to cluster-admin for default SA.""" + bindings = config.get("cluster_role_bindings", []) + dangerous = [] + for b in bindings: + role_ref = b.get("roleRef", {}) + if role_ref.get("name") == "cluster-admin": + for subj in b.get("subjects", []): + if subj.get("name") == "default" or subj.get("namespace") == "kube-system": + dangerous.append(b.get("name", "unknown")) + return Finding( + check_id="K8S-2.1", + title="No cluster-admin on default SA", + section="rbac", + severity="CRITICAL", + status="FAIL" if dangerous else "PASS", + detail=f"{len(dangerous)} bindings give cluster-admin to default/system" if dangerous else "No dangerous 
cluster-admin bindings", + remediation="Remove cluster-admin bindings from default service accounts. Use scoped roles.", + cis_k8s="5.1.1", + nist_csf="PR.AC-4", + resources=dangerous, + ) + + +def check_2_2_no_wildcard_permissions(config: dict) -> Finding: + """K8S-2.2 — No roles with wildcard (*) permissions.""" + roles = config.get("roles", []) + config.get("cluster_roles", []) + wildcard = [] + for role in roles: + for rule in role.get("rules", []): + verbs = rule.get("verbs", []) + resources = rule.get("resources", []) + if "*" in verbs or "*" in resources: + wildcard.append(role.get("name", "unknown")) + return Finding( + check_id="K8S-2.2", + title="No wildcard RBAC permissions", + section="rbac", + severity="HIGH", + status="FAIL" if wildcard else "PASS", + detail=f"{len(set(wildcard))} roles with wildcard" if wildcard else "No wildcard permissions", + remediation="Replace * verbs/resources with explicit least-privilege lists.", + cis_k8s="5.1.3", + nist_csf="PR.AC-4", + resources=list(set(wildcard)), + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 3 — Network Policies +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_3_1_default_deny(config: dict) -> Finding: + """K8S-3.1 — Default deny NetworkPolicy per namespace.""" + namespaces = config.get("namespaces", []) + no_deny = [] + for ns in namespaces: + policies = ns.get("network_policies", []) + has_deny = any("deny" in p.get("name", "").lower() for p in policies) + if not has_deny and not policies: + no_deny.append(ns.get("name", "unknown")) + return Finding( + check_id="K8S-3.1", + title="Default deny NetworkPolicy", + section="network", + severity="HIGH", + status="FAIL" if no_deny else "PASS", + detail=f"{len(no_deny)} namespaces without default deny" if no_deny else "All namespaces have deny policy", + remediation="Apply a default-deny NetworkPolicy to every namespace.", + cis_k8s="5.3.2", + 
nist_csf="PR.AC-5", + resources=no_deny, + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 4 — Secrets +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_4_1_no_env_secrets(config: dict) -> Finding: + """K8S-4.1 — Secrets not passed via environment variables.""" + pods = config.get("pods", []) + env_secrets = [] + for pod in pods: + containers = pod.get("containers", pod.get("spec", {}).get("containers", [])) + for c in containers: + for env in c.get("env", []): + if env.get("valueFrom", {}).get("secretKeyRef"): + env_secrets.append(f"{pod.get('name', 'unknown')}:{env.get('name', 'unknown')}") + return Finding( + check_id="K8S-4.1", + title="Secrets not via env vars", + section="secrets", + severity="MEDIUM", + status="FAIL" if env_secrets else "PASS", + detail=f"{len(env_secrets)} secrets exposed via env" if env_secrets else "No secrets in environment variables", + remediation="Mount secrets as volumes instead of env vars. 
Env vars appear in logs and process listings.", + cis_k8s="5.4.1", + nist_csf="PR.DS-5", + resources=env_secrets, + ) + + +def check_4_2_secrets_encrypted_etcd(config: dict) -> Finding: + """K8S-4.2 — Secrets encryption at rest configured.""" + api_server = config.get("api_server", {}) + encryption = api_server.get("encryption_config", api_server.get("encryption-provider-config", "")) + return Finding( + check_id="K8S-4.2", + title="Secrets encrypted at rest (etcd)", + section="secrets", + severity="HIGH", + status="PASS" if encryption else "FAIL", + detail="Encryption provider configured" if encryption else "No encryption-at-rest for secrets in etcd", + remediation="Configure EncryptionConfiguration with aescbc or kms provider.", + cis_k8s="5.4.2", + nist_csf="PR.DS-1", + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Section 5 — Images +# ═══════════════════════════════════════════════════════════════════════════ + + +def check_5_1_no_latest_tag(config: dict) -> Finding: + """K8S-5.1 — No containers using :latest image tag.""" + pods = config.get("pods", []) + latest = [] + for pod in pods: + containers = pod.get("containers", pod.get("spec", {}).get("containers", [])) + for c in containers: + image = c.get("image", "") + if image.endswith(":latest") or ":" not in image: + latest.append(f"{pod.get('name', 'unknown')}:{image}") + return Finding( + check_id="K8S-5.1", + title="No :latest image tags", + section="images", + severity="MEDIUM", + status="FAIL" if latest else "PASS", + detail=f"{len(latest)} containers using :latest" if latest else "All images pinned to specific tags", + remediation="Pin images to SHA digests or semantic version tags. 
Never use :latest.", + cis_k8s="5.5.1", + nist_csf="PR.DS-6", + resources=latest, + ) + + +# ═══════════════════════════════════════════════════════════════════════════ +# Orchestrator +# ═══════════════════════════════════════════════════════════════════════════ + +ALL_CHECKS = { + "pod_security": [check_1_1_no_privileged_pods, check_1_2_no_host_pid, check_1_3_no_host_network, check_1_4_drop_all_capabilities], + "rbac": [check_2_1_no_cluster_admin_default, check_2_2_no_wildcard_permissions], + "network": [check_3_1_default_deny], + "secrets": [check_4_1_no_env_secrets, check_4_2_secrets_encrypted_etcd], + "images": [check_5_1_no_latest_tag], +} + + +def run_benchmark(config: dict, *, section: str | None = None) -> list[Finding]: + findings: list[Finding] = [] + sections = {section: ALL_CHECKS[section]} if section and section in ALL_CHECKS else ALL_CHECKS + for checks in sections.values(): + for check_fn in checks: + findings.append(check_fn(config)) + return findings + + +def print_summary(findings: list[Finding]) -> None: + total = len(findings) + passed = sum(1 for f in findings if f.status == "PASS") + failed = sum(1 for f in findings if f.status == "FAIL") + print(f"\n{'=' * 60}") + print(" Kubernetes Security Benchmark — Results") + print(f"{'=' * 60}\n") + current = "" + for f in findings: + if f.section != current: + current = f.section + print(f"\n [{current.upper()}]") + icon = {"PASS": "+", "FAIL": "x", "WARN": "!", "ERROR": "?", "SKIP": "-"}[f.status] + print(f" [{icon}] {f.check_id} [{f.severity:8s}] {f.title}") + if f.status == "FAIL": + print(f" {f.detail}") + if f.remediation: + print(f" FIX: {f.remediation}") + print(f"\n {'─' * 56}") + print(f" Total: {total} | Passed: {passed} | Failed: {failed}") + print(f" Pass rate: {passed / total * 100:.0f}%\n" if total else "") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Kubernetes Security Benchmark") + parser.add_argument("config", help="Path to K8s config (JSON/YAML)") + 
parser.add_argument("--section", choices=list(ALL_CHECKS.keys())) + parser.add_argument("--output", choices=["console", "json"], default="console") + args = parser.parse_args() + p = Path(args.config) + content = p.read_text() + config = json.loads(content) if p.suffix == ".json" else __import__("yaml").safe_load(content) + findings = run_benchmark(config, section=args.section) + if args.output == "json": + print(json.dumps([asdict(f) for f in findings], indent=2)) + else: + print_summary(findings) + sys.exit(1 if any(f.status == "FAIL" and f.severity in ("CRITICAL", "HIGH") for f in findings) else 0) + + +if __name__ == "__main__": + main() diff --git a/skills/k8s-security-benchmark/tests/test_checks.py b/skills/k8s-security-benchmark/tests/test_checks.py new file mode 100644 index 0000000..85026bf --- /dev/null +++ b/skills/k8s-security-benchmark/tests/test_checks.py @@ -0,0 +1,124 @@ +"""Tests for Kubernetes security benchmark.""" + +from __future__ import annotations + +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from checks import Finding, run_benchmark + + +class TestPodSecurity: + def test_privileged_pod_fails(self): + config = {"pods": [{"name": "bad", "containers": [{"name": "c1", "securityContext": {"privileged": True}}]}]} + findings = run_benchmark(config, section="pod_security") + priv = next(f for f in findings if f.check_id == "K8S-1.1") + assert priv.status == "FAIL" + + def test_safe_pod_passes(self): + config = { + "pods": [ + { + "name": "good", + "containers": [ + {"name": "c1", "securityContext": {"privileged": False, "capabilities": {"drop": ["ALL"]}}}, + ], + } + ] + } + findings = run_benchmark(config, section="pod_security") + assert findings[0].status == "PASS" # not privileged + assert findings[3].status == "PASS" # drops ALL + + def test_host_pid_fails(self): + config = {"pods": [{"name": "bad", "spec": {"hostPID": True}}]} + findings = run_benchmark(config, section="pod_security") 
+ pid = next(f for f in findings if f.check_id == "K8S-1.2") + assert pid.status == "FAIL" + + def test_host_network_fails(self): + config = {"pods": [{"name": "bad", "spec": {"hostNetwork": True}}]} + findings = run_benchmark(config, section="pod_security") + net = next(f for f in findings if f.check_id == "K8S-1.3") + assert net.status == "FAIL" + + +class TestRBAC: + def test_cluster_admin_default_fails(self): + config = { + "cluster_role_bindings": [ + {"name": "bad-binding", "roleRef": {"name": "cluster-admin"}, "subjects": [{"name": "default", "namespace": "default"}]} + ] + } + findings = run_benchmark(config, section="rbac") + admin = next(f for f in findings if f.check_id == "K8S-2.1") + assert admin.status == "FAIL" + + def test_wildcard_permissions_fails(self): + config = {"cluster_roles": [{"name": "too-broad", "rules": [{"verbs": ["*"], "resources": ["*"]}]}]} + findings = run_benchmark(config, section="rbac") + wc = next(f for f in findings if f.check_id == "K8S-2.2") + assert wc.status == "FAIL" + + +class TestNetwork: + def test_no_deny_policy_fails(self): + config = {"namespaces": [{"name": "default", "network_policies": []}]} + findings = run_benchmark(config, section="network") + assert findings[0].status == "FAIL" + + def test_deny_policy_passes(self): + config = {"namespaces": [{"name": "production", "network_policies": [{"name": "default-deny-ingress"}]}]} + findings = run_benchmark(config, section="network") + assert findings[0].status == "PASS" + + +class TestSecrets: + def test_env_secrets_fails(self): + config = { + "pods": [ + { + "name": "app", + "containers": [ + {"name": "c1", "env": [{"name": "DB_PASS", "valueFrom": {"secretKeyRef": {"name": "db", "key": "password"}}}]} + ], + } + ] + } + findings = run_benchmark(config, section="secrets") + env = next(f for f in findings if f.check_id == "K8S-4.1") + assert env.status == "FAIL" + + def test_no_etcd_encryption_fails(self): + config = {"api_server": {}} + findings = 
run_benchmark(config, section="secrets") + enc = next(f for f in findings if f.check_id == "K8S-4.2") + assert enc.status == "FAIL" + + +class TestImages: + def test_latest_tag_fails(self): + config = {"pods": [{"name": "app", "containers": [{"name": "c1", "image": "nginx:latest"}]}]} + findings = run_benchmark(config, section="images") + assert findings[0].status == "FAIL" + + def test_pinned_tag_passes(self): + config = {"pods": [{"name": "app", "containers": [{"name": "c1", "image": "nginx:1.25.3-alpine"}]}]} + findings = run_benchmark(config, section="images") + assert findings[0].status == "PASS" + + +class TestRunner: + def test_run_all(self): + config = {"pods": [], "namespaces": [], "api_server": {}} + findings = run_benchmark(config) + assert len(findings) == 10 + assert all(isinstance(f, Finding) for f in findings) + + def test_all_have_cis_mapping(self): + config = {"pods": [{"name": "test", "containers": [{"name": "c", "image": "app:v1"}]}]} + findings = run_benchmark(config) + for f in findings: + assert f.cis_k8s, f"{f.check_id} missing CIS K8s mapping" diff --git a/skills/model-serving-security/examples/insecure-serving.json b/skills/model-serving-security/examples/insecure-serving.json new file mode 100644 index 0000000..06cb0f0 --- /dev/null +++ b/skills/model-serving-security/examples/insecure-serving.json @@ -0,0 +1,31 @@ +{ + "_comment": "Example: misconfigured model serving — multiple checks will fail", + "endpoints": [ + { + "name": "inference-api", + "url": "http://model.public:8080/v1/completions", + "auth": { + "type": "none" + }, + "visibility": "public", + "network": { + "public": true + } + } + ], + "containers": [ + { + "name": "model-server", + "security_context": { + "privileged": true, + "runAsUser": 0 + } + } + ], + "models": [ + { + "name": "my-model", + "version": "latest" + } + ] +} diff --git a/skills/model-serving-security/examples/secure-serving.json b/skills/model-serving-security/examples/secure-serving.json new file mode 
100644 index 0000000..07e8d08 --- /dev/null +++ b/skills/model-serving-security/examples/secure-serving.json @@ -0,0 +1,61 @@ +{ + "_comment": "Example: well-configured model serving infrastructure — all 16 checks pass", + "endpoints": [ + { + "name": "inference-api", + "url": "https://model.internal:8443/v1/completions", + "auth": { + "type": "oauth2", + "enabled": true, + "roles": ["admin", "inference-user", "read-only"] + }, + "rate_limit": { + "enabled": true, + "rpm": 100, + "rpd": 10000 + }, + "limits": { + "max_tokens": 4096, + "max_input_size": 1048576 + }, + "tls": { + "enabled": true, + "min_version": "1.2" + }, + "network": { + "vpc": true, + "private": true + } + } + ], + "containers": [ + { + "name": "model-server", + "security_context": { + "privileged": false, + "readOnlyRootFilesystem": true, + "runAsNonRoot": true, + "runAsUser": 1000 + } + } + ], + "safety": { + "prompt_injection": true, + "content_classification": true, + "output_filter": true, + "categories": ["violence", "hate", "self-harm", "sexual"] + }, + "privacy": { + "memorization_guard": true + }, + "logging": { + "log_requests": true, + "redact_pii": true + }, + "models": [ + { + "name": "claude-3.5-sonnet", + "version": "20241022" + } + ] +} diff --git a/skills/vuln-remediation-pipeline/tests/test_triage.py b/skills/vuln-remediation-pipeline/tests/test_triage.py new file mode 100644 index 0000000..037c46e --- /dev/null +++ b/skills/vuln-remediation-pipeline/tests/test_triage.py @@ -0,0 +1,108 @@ +"""Tests for vulnerability triage logic — EPSS/KEV/CVSS classification.""" + +from __future__ import annotations + +import os +import sys +from unittest.mock import patch + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from lambda_triage.handler import Tier, TriagedFinding, classify, parse_sarif, triage + + +class TestClassify: + def test_kev_is_p0(self): + finding = {"is_kev": True, "cvss_score": 5.0, "epss_score": 0.1} + assert classify(finding) == Tier.P0 + + 
def test_cvss_9_is_p0(self): + finding = {"is_kev": False, "cvss_score": 9.5, "epss_score": 0.1} + assert classify(finding) == Tier.P0 + + def test_high_cvss_high_epss_is_p1(self): + finding = {"is_kev": False, "cvss_score": 7.5, "epss_score": 0.8} + assert classify(finding) == Tier.P1 + + def test_medium_cvss_is_p2(self): + finding = {"is_kev": False, "cvss_score": 5.0, "epss_score": 0.4} + assert classify(finding) == Tier.P2 + + def test_low_everything_is_p3(self): + finding = {"is_kev": False, "cvss_score": 2.0, "epss_score": 0.1} + assert classify(finding) == Tier.P3 + + def test_missing_scores_default_p3(self): + finding = {} + assert classify(finding) == Tier.P3 + + +class TestParseSarif: + def test_parse_empty_sarif(self): + sarif = {"runs": [{"results": []}]} + findings = parse_sarif(sarif) + assert findings == [] + + def test_parse_sarif_with_result(self): + sarif = { + "runs": [ + { + "results": [ + { + "ruleId": "CVE-2024-1234", + "message": {"text": "Vulnerability in express"}, + "level": "error", + "properties": {"cvss_score": 7.5, "epss_score": 0.6, "is_kev": False}, + } + ] + } + ] + } + findings = parse_sarif(sarif) + assert len(findings) == 1 + assert findings[0]["vulnerability_id"] == "CVE-2024-1234" + + +class TestTriage: + @patch("lambda_triage.handler._is_already_remediated", return_value=False) + @patch("lambda_triage.handler._load_protected_packages", return_value=set()) + def test_triage_classifies_all(self, mock_protected, mock_remediated): + findings = [ + {"vulnerability_id": "CVE-1", "is_kev": True, "cvss_score": 9.8, "epss_score": 0.95, "package": "express"}, + {"vulnerability_id": "CVE-2", "is_kev": False, "cvss_score": 3.0, "epss_score": 0.05, "package": "lodash"}, + ] + triaged = triage(findings) + tiers = {t.tier for t in triaged if not t.skipped} + assert Tier.P0 in tiers + assert Tier.P3 in tiers + + @patch("lambda_triage.handler._is_already_remediated", return_value=True) + @patch("lambda_triage.handler._load_protected_packages", 
return_value=set()) + def test_already_remediated_skipped(self, mock_protected, mock_remediated): + findings = [{"vulnerability_id": "CVE-1", "is_kev": True, "cvss_score": 9.8, "epss_score": 0.95, "package": "express"}] + triaged = triage(findings) + assert all(t.skipped for t in triaged) + + @patch("lambda_triage.handler._is_already_remediated", return_value=False) + @patch("lambda_triage.handler._load_protected_packages", return_value={"lodash"}) + def test_protected_package_skipped(self, mock_protected, mock_remediated): + findings = [{"vulnerability_id": "CVE-1", "is_kev": False, "cvss_score": 5.0, "epss_score": 0.3, "package": "lodash"}] + triaged = triage(findings) + assert all(t.skipped for t in triaged) + + +class TestTriagedFinding: + def test_triaged_finding_fields(self): + tf = TriagedFinding( + vulnerability_id="CVE-2024-1234", + package="express", + tier=Tier.P0, + cvss_score=9.8, + epss_score=0.95, + is_kev=True, + skipped=False, + skip_reason="", + ) + assert tf.tier == Tier.P0 + assert tf.is_kev is True + assert not tf.skipped