Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 199 additions & 42 deletions scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from typing import Dict, List, Optional, Sequence, Set, Tuple
from urllib.parse import urljoin

import fitz
Expand Down Expand Up @@ -66,6 +66,7 @@
# Read from the PDF_FETCH_CONCURRENCY environment variable (default "32") and
# clamped to at least 1. Presumably the cap on simultaneous PDF fetches --
# the use site is outside this view, confirm there.
PDF_FETCH_CONCURRENCY = max(1, int(os.getenv("PDF_FETCH_CONCURRENCY", "32")))
# Read from the CERT_PROCESS_TIMEOUT environment variable (default "900") and
# clamped to at least 1. Presumably seconds per certificate -- TODO confirm.
CERT_PROCESS_TIMEOUT = max(1, int(os.getenv("CERT_PROCESS_TIMEOUT", "900")))
# True only when the FULL_REFRESH environment variable is exactly "1".
FULL_REFRESH = os.getenv("FULL_REFRESH", "0") == "1"
# Transient key written onto a module row by the deferral paths (detail page
# unavailable, algorithms unavailable, certificate timeout). The
# result-collection loop pops it from every row; rows that carried a reason
# are reported on stderr and not stored in the results.
DEFERRED_REASON_KEY = "_deferred_reason"

# Path to NIST-CMVP-ReportGen database (if available for importing algorithms)
CMVP_DB_PATH = os.getenv("CMVP_DB_PATH", "")
Expand Down Expand Up @@ -309,6 +310,7 @@
"algorithm_source_database",
"algorithm_source_none",
"certificate_timeouts",
"certificates_deferred",
)


Expand Down Expand Up @@ -886,6 +888,7 @@ def load_previous_outputs(output_dir: Path = Path("api")) -> Dict[str, object]:
"""Load previously generated module rows, certificate details, and metadata."""
active_data = load_json_file(output_dir / "modules.json") or {}
historical_data = load_json_file(output_dir / "historical-modules.json") or {}
modules_in_process_data = load_json_file(output_dir / "modules-in-process.json") or {}
metadata = load_json_file(output_dir / "metadata.json") or {}
detail_payloads: Dict[int, Dict] = {}

Expand All @@ -907,6 +910,11 @@ def build_module_map(records: List[Dict]) -> Dict[int, Dict]:

return {
"metadata": metadata,
"module_lists": {
"active": copy.deepcopy(active_data.get("modules", [])),
"historical": copy.deepcopy(historical_data.get("modules", [])),
},
"modules_in_process": copy.deepcopy(modules_in_process_data.get("modules_in_process", [])),
"modules": {
"active": build_module_map(active_data.get("modules", [])),
"historical": build_module_map(historical_data.get("modules", [])),
Expand Down Expand Up @@ -981,6 +989,13 @@ def should_reuse_certificate_detail(
return all(field in previous_detail for field in DETAIL_SCHEMA_REQUIRED_FIELDS)


def has_reusable_certificate_detail(previous_detail: Optional[Dict]) -> bool:
    """Return whether a cached detail payload is complete enough to publish.

    Args:
        previous_detail: Detail payload loaded from a previous run, or ``None``
            when nothing was cached for the certificate.

    Returns:
        ``True`` only when ``FULL_REFRESH`` is off and ``previous_detail`` is a
        dict containing every key in ``DETAIL_SCHEMA_REQUIRED_FIELDS``.
    """
    # Anything that is not a dict (including None) cannot be a cached payload.
    if not isinstance(previous_detail, dict):
        return False
    # A full refresh never reuses cached output.
    if FULL_REFRESH:
        return False
    # Loop variable deliberately not called ``field``: that name is imported
    # from ``dataclasses`` at module level.
    return all(
        required_key in previous_detail
        for required_key in DETAIL_SCHEMA_REQUIRED_FIELDS
    )


def prepare_reused_detail_payload(
previous_detail: Dict,
module: Dict,
Expand Down Expand Up @@ -1047,6 +1062,28 @@ def should_reuse_cached_algorithms(
return bool(categories or detailed)


def can_fallback_to_cached_algorithms(
    algorithm_source: str,
    previous_metadata: Dict,
    previous_module: Optional[Dict],
    previous_detail: Optional[Dict],
) -> bool:
    """Return whether a failed extraction can preserve the last published payload.

    Args:
        algorithm_source: Algorithm source configured for the current run.
        previous_metadata: Metadata dict from the previous run; its
            ``algorithm_source`` and ``algorithm_cache_version`` keys are read.
        previous_module: Previously published module row, if any.
        previous_detail: Previously published detail payload, if any.

    Returns:
        ``True`` when the previous run's algorithm data may be republished in
        place of a failed extraction, ``False`` otherwise.
    """
    # Nothing was published for this certificate, so there is nothing to keep.
    if previous_module is None and previous_detail is None:
        return False
    # A full refresh never falls back to cached output.
    if FULL_REFRESH:
        return False
    # Both the current run and the previous run must use a cacheable source.
    if algorithm_source not in CACHEABLE_ALGORITHM_SOURCES:
        return False
    if previous_metadata.get("algorithm_source") not in CACHEABLE_ALGORITHM_SOURCES:
        return False

    # Matching cache version: accept the cache even if it recorded no algorithms.
    if previous_metadata.get("algorithm_cache_version") == ALGORITHM_CACHE_VERSION:
        return True

    # Different cache version: fall back only when algorithm data is present.
    categories, detailed = cached_algorithm_fields(previous_module, previous_detail)
    return bool(categories or detailed)


def prune_orphan_certificate_details(current_cert_numbers: Set[int], detail_dir: Path = DETAIL_DIR) -> int:
"""
Remove stale certificate detail files for certs no longer present upstream.
Expand Down Expand Up @@ -1742,10 +1779,42 @@ async def process_certificate_record(
)
stats["html_refreshed"] += 1
except Exception as exc:
stats["html_failed"] += 1
print(f"Warning: Failed to parse certificate {cert_number}: {exc}", file=sys.stderr)
else:
stats["html_failed"] += 1
if detail_payload is None:
if has_reusable_certificate_detail(previous_detail):
detail_payload = prepare_reused_detail_payload(
previous_detail,
module,
cert_number,
dataset,
generated_at,
)
stats["html_reused"] += 1
print(
f"Warning: Preserving cached detail for certificate {cert_number} after refresh failure.",
file=sys.stderr,
)
else:
stats["certificates_deferred"] += 1
strip_algorithm_fields(module_out)
apply_algorithm_extraction_provenance(
module_out,
build_algorithm_extraction_provenance(
algorithm_source,
"skipped",
"none",
None,
[],
[],
),
)
module_out["detail_available"] = False
module_out[DEFERRED_REASON_KEY] = "certificate_detail_unavailable"
print(
f"Warning: Deferring certificate {cert_number}; detail page is unavailable and no cache exists.",
file=sys.stderr,
)
return module_out, None, [], stats

if detail_payload:
module_out = dict(previous_module or {})
Expand Down Expand Up @@ -1845,9 +1914,45 @@ async def process_certificate_record(
stats["algorithm_source_security_policy_pdf"] += 1
if extraction_result.fallback_used:
stats["algorithm_fallbacks"] += 1
elif can_fallback_to_cached_algorithms(
algorithm_source,
previous_metadata,
previous_module,
previous_detail,
):
categories, detailed = cached_algorithm_fields(previous_module, previous_detail)
cached_source, cached_source_url = cached_algorithm_extraction_source(
previous_module,
previous_detail,
previous_metadata,
)
extraction_provenance = build_algorithm_extraction_provenance(
algorithm_source,
"cached",
cached_source,
cached_source_url,
categories,
detailed,
cached=True,
fallback_used=extraction_result.fallback_used,
attempts=extraction_result.attempts,
)
stats["pdf_reused"] += 1
stats["algorithm_cache_hits"] += 1
if categories or detailed:
stats["algorithm_successes"] += 1
print(
f"Warning: Preserving cached algorithms for certificate {cert_number} after extraction failure.",
file=sys.stderr,
)
else:
stats["pdf_failed"] += 1
stats["algorithm_misses"] += 1
stats["certificates_deferred"] += 1
module_out[DEFERRED_REASON_KEY] = "algorithm_unavailable"
print(
f"Warning: Deferring certificate {cert_number}; algorithms are unavailable and no cache exists.",
file=sys.stderr,
)
return module_out, detail_payload, [], stats

if detail_payload:
apply_algorithm_fields(detail_payload, categories, detailed)
Expand Down Expand Up @@ -1886,7 +1991,6 @@ def build_certificate_timeout_result(
) -> Tuple[Dict, Optional[Dict], List[str], Dict[str, int]]:
"""Build a bounded fallback result when one certificate exceeds the timeout."""
stats = new_processing_stats()
stats["certificate_timeouts"] += 1

cert_number = parse_certificate_number(module)
module_out = dict(previous_module or {})
Expand Down Expand Up @@ -1946,15 +2050,11 @@ def build_certificate_timeout_result(
module_out["detail_available"] = True
return module_out, detail_payload, categories, stats

stats["html_failed"] += 1
if algorithm_source in CACHEABLE_ALGORITHM_SOURCES:
stats["pdf_failed"] += 1
if algorithm_source != "none":
stats["algorithm_misses"] += 1
stats["certificates_deferred"] += 1
strip_algorithm_fields(module_out)
provenance = build_algorithm_extraction_provenance(
algorithm_source,
"miss",
"skipped",
"timeout",
source_url,
[],
Expand All @@ -1963,6 +2063,7 @@ def build_certificate_timeout_result(
)
apply_algorithm_extraction_provenance(module_out, provenance)
module_out["detail_available"] = False
module_out[DEFERRED_REASON_KEY] = "certificate_timeout"
return module_out, None, [], stats


Expand Down Expand Up @@ -2098,8 +2199,24 @@ def schedule_next_certificate() -> None:
for task in done:
index, module_out, detail_payload, categories, task_stats = await task
completed += 1
results[index] = module_out
cert_number = parse_certificate_number(module_out)
deferred_reason = module_out.pop(DEFERRED_REASON_KEY, None)
if deferred_reason:
print(
f" Deferred certificate {cert_number or 'unknown'} ({dataset}): {deferred_reason}",
file=sys.stderr,
)
add_processing_stats(stats, task_stats)
schedule_next_certificate()
if completed % 100 == 0 or completed == total:
print(
f" Progress: {completed}/{total} "
f"({stats['html_reused']} reused, {stats['html_refreshed']} refreshed, "
f"{stats['html_failed']} failed, {stats['certificates_deferred']} deferred)"
)
continue

results[index] = module_out
if cert_number is not None and detail_payload is not None:
payloads[cert_number] = detail_payload
if cert_number is not None and categories:
Expand All @@ -2109,10 +2226,11 @@ def schedule_next_certificate() -> None:
if completed % 100 == 0 or completed == total:
print(
f" Progress: {completed}/{total} "
f"({stats['html_reused']} reused, {stats['html_refreshed']} refreshed, {stats['html_failed']} failed)"
f"({stats['html_reused']} reused, {stats['html_refreshed']} refreshed, "
f"{stats['html_failed']} failed, {stats['certificates_deferred']} deferred)"
)

return [result or {} for result in results], payloads, algorithms_map, stats
return [result for result in results if result], payloads, algorithms_map, stats


def parse_modules_table(html: str) -> List[Dict]:
Expand Down Expand Up @@ -3013,6 +3131,35 @@ def validate_module_count(modules: List[Dict], label: str, min_expected: int = 1
sys.exit(1)


def module_rows_with_cache_fallback(
    scraped_modules: List[Dict],
    previous_modules: Sequence[Dict],
    label: str,
    min_expected: int,
) -> Tuple[List[Dict], bool]:
    """
    Return scraped rows unless the top-level NIST list is unusable.

    Scheduled updates should not publish an empty or partial top-level dataset
    just because NIST rejected one request. When a previous checked-in dataset is
    available, reuse it and let per-certificate cache validation keep artifacts
    internally consistent.

    The checked-in rows are only used when they themselves meet
    ``min_expected``; an undersized cache is treated like a missing one so the
    minimum-count validation is never bypassed.

    Args:
        scraped_modules: Rows produced by the current scrape.
        previous_modules: Rows loaded from the previous run's output (may be
            empty or ``None``).
        label: Human-readable dataset name used in log messages.
        min_expected: Minimum row count for the dataset to be considered usable.

    Returns:
        A ``(rows, used_cache)`` tuple. ``used_cache`` is ``True`` only when a
        deep copy of ``previous_modules`` was returned instead of the scrape.
    """
    scraped_count = len(scraped_modules)
    if scraped_count >= min_expected:
        return scraped_modules, False

    cached_count = len(previous_modules) if previous_modules else 0
    if FULL_REFRESH or cached_count < min_expected:
        if cached_count and not FULL_REFRESH:
            print(
                f"Warning: Only {cached_count} checked-in {label} available; "
                f"too few to stand in for a failed scrape (minimum {min_expected}).",
                file=sys.stderr,
            )
        # No usable cache: let the normal validation decide what happens.
        validate_module_count(scraped_modules, label, min_expected)
        return scraped_modules, False

    print(
        f"Warning: Only {scraped_count} {label} scraped; "
        f"preserving {cached_count} checked-in rows for this run.",
        file=sys.stderr,
    )
    # Deep copy so later enrichment cannot mutate the loaded previous outputs.
    return copy.deepcopy(list(previous_modules)), True


def format_count(value: int) -> str:
    """Format integer counts with thousands separators.

    >>> format_count(1234567)
    '1,234,567'
    """
    return f"{value:,}"
Expand Down Expand Up @@ -4730,47 +4877,55 @@ def main():

generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

previous_outputs = load_previous_outputs() if not FULL_REFRESH else {
"metadata": {},
"module_lists": {"active": [], "historical": []},
"modules_in_process": [],
"modules": {"active": {}, "historical": {}},
"details": {},
}
if not FULL_REFRESH:
cached_detail_count = len(previous_outputs.get("details", {}))
print(f"Loaded {cached_detail_count} cached certificate detail records for reuse checks")

# Scrape all validated modules
print("Scraping validated modules...")
modules = scrape_all_modules()

if not modules:
print("No validated modules found!", file=sys.stderr)
sys.exit(1)

# Validate module counts to prevent silent data loss
validate_module_count(modules, "validated modules", min_expected=100)
print(f"\nTotal validated modules scraped: {len(modules)}")
modules, _ = module_rows_with_cache_fallback(
modules,
previous_outputs.get("module_lists", {}).get("active", []),
"validated modules",
min_expected=100,
)
print(f"\nTotal validated modules available: {len(modules)}")

# Scrape historical modules
print("\nScraping historical modules...")
historical_modules = scrape_historical_modules()

validate_module_count(historical_modules, "historical modules", min_expected=500)
print(f"Total historical modules scraped: {len(historical_modules)}")
historical_modules, _ = module_rows_with_cache_fallback(
historical_modules,
previous_outputs.get("module_lists", {}).get("historical", []),
"historical modules",
min_expected=500,
)
print(f"Total historical modules available: {len(historical_modules)}")

# Scrape modules in process
print("\nScraping modules in process...")
modules_in_process = scrape_modules_in_process()

# Lower threshold for in-process — this list is naturally smaller and more variable
validate_module_count(modules_in_process, "modules in process", min_expected=20)
print(f"Total modules in process scraped: {len(modules_in_process)}")
modules_in_process, _ = module_rows_with_cache_fallback(
modules_in_process,
previous_outputs.get("modules_in_process", []),
"modules in process",
min_expected=20,
)
print(f"Total modules in process available: {len(modules_in_process)}")

# Add security policy and detail URLs to all modules
print("\nEnriching modules with URLs...")
modules = enrich_modules_with_urls(modules)
historical_modules = enrich_modules_with_urls(historical_modules)

previous_outputs = load_previous_outputs() if not FULL_REFRESH else {
"metadata": {},
"modules": {"active": {}, "historical": {}},
"details": {},
}
if not FULL_REFRESH:
cached_detail_count = len(previous_outputs.get("details", {}))
print(f"Loaded {cached_detail_count} cached certificate detail records for reuse checks")

database_algorithms_map: Dict[int, List[str]] = {}
if algorithm_source == "database":
print("\nImporting algorithms from database...")
Expand Down Expand Up @@ -4969,11 +5124,13 @@ def main():
print(f" - Algorithm source: {algorithm_source}")
print(
" - Active detail reuse: "
f"{active_stats['html_reused']} reused, {active_stats['html_refreshed']} refreshed, {active_stats['html_failed']} failed"
f"{active_stats['html_reused']} reused, {active_stats['html_refreshed']} refreshed, "
f"{active_stats['html_failed']} failed, {active_stats['certificates_deferred']} deferred"
)
print(
" - Historical detail reuse: "
f"{historical_stats['html_reused']} reused, {historical_stats['html_refreshed']} refreshed, {historical_stats['html_failed']} failed"
f"{historical_stats['html_reused']} reused, {historical_stats['html_refreshed']} refreshed, "
f"{historical_stats['html_failed']} failed, {historical_stats['certificates_deferred']} deferred"
)
if algorithm_source in CACHEABLE_ALGORITHM_SOURCES:
print(
Expand Down
Loading