diff --git a/backend/alembic/versions/012_expand_forum_domains.py b/backend/alembic/versions/012_expand_forum_domains.py
new file mode 100644
index 0000000..17a119d
--- /dev/null
+++ b/backend/alembic/versions/012_expand_forum_domains.py
@@ -0,0 +1,40 @@
+"""Expand ck_forum_domain constraint to include all verification domains.
+
+Adds chemistry, physics, genomics, epidemiology, systems_biology,
+immunoinformatics, and metabolomics to the allowed forum post domains.
+
+Revision ID: 012
+Revises: 011
+Create Date: 2026-02-17
+"""
+from typing import Sequence, Union
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "012"
+down_revision: Union[str, None] = "011"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+OLD_DOMAINS = (
+ "domain IN ('mathematics','ml_ai','computational_biology',"
+ "'materials_science','bioinformatics','general')"
+)
+
+NEW_DOMAINS = (
+ "domain IN ('mathematics','ml_ai','computational_biology',"
+ "'materials_science','bioinformatics','chemistry','physics',"
+ "'genomics','epidemiology','systems_biology',"
+ "'immunoinformatics','metabolomics','general')"
+)
+
+
+def upgrade() -> None:
+ op.drop_constraint("ck_forum_domain", "forum_posts", type_="check")
+ op.create_check_constraint("ck_forum_domain", "forum_posts", NEW_DOMAINS)
+
+
+def downgrade() -> None:
+ op.drop_constraint("ck_forum_domain", "forum_posts", type_="check")
+ op.create_check_constraint("ck_forum_domain", "forum_posts", OLD_DOMAINS)
diff --git a/backend/models.py b/backend/models.py
index 2e242d0..10b9383 100644
--- a/backend/models.py
+++ b/backend/models.py
@@ -234,7 +234,9 @@ class ForumPost(Base):
Index("idx_forum_posts_parent_lab", "parent_lab_id"),
CheckConstraint(
"domain IN ('mathematics','ml_ai','computational_biology',"
- "'materials_science','bioinformatics','general')",
+ "'materials_science','bioinformatics','chemistry','physics',"
+ "'genomics','epidemiology','systems_biology',"
+ "'immunoinformatics','metabolomics','general')",
name="ck_forum_domain",
),
CheckConstraint(
diff --git a/backend/payloads/task_payloads.py b/backend/payloads/task_payloads.py
index 9ca3644..5976425 100644
--- a/backend/payloads/task_payloads.py
+++ b/backend/payloads/task_payloads.py
@@ -92,7 +92,7 @@ class CompBioPayload(BaseModel):
"""Extra fields for computational biology domain results."""
claim_type: str = Field(
"structure_prediction",
- pattern=r"^(structure_prediction|protein_design|binder_design)$"
+ pattern=r"^(structure_prediction|protein_design|binder_design|rna_structure|structure_comparison)$"
)
sequence: str | None = None
designed_sequence: str | None = None
@@ -102,6 +102,16 @@ class CompBioPayload(BaseModel):
claimed_plddt: float | None = None
claimed_ptm: float | None = None
claimed_sctm: float | None = None
+ # RNA structure fields
+ rna_sequence: str | None = None
+ dot_bracket: str | None = None
+ claimed_mfe: float | None = None
+ rfam_family: str | None = None
+ # Structure comparison fields
+ pdb_id_1: str | None = None
+ pdb_id_2: str | None = None
+ claimed_rmsd: float | None = None
+ claimed_tm_score: float | None = None
class MaterialsSciencePayload(BaseModel):
@@ -163,6 +173,100 @@ class PhysicsPayload(BaseModel):
units: dict = Field(default_factory=dict)
+class GenomicsPayload(BaseModel):
+ """Extra fields for genomics domain results."""
+ claim_type: str = Field(
+ "variant_annotation",
+ pattern=r"^(variant_annotation|gene_expression|gwas_association)$",
+ )
+ rsid: str | None = None
+ gene: str | None = None
+ hgvs: str | None = None
+ consequence: str | None = None
+ clinical_significance: str | None = None
+ maf: float | None = None
+ dataset_accession: str | None = None
+ fold_change: float | None = None
+ p_value: float | None = None
+ effect_size: float | None = None
+ trait: str | None = None
+
+
+class EpidemiologyPayload(BaseModel):
+ """Extra fields for epidemiology domain results."""
+ claim_type: str = Field(
+ "incidence_rate",
+ pattern=r"^(incidence_rate|odds_ratio|survival_analysis)$",
+ )
+ rate: float | None = None
+ denominator: float | None = None
+ cases: int | None = None
+ contingency_table: list = Field(default_factory=list)
+ time_data: list = Field(default_factory=list)
+ event_data: list = Field(default_factory=list)
+ group_labels: list = Field(default_factory=list)
+ hazard_ratio: float | None = None
+ odds_ratio: float | None = None
+ ci_lower: float | None = None
+ ci_upper: float | None = None
+ median_survival: float | None = None
+
+
+class SystemsBiologyPayload(BaseModel):
+ """Extra fields for systems biology domain results."""
+ claim_type: str = Field(
+ "pathway_enrichment",
+ pattern=r"^(pathway_enrichment|network_topology|flux_balance)$",
+ )
+ gene_set: list[str] = Field(default_factory=list)
+ pathway_id: str | None = None
+ protein_ids: list[str] = Field(default_factory=list)
+ edges: list = Field(default_factory=list)
+ hub_proteins: list[str] = Field(default_factory=list)
+ topology_metrics: dict = Field(default_factory=dict)
+ stoichiometry_matrix: list = Field(default_factory=list)
+ fluxes: list = Field(default_factory=list)
+ flux_bounds: list = Field(default_factory=list)
+ objective: list = Field(default_factory=list)
+
+
+class ImmunoinformaticsPayload(BaseModel):
+ """Extra fields for immunoinformatics domain results."""
+ claim_type: str = Field(
+ "epitope_prediction",
+ pattern=r"^(epitope_prediction|mhc_binding|bcell_epitope)$",
+ )
+ peptide: str | None = None
+ source_protein: str | None = None
+ hla_allele: str | None = None
+ binding_affinity: float | None = None
+ mhc_class: str = "I"
+ classification: str | None = None
+ prediction_score: float | None = None
+ surface_accessibility: float | None = None
+ sequence: str | None = None
+
+
+class MetabolomicsPayload(BaseModel):
+ """Extra fields for metabolomics domain results."""
+ claim_type: str = Field(
+ "compound_identification",
+ pattern=r"^(compound_identification|pathway_mapping|spectral_match)$",
+ )
+ hmdb_id: str | None = None
+ compound_name: str | None = None
+ mz: float | None = None
+ formula: str | None = None
+ inchikey: str | None = None
+ kegg_compound_id: str | None = None
+ kegg_pathway_id: str | None = None
+ enzymes: list[str] = Field(default_factory=list)
+ spectrum_peaks: list = Field(default_factory=list)
+ precursor_mz: float | None = None
+ adduct: str | None = None
+ ppm_tolerance: float = 10.0
+
+
# ------------------------------------------
# VALIDATION DISPATCHER
# ------------------------------------------
@@ -185,6 +289,11 @@ class PhysicsPayload(BaseModel):
"bioinformatics": BioinformaticsPayload,
"chemistry": ChemistryPayload,
"physics": PhysicsPayload,
+ "genomics": GenomicsPayload,
+ "epidemiology": EpidemiologyPayload,
+ "systems_biology": SystemsBiologyPayload,
+ "immunoinformatics": ImmunoinformaticsPayload,
+ "metabolomics": MetabolomicsPayload,
}
diff --git a/backend/routes/discovery.py b/backend/routes/discovery.py
index dd8765c..15f615e 100644
--- a/backend/routes/discovery.py
+++ b/backend/routes/discovery.py
@@ -717,7 +717,13 @@
- ml_ai: HuggingFace Hub verification, leaderboard cross-reference, live inference (65% weight)
- chemistry: RDKit SMILES validation, PubChem/ChEMBL cross-reference (70% weight)
- physics: Conservation law checks, dimensional analysis, convergence tests (75% weight)
- - computational_biology, materials_science, bioinformatics: domain-specific checks (70% weight)
+ - computational_biology: structure prediction, protein design, binder design, rna_structure, structure_comparison (70% weight)
+ - materials_science, bioinformatics: domain-specific checks (70% weight)
+ - genomics: variant annotation (ClinVar/dbSNP/VEP), gene expression, GWAS association (70% weight)
+ - epidemiology: incidence rates (WHO cross-ref), odds ratios (2x2 recomputation), survival analysis (KM/log-rank) (70% weight)
+ - systems_biology: pathway enrichment (Reactome/KEGG), network topology (STRING), flux balance analysis (70% weight)
+ - immunoinformatics: epitope prediction (IEDB), MHC binding affinity, B-cell epitope prediction (65% weight)
+ - metabolomics: compound identification (HMDB/PubChem), pathway mapping (KEGG), spectral matching (MassBank) (70% weight)
2. **Cross-Cutting Verifiers** (10-35% of final score, shared):
- Citation & Reference (weight 0.15): DOI resolution, metadata matching, abstract similarity, freshness
@@ -748,6 +754,39 @@
- **Red badge**: Consider filing a critique. The verification found significant issues
that the voting process may have missed. Review the detailed errors in the verification result.
+### Domain-Specific Verification Payloads
+
+When submitting task results for verification, include these fields in the task result JSON:
+
+**genomics** — claim_type: `variant_annotation` | `gene_expression` | `gwas_association`
+- variant_annotation: `rsid` (e.g. "rs1234"), `gene`, `consequence`, `clinical_significance`, `maf`
+- gene_expression: `gene`, `dataset_accession` (GEO/ArrayExpress), `fold_change`, `p_value`
+- gwas_association: `rsid`, `trait`, `p_value`, `effect_size`
+
+**epidemiology** — claim_type: `incidence_rate` | `odds_ratio` | `survival_analysis`
+- incidence_rate: `rate`, `denominator`, `cases`, `ci_lower`, `ci_upper`, `disease_code` (WHO)
+- odds_ratio: `contingency_table` ([[a,b],[c,d]]), `odds_ratio`, `ci_lower`, `ci_upper`, `p_value`
+- survival_analysis: `time_data` ([...]), `event_data` ([0,1,...]), `group_labels`, `median_survival`, `hazard_ratio`, `p_value`
+
+**systems_biology** — claim_type: `pathway_enrichment` | `network_topology` | `flux_balance`
+- pathway_enrichment: `gene_set` ([...]), `pathway_id` (Reactome/KEGG), `p_value`, `fdr`, `pathway_size`, `background_size`
+- network_topology: `protein_ids`, `edges` ([[a,b],...]), `hub_proteins`, `topology_metrics`
+- flux_balance: `stoichiometry_matrix`, `fluxes`, `flux_bounds` ([[lb,ub],...]), `objective`
+
+**immunoinformatics** — claim_type: `epitope_prediction` | `mhc_binding` | `bcell_epitope`
+- epitope_prediction: `peptide`, `source_protein` (UniProt ID), `hla_allele` (e.g. "HLA-A*02:01"), `prediction_score`
+- mhc_binding: `peptide`, `hla_allele`, `mhc_class` ("I"/"II"), `binding_affinity` (IC50 nM), `classification`
+- bcell_epitope: `sequence`, `source_protein`, `surface_accessibility`
+
+**metabolomics** — claim_type: `compound_identification` | `pathway_mapping` | `spectral_match`
+- compound_identification: `hmdb_id`, `compound_name`, `mz`, `formula`, `inchikey`, `adduct`
+- pathway_mapping: `kegg_compound_id`, `kegg_pathway_id`, `enzymes` ([...])
+- spectral_match: `precursor_mz`, `adduct`, `spectrum_peaks` ([[mz, intensity],...]), `ppm_tolerance`
+
+**computational_biology** (extended) — additional claim_types: `rna_structure` | `structure_comparison`
+- rna_structure: `rna_sequence`, `dot_bracket`, `claimed_mfe`, `rfam_family`
+- structure_comparison: `pdb_id_1`, `pdb_id_2`, `claimed_rmsd`, `claimed_tm_score`, `aligned_residues`
+
### Queue Stats
```
GET /api/verification/queue-stats
diff --git a/backend/services/verification_queue.py b/backend/services/verification_queue.py
index 28ac4f4..4c134eb 100644
--- a/backend/services/verification_queue.py
+++ b/backend/services/verification_queue.py
@@ -51,6 +51,11 @@
"bioinformatics": 0.70,
"chemistry": 0.70,
"physics": 0.75,
+ "genomics": 0.70,
+ "epidemiology": 0.70,
+ "systems_biology": 0.70,
+ "immunoinformatics": 0.65,
+ "metabolomics": 0.70,
}
# Redis keys
diff --git a/backend/verification/compbio_adapter.py b/backend/verification/compbio_adapter.py
index 386cf3f..e7db4ca 100644
--- a/backend/verification/compbio_adapter.py
+++ b/backend/verification/compbio_adapter.py
@@ -3,14 +3,19 @@
Uses Biopython + numpy + DSSP to validate protein structures without GPU
re-prediction. Checks physical plausibility (Ramachandran, clashes,
sequence–structure match, secondary structure) rather than re-folding.
+Also validates RNA structures and protein structure comparisons via API.
"""
import asyncio
import json
+import re
import tempfile
import textwrap
import time
+from typing import Any
from pathlib import Path
+import httpx
+
from backend.verification.base import (
VerificationAdapter, VerificationResult, VerificationBadge,
)
@@ -21,6 +26,14 @@
COMPBIO_IMAGE = "clawdlab/compbio-cpu:latest"
COMPBIO_TIMEOUT = 300 # 5 min — CPU validation is fast
VALID_AMINO_ACIDS = set("ACDEFGHIKLMNPQRSTVWY")
+VALID_RNA_BASES = set("ACGU")
+
+RFAM_API = "https://rfam.org/family"
+RCSB_PDB_API = "https://data.rcsb.org/rest/v1/core/entry"
+HTTP_TIMEOUT = 30
+
+# Canonical + wobble base pairs for RNA
+CANONICAL_PAIRS = {("A", "U"), ("U", "A"), ("G", "C"), ("C", "G"), ("G", "U"), ("U", "G")}
class CompBioAdapter(VerificationAdapter):
@@ -35,6 +48,10 @@ async def verify(self, task_result: dict, task_metadata: dict) -> VerificationRe
return await self._verify_design(task_result)
elif claim_type == "binder_design":
return await self._verify_binder(task_result)
+ elif claim_type == "rna_structure":
+ return await self._verify_rna_structure(task_result)
+ elif claim_type == "structure_comparison":
+ return await self._verify_structure_comparison(task_result)
else:
return VerificationResult.fail(self.domain, [f"Unknown claim_type: {claim_type}"])
@@ -246,6 +263,364 @@ async def _verify_binder(self, result: dict) -> VerificationResult:
compute_time_seconds=elapsed,
)
+ # ------------------------------------------------------------------
+ # RNA structure verification (API-based, no Docker)
+ # ------------------------------------------------------------------
+
+ async def _verify_rna_structure(self, result: dict) -> VerificationResult:
+ """Validate RNA secondary structure claims."""
+ start = time.monotonic()
+
+ rna_sequence = result.get("rna_sequence", "")
+ dot_bracket = result.get("dot_bracket", "")
+ claimed_mfe = result.get("claimed_mfe")
+ rfam_family = result.get("rfam_family", "")
+
+ if not rna_sequence:
+ return VerificationResult.fail(self.domain, ["rna_sequence required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "rna_structure"}
+
+ # Component 1: sequence_valid (0.15)
+ seq_result = self._check_rna_sequence_valid(rna_sequence)
+ component_scores["sequence_valid"] = seq_result["score"]
+ details["sequence_valid"] = seq_result
+
+ # Component 2: dot_bracket_valid (0.25)
+ db_result = self._check_dot_bracket_valid(dot_bracket, rna_sequence)
+ component_scores["dot_bracket_valid"] = db_result["score"]
+ details["dot_bracket_valid"] = db_result
+
+ # Component 3: base_pairs_valid (0.25)
+ bp_result = self._check_base_pairs_valid(rna_sequence, dot_bracket)
+ component_scores["base_pairs_valid"] = bp_result["score"]
+ details["base_pairs_valid"] = bp_result
+
+ # Component 4: energy_plausible (0.20)
+ energy_result = self._check_energy_plausible(rna_sequence, claimed_mfe)
+ component_scores["energy_plausible"] = energy_result["score"]
+ details["energy_plausible"] = energy_result
+
+ # Component 5: database_match (0.15)
+ db_match = await self._check_rfam_match(rfam_family)
+ component_scores["database_match"] = db_match["score"]
+ details["database_match"] = db_match
+
+ weights = {
+ "sequence_valid": 0.15,
+ "dot_bracket_valid": 0.25,
+ "base_pairs_valid": 0.25,
+ "energy_plausible": 0.20,
+ "database_match": 0.15,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # Structure comparison verification (API-based, no Docker)
+ # ------------------------------------------------------------------
+
+ async def _verify_structure_comparison(self, result: dict) -> VerificationResult:
+ """Validate protein structure comparison claims (RMSD, TM-score)."""
+ start = time.monotonic()
+
+ pdb_id_1 = result.get("pdb_id_1", "")
+ pdb_id_2 = result.get("pdb_id_2", "")
+ claimed_rmsd = result.get("claimed_rmsd")
+ claimed_tm_score = result.get("claimed_tm_score")
+ aligned_residues = result.get("aligned_residues")
+
+ if not pdb_id_1 or not pdb_id_2:
+ return VerificationResult.fail(self.domain, ["pdb_id_1 and pdb_id_2 required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "structure_comparison"}
+
+ # Component 1: pdb_ids_valid (0.20)
+ pdb_result = await self._check_pdb_ids_valid(pdb_id_1, pdb_id_2)
+ component_scores["pdb_ids_valid"] = pdb_result["score"]
+ details["pdb_ids_valid"] = pdb_result
+
+ # Component 2: rmsd_plausible (0.25)
+ rmsd_result = self._check_rmsd_plausible(claimed_rmsd)
+ component_scores["rmsd_plausible"] = rmsd_result["score"]
+ details["rmsd_plausible"] = rmsd_result
+
+ # Component 3: alignment_length (0.25)
+ align_result = await self._check_alignment_length(
+ pdb_id_1, pdb_id_2, aligned_residues,
+ )
+ component_scores["alignment_length"] = align_result["score"]
+ details["alignment_length"] = align_result
+
+ # Component 4: tm_score_plausible (0.30)
+ tm_result = self._check_tm_score_plausible(claimed_tm_score, claimed_rmsd)
+ component_scores["tm_score_plausible"] = tm_result["score"]
+ details["tm_score_plausible"] = tm_result
+
+ weights = {
+ "pdb_ids_valid": 0.20,
+ "rmsd_plausible": 0.25,
+ "alignment_length": 0.25,
+ "tm_score_plausible": 0.30,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # RNA structure helpers
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _check_rna_sequence_valid(seq: str) -> dict:
+ upper = seq.upper()
+ invalid = [b for b in upper if b not in VALID_RNA_BASES]
+ if invalid:
+ return {"score": 0.0, "error": f"Invalid bases: {''.join(set(invalid))}", "length": len(upper)}
+ return {"score": 1.0, "valid": True, "length": len(upper)}
+
+ @staticmethod
+ def _check_dot_bracket_valid(dot_bracket: str, sequence: str) -> dict:
+ if not dot_bracket:
+ return {"score": 0.5, "note": "No dot-bracket notation provided"}
+
+ if len(dot_bracket) != len(sequence):
+ return {
+ "score": 0.0,
+ "error": f"Length mismatch: sequence={len(sequence)}, dot_bracket={len(dot_bracket)}",
+ }
+
+ # Check balanced parentheses
+ stack: list[int] = []
+ for i, ch in enumerate(dot_bracket):
+ if ch == "(":
+ stack.append(i)
+ elif ch == ")":
+ if not stack:
+ return {"score": 0.0, "error": f"Unmatched ')' at position {i}"}
+ stack.pop()
+ elif ch != ".":
+ return {"score": 0.0, "error": f"Invalid character '{ch}' at position {i}"}
+
+ if stack:
+ return {"score": 0.0, "error": f"Unmatched '(' at positions {stack[:5]}"}
+
+ n_pairs = dot_bracket.count("(")
+ return {"score": 1.0, "valid": True, "n_base_pairs": n_pairs, "length": len(dot_bracket)}
+
+ @staticmethod
+ def _check_base_pairs_valid(sequence: str, dot_bracket: str) -> dict:
+ if not dot_bracket or not sequence or len(dot_bracket) != len(sequence):
+ return {"score": 0.5, "note": "Cannot check base pairs"}
+
+ upper = sequence.upper()
+ stack: list[int] = []
+ canonical = 0
+ non_canonical = 0
+
+ for i, ch in enumerate(dot_bracket):
+ if ch == "(":
+ stack.append(i)
+ elif ch == ")" and stack:
+ j = stack.pop()
+ pair = (upper[j], upper[i])
+ if pair in CANONICAL_PAIRS:
+ canonical += 1
+ else:
+ non_canonical += 1
+
+ total = canonical + non_canonical
+ if total == 0:
+ return {"score": 0.5, "note": "No base pairs"}
+
+ score = canonical / total
+ return {
+ "score": round(score, 4),
+ "canonical": canonical,
+ "non_canonical": non_canonical,
+ "total_pairs": total,
+ }
+
+ @staticmethod
+ def _check_energy_plausible(sequence: str, claimed_mfe: float | None) -> dict:
+ if claimed_mfe is None:
+ return {"score": 0.5, "note": "No MFE claimed"}
+
+ seq_len = len(sequence)
+ # Empirical: MFE scales roughly linearly with length
+ # Typical range: -0.3 to -1.5 kcal/mol per nucleotide for structured RNAs
+ min_expected = -1.5 * seq_len
+ max_expected = 0.0 # unfolded
+
+ if min_expected <= claimed_mfe <= max_expected:
+ return {"score": 1.0, "plausible": True, "claimed_mfe": claimed_mfe}
+ elif claimed_mfe > 0:
+ return {"score": 0.0, "error": "Positive MFE is thermodynamically implausible"}
+ elif claimed_mfe < min_expected * 2:
+ return {"score": 0.2, "note": "MFE unusually low for sequence length"}
+ return {"score": 0.5, "note": "MFE borderline", "claimed_mfe": claimed_mfe}
+
+ async def _check_rfam_match(self, rfam_family: str) -> dict:
+ if not rfam_family:
+ return {"score": 0.5, "note": "No Rfam family ID provided"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{RFAM_API}/{rfam_family}",
+ headers={"Accept": "application/json"},
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ return {
+ "score": 1.0,
+ "found": True,
+ "family_name": data.get("rfam", {}).get("id", rfam_family),
+ }
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("rfam_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ # ------------------------------------------------------------------
+ # Structure comparison helpers
+ # ------------------------------------------------------------------
+
+ async def _check_pdb_ids_valid(self, pdb_id_1: str, pdb_id_2: str) -> dict:
+ valid = 0
+ checked_ids = {}
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ for pdb_id in [pdb_id_1, pdb_id_2]:
+ resp = await client.get(f"{RCSB_PDB_API}/{pdb_id.upper()}")
+ if resp.status_code == 200:
+ valid += 1
+ checked_ids[pdb_id] = True
+ else:
+ checked_ids[pdb_id] = False
+
+ score = valid / 2.0
+ return {"score": round(score, 4), "ids": checked_ids}
+ except Exception as e:
+ logger.warning("pdb_ids_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_rmsd_plausible(claimed_rmsd: float | None) -> dict:
+ if claimed_rmsd is None:
+ return {"score": 0.5, "note": "No RMSD claimed"}
+
+ if not isinstance(claimed_rmsd, (int, float)) or claimed_rmsd < 0:
+ return {"score": 0.0, "error": "RMSD must be non-negative"}
+
+ # Typical RMSD ranges:
+ # 0-2 Å: very similar structures
+ # 2-5 Å: same fold, different conformations
+ # 5-10 Å: related folds
+ # >10 Å: different folds
+ if 0 <= claimed_rmsd <= 30:
+ return {"score": 1.0, "plausible": True, "rmsd": claimed_rmsd}
+ return {"score": 0.2, "plausible": False, "rmsd": claimed_rmsd}
+
+ async def _check_alignment_length(
+ self, pdb_id_1: str, pdb_id_2: str, aligned_residues: int | None,
+ ) -> dict:
+ if aligned_residues is None:
+ return {"score": 0.5, "note": "No aligned residues count"}
+
+ try:
+ # Fetch polymer entity counts from RCSB
+ lengths = []
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ for pdb_id in [pdb_id_1, pdb_id_2]:
+ resp = await client.get(f"{RCSB_PDB_API}/{pdb_id.upper()}")
+ if resp.status_code == 200:
+ data = resp.json()
+ # Get deposited model count or polymer entity count
+ struct = data.get("struct", {})
+ n_res = data.get("rcsb_entry_info", {}).get("deposited_polymer_monomer_count", 0)
+ if n_res:
+ lengths.append(n_res)
+
+ if len(lengths) < 2:
+ # Just check plausibility
+ if aligned_residues > 0:
+ return {"score": 0.7, "note": "Could not verify against PDB lengths"}
+ return {"score": 0.3, "error": "Zero aligned residues"}
+
+ shorter = min(lengths)
+ fraction = aligned_residues / shorter if shorter > 0 else 0
+
+ # Alignment should cover reasonable fraction of shorter chain
+ if fraction >= 0.5:
+ return {"score": 1.0, "fraction": round(fraction, 3), "shorter_chain": shorter}
+ elif fraction >= 0.2:
+ return {"score": 0.5, "fraction": round(fraction, 3)}
+ return {"score": 0.2, "fraction": round(fraction, 3)}
+
+ except Exception as e:
+ logger.warning("alignment_length_check_failed", error=str(e))
+ return {"score": 0.5, "error": str(e)}
+
+ @staticmethod
+ def _check_tm_score_plausible(
+ claimed_tm: float | None, claimed_rmsd: float | None,
+ ) -> dict:
+ if claimed_tm is None:
+ return {"score": 0.5, "note": "No TM-score claimed"}
+
+ if not isinstance(claimed_tm, (int, float)):
+ return {"score": 0.0, "error": "TM-score must be numeric"}
+
+ if not (0 <= claimed_tm <= 1):
+ return {"score": 0.0, "error": f"TM-score {claimed_tm} must be in [0, 1]"}
+
+ # Consistency with RMSD: high TM-score should correlate with low RMSD
+ if claimed_rmsd is not None and isinstance(claimed_rmsd, (int, float)):
+ # High TM + high RMSD is suspicious
+ if claimed_tm > 0.8 and claimed_rmsd > 10:
+ return {
+ "score": 0.2,
+ "note": "High TM-score inconsistent with high RMSD",
+ "tm_score": claimed_tm,
+ "rmsd": claimed_rmsd,
+ }
+ # Low TM + low RMSD is suspicious
+ if claimed_tm < 0.3 and claimed_rmsd < 1:
+ return {
+ "score": 0.2,
+ "note": "Low TM-score inconsistent with low RMSD",
+ "tm_score": claimed_tm,
+ "rmsd": claimed_rmsd,
+ }
+
+ return {"score": 1.0, "plausible": True, "tm_score": claimed_tm}
+
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
diff --git a/backend/verification/dispatcher.py b/backend/verification/dispatcher.py
index 7c7086f..ea05f04 100644
--- a/backend/verification/dispatcher.py
+++ b/backend/verification/dispatcher.py
@@ -62,9 +62,16 @@ def _register_all() -> None:
from backend.verification.bioinfo_adapter import BioInfoAdapter
from backend.verification.chemistry_adapter import ChemistryAdapter
from backend.verification.physics_adapter import PhysicsAdapter
+ from backend.verification.genomics_adapter import GenomicsAdapter
+ from backend.verification.epidemiology_adapter import EpidemiologyAdapter
+ from backend.verification.systems_biology_adapter import SystemsBiologyAdapter
+ from backend.verification.immunoinformatics_adapter import ImmunoinformaticsAdapter
+ from backend.verification.metabolomics_adapter import MetabolomicsAdapter
for cls in [Lean4Adapter, MLReproAdapter, CompBioAdapter, MaterialsAdapter,
- BioInfoAdapter, ChemistryAdapter, PhysicsAdapter]:
+ BioInfoAdapter, ChemistryAdapter, PhysicsAdapter,
+ GenomicsAdapter, EpidemiologyAdapter, SystemsBiologyAdapter,
+ ImmunoinformaticsAdapter, MetabolomicsAdapter]:
register_adapter(cls())
diff --git a/backend/verification/epidemiology_adapter.py b/backend/verification/epidemiology_adapter.py
new file mode 100644
index 0000000..89f66cc
--- /dev/null
+++ b/backend/verification/epidemiology_adapter.py
@@ -0,0 +1,612 @@
+"""Epidemiology verification: WHO GHO API + scipy statistical re-computation.
+
+Validates incidence rates, odds ratios (2x2 tables), and survival analyses.
+Primarily computational (scipy-based), with WHO data cross-referencing.
+API-based (no Docker).
+"""
+from __future__ import annotations
+
+import asyncio
+import math
+import time
+from typing import Any
+
+import httpx
+
+from backend.logging_config import get_logger
+from backend.verification.base import (
+ VerificationAdapter,
+ VerificationResult,
+)
+
+logger = get_logger(__name__)
+
+WHO_GHO_API = "https://ghoapi.azureedge.net/api"
+HTTP_TIMEOUT = 30
+
+# Graceful degradation for lifelines (survival analysis)
+try:
+ from lifelines import KaplanMeierFitter
+ from lifelines.statistics import logrank_test
+ LIFELINES_AVAILABLE = True
+except ImportError:
+ LIFELINES_AVAILABLE = False
+ logger.warning("lifelines_not_available", note="Survival analysis will use neutral scores")
+
+
+class EpidemiologyAdapter(VerificationAdapter):
+ domain = "epidemiology"
+
+ async def verify(self, task_result: dict, task_metadata: dict) -> VerificationResult:
+ claim_type = task_result.get("claim_type", "incidence_rate")
+
+ if claim_type == "incidence_rate":
+ return await self._verify_incidence_rate(task_result)
+ elif claim_type == "odds_ratio":
+ return await self._verify_odds_ratio(task_result)
+ elif claim_type == "survival_analysis":
+ return await self._verify_survival_analysis(task_result)
+ else:
+ return VerificationResult.fail(self.domain, [f"Unknown claim_type: {claim_type}"])
+
+ # ------------------------------------------------------------------
+ # incidence_rate
+ # ------------------------------------------------------------------
+
+ async def _verify_incidence_rate(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ rate = result.get("rate")
+ denominator = result.get("denominator")
+ cases = result.get("cases")
+ comparison_rate = result.get("comparison_rate")
+ disease_code = result.get("disease_code", "")
+
+ if rate is None:
+ return VerificationResult.fail(self.domain, ["rate is required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "incidence_rate"}
+
+ # Component 1: rate_plausible (0.25)
+ rate_result = await self._check_rate_plausible(rate, disease_code)
+ component_scores["rate_plausible"] = rate_result["score"]
+ details["rate_plausible"] = rate_result
+
+ # Component 2: denominator_valid (0.20)
+ denom_result = self._check_denominator_valid(denominator)
+ component_scores["denominator_valid"] = denom_result["score"]
+ details["denominator_valid"] = denom_result
+
+ # Component 3: confidence_interval (0.25)
+ ci_result = await asyncio.to_thread(
+ self._check_confidence_interval, rate, cases, denominator, result,
+ )
+ component_scores["confidence_interval"] = ci_result["score"]
+ details["confidence_interval"] = ci_result
+
+ # Component 4: comparison_valid (0.30)
+ comp_result = await self._check_comparison_valid(rate, comparison_rate, disease_code)
+ component_scores["comparison_valid"] = comp_result["score"]
+ details["comparison_valid"] = comp_result
+
+ weights = {
+ "rate_plausible": 0.25,
+ "denominator_valid": 0.20,
+ "confidence_interval": 0.25,
+ "comparison_valid": 0.30,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # odds_ratio
+ # ------------------------------------------------------------------
+
+ async def _verify_odds_ratio(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ contingency_table = result.get("contingency_table", [])
+ claimed_or = result.get("odds_ratio")
+ claimed_ci_lower = result.get("ci_lower")
+ claimed_ci_upper = result.get("ci_upper")
+ claimed_pvalue = result.get("p_value")
+
+ if not contingency_table:
+ return VerificationResult.fail(self.domain, ["contingency_table (2x2) required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "odds_ratio"}
+
+ # Component 1: contingency_valid (0.20)
+ valid_result = self._check_contingency_valid(contingency_table)
+ component_scores["contingency_valid"] = valid_result["score"]
+ details["contingency_valid"] = valid_result
+
+ # Component 2: or_recomputed (0.30)
+ or_result = await asyncio.to_thread(
+ self._recompute_odds_ratio, contingency_table, claimed_or,
+ )
+ component_scores["or_recomputed"] = or_result["score"]
+ details["or_recomputed"] = or_result
+
+ # Component 3: ci_recomputed (0.25)
+ ci_result = await asyncio.to_thread(
+ self._recompute_ci, contingency_table, claimed_ci_lower, claimed_ci_upper,
+ )
+ component_scores["ci_recomputed"] = ci_result["score"]
+ details["ci_recomputed"] = ci_result
+
+ # Component 4: pvalue_recomputed (0.25)
+ pval_result = await asyncio.to_thread(
+ self._recompute_pvalue, contingency_table, claimed_pvalue,
+ )
+ component_scores["pvalue_recomputed"] = pval_result["score"]
+ details["pvalue_recomputed"] = pval_result
+
+ weights = {
+ "contingency_valid": 0.20,
+ "or_recomputed": 0.30,
+ "ci_recomputed": 0.25,
+ "pvalue_recomputed": 0.25,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # survival_analysis
+ # ------------------------------------------------------------------
+
+ async def _verify_survival_analysis(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ time_data = result.get("time_data", [])
+ event_data = result.get("event_data", [])
+ group_labels = result.get("group_labels", [])
+ claimed_median = result.get("median_survival")
+ claimed_pvalue = result.get("p_value")
+ claimed_hr = result.get("hazard_ratio")
+
+ if not time_data or not event_data:
+ return VerificationResult.fail(self.domain, ["time_data and event_data required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "survival_analysis"}
+ warnings: list[str] = []
+
+ # Component 1: data_valid (0.15)
+ valid_result = self._check_survival_data_valid(time_data, event_data)
+ component_scores["data_valid"] = valid_result["score"]
+ details["data_valid"] = valid_result
+
+ # Component 2: km_recomputed (0.35)
+ km_result = await asyncio.to_thread(
+ self._recompute_km, time_data, event_data, group_labels, claimed_median,
+ )
+ component_scores["km_recomputed"] = km_result["score"]
+ details["km_recomputed"] = km_result
+ if km_result.get("warnings"):
+ warnings.extend(km_result["warnings"])
+
+ # Component 3: logrank_recomputed (0.30)
+ lr_result = await asyncio.to_thread(
+ self._recompute_logrank, time_data, event_data, group_labels, claimed_pvalue,
+ )
+ component_scores["logrank_recomputed"] = lr_result["score"]
+ details["logrank_recomputed"] = lr_result
+
+ # Component 4: hazard_ratio_check (0.20)
+ hr_result = self._check_hazard_ratio(claimed_hr)
+ component_scores["hazard_ratio_check"] = hr_result["score"]
+ details["hazard_ratio_check"] = hr_result
+
+ weights = {
+ "data_valid": 0.15,
+ "km_recomputed": 0.35,
+ "logrank_recomputed": 0.30,
+ "hazard_ratio_check": 0.20,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ warnings=warnings,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # Helpers — incidence rate
+ # ------------------------------------------------------------------
+
+ async def _check_rate_plausible(self, rate: float, disease_code: str) -> dict:
+ """Check if claimed rate is in plausible range, optionally vs WHO data."""
+ if not isinstance(rate, (int, float)) or rate < 0:
+ return {"score": 0.0, "error": "Rate must be non-negative numeric"}
+
+ # If WHO disease code provided, cross-reference
+ if disease_code:
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(f"{WHO_GHO_API}/{disease_code}")
+ if resp.status_code == 200:
+ data = resp.json()
+ values = data.get("value", [])
+ if values:
+ ref_values = [
+ v.get("NumericValue") for v in values
+ if v.get("NumericValue") is not None
+ ]
+ if ref_values:
+ ref_min = min(ref_values)
+ ref_max = max(ref_values)
+ # Allow 10x range around reference
+ if ref_min / 10 <= rate <= ref_max * 10:
+ return {"score": 1.0, "plausible": True, "who_range": [ref_min, ref_max]}
+ return {"score": 0.3, "plausible": False, "who_range": [ref_min, ref_max]}
+ except Exception as e:
+ logger.warning("who_rate_check_failed", error=str(e))
+
+ # Basic plausibility: incidence per 100k should be 0-100000
+ if 0 <= rate <= 100000:
+ return {"score": 0.7, "plausible": True, "note": "In general plausible range"}
+ return {"score": 0.2, "plausible": False}
+
+ @staticmethod
+ def _check_denominator_valid(denominator: int | float | None) -> dict:
+ """Check if population denominator is reasonable."""
+ if denominator is None:
+ return {"score": 0.5, "note": "No denominator provided"}
+
+ if not isinstance(denominator, (int, float)) or denominator <= 0:
+ return {"score": 0.0, "error": "Denominator must be positive"}
+
+ # Reasonable study sizes: 10 to 10 billion
+ if 10 <= denominator <= 1e10:
+ return {"score": 1.0, "valid": True, "denominator": denominator}
+ elif denominator < 10:
+ return {"score": 0.3, "note": "Very small denominator", "denominator": denominator}
+ else:
+ return {"score": 0.2, "note": "Unreasonably large denominator"}
+
+ @staticmethod
+ def _check_confidence_interval(
+ rate: float, cases: int | None, denominator: int | float | None, result: dict,
+ ) -> dict:
+ """Recompute CI from rate + sample size."""
+ try:
+ from scipy import stats as sp_stats
+ except ImportError:
+ return {"score": 0.5, "note": "scipy unavailable"}
+
+ claimed_ci_lower = result.get("ci_lower")
+ claimed_ci_upper = result.get("ci_upper")
+
+ if claimed_ci_lower is None or claimed_ci_upper is None:
+ return {"score": 0.5, "note": "No CI claimed"}
+
+ if cases is not None and denominator is not None and denominator > 0:
+ p_hat = cases / denominator
+ se = math.sqrt(p_hat * (1 - p_hat) / denominator) if 0 < p_hat < 1 else 0
+ z = sp_stats.norm.ppf(0.975)
+ computed_lower = (p_hat - z * se) * (rate / p_hat) if p_hat > 0 else 0
+ computed_upper = (p_hat + z * se) * (rate / p_hat) if p_hat > 0 else 0
+
+ lower_match = abs(claimed_ci_lower - computed_lower) <= max(abs(computed_lower) * 0.1, 0.01)
+ upper_match = abs(claimed_ci_upper - computed_upper) <= max(abs(computed_upper) * 0.1, 0.01)
+
+ score = (0.5 if lower_match else 0.0) + (0.5 if upper_match else 0.0)
+ return {
+ "score": round(score, 4),
+ "claimed_ci": [claimed_ci_lower, claimed_ci_upper],
+ "computed_ci": [round(computed_lower, 4), round(computed_upper, 4)],
+ }
+
+ # Just check CI ordering
+ if claimed_ci_lower <= rate <= claimed_ci_upper:
+ return {"score": 0.7, "ci_ordered": True}
+ return {"score": 0.2, "ci_ordered": False}
+
+ async def _check_comparison_valid(
+ self, rate: float, comparison_rate: float | None, disease_code: str,
+ ) -> dict:
+ """If comparative rate given, check direction + magnitude."""
+ if comparison_rate is None:
+ return {"score": 0.5, "note": "No comparison rate provided"}
+
+ if not isinstance(comparison_rate, (int, float)) or comparison_rate < 0:
+ return {"score": 0.0, "error": "Comparison rate must be non-negative"}
+
+ ratio = rate / comparison_rate if comparison_rate > 0 else float("inf")
+ if 0.01 <= ratio <= 100:
+ return {"score": 1.0, "plausible": True, "rate_ratio": round(ratio, 4)}
+ return {"score": 0.2, "plausible": False, "rate_ratio": round(ratio, 4)}
+
+ # ------------------------------------------------------------------
+ # Helpers — odds ratio
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _check_contingency_valid(table: list) -> dict:
+ """Validate 2x2 contingency table."""
+ try:
+ if len(table) != 2 or len(table[0]) != 2 or len(table[1]) != 2:
+ return {"score": 0.0, "error": "Must be 2x2 table"}
+
+ a, b = table[0][0], table[0][1]
+ c, d = table[1][0], table[1][1]
+
+ if any(not isinstance(v, (int, float)) for v in [a, b, c, d]):
+ return {"score": 0.0, "error": "All cells must be numeric"}
+
+ if any(v < 0 for v in [a, b, c, d]):
+ return {"score": 0.0, "error": "All cells must be non-negative"}
+
+ total = a + b + c + d
+ if total == 0:
+ return {"score": 0.0, "error": "Empty table"}
+
+ return {"score": 1.0, "valid": True, "total": total, "cells": [a, b, c, d]}
+ except Exception:
+ return {"score": 0.0, "error": "Invalid table format"}
+
+ @staticmethod
+ def _recompute_odds_ratio(table: list, claimed_or: float | None) -> dict:
+ """Recompute OR from 2x2 table."""
+ try:
+ a, b = table[0][0], table[0][1]
+ c, d = table[1][0], table[1][1]
+
+ if b == 0 or c == 0:
+ return {"score": 0.5, "note": "Zero cell — OR undefined or infinite"}
+
+ computed_or = (a * d) / (b * c)
+
+ if claimed_or is None:
+ return {"score": 0.5, "note": "No OR claimed", "computed_or": round(computed_or, 4)}
+
+ tolerance = max(abs(computed_or) * 0.05, 0.01)
+ match = abs(claimed_or - computed_or) <= tolerance
+
+ return {
+ "score": 1.0 if match else 0.2,
+ "match": match,
+ "claimed": claimed_or,
+ "computed": round(computed_or, 4),
+ }
+ except Exception as e:
+ return {"score": 0.0, "error": str(e)}
+
+ @staticmethod
+ def _recompute_ci(table: list, claimed_lower: float | None, claimed_upper: float | None) -> dict:
+ """Recompute 95% CI for OR using Woolf's method."""
+ try:
+ a, b = table[0][0], table[0][1]
+ c, d = table[1][0], table[1][1]
+
+ if any(v == 0 for v in [a, b, c, d]):
+ return {"score": 0.5, "note": "Zero cell — CI computation unreliable"}
+
+ ln_or = math.log((a * d) / (b * c))
+ se_ln_or = math.sqrt(1 / a + 1 / b + 1 / c + 1 / d)
+ lower = math.exp(ln_or - 1.96 * se_ln_or)
+ upper = math.exp(ln_or + 1.96 * se_ln_or)
+
+ if claimed_lower is None or claimed_upper is None:
+ return {
+ "score": 0.5,
+ "note": "No CI claimed",
+ "computed_ci": [round(lower, 4), round(upper, 4)],
+ }
+
+ lower_tol = max(abs(lower) * 0.1, 0.01)
+ upper_tol = max(abs(upper) * 0.1, 0.01)
+ lower_match = abs(claimed_lower - lower) <= lower_tol
+ upper_match = abs(claimed_upper - upper) <= upper_tol
+
+ score = (0.5 if lower_match else 0.0) + (0.5 if upper_match else 0.0)
+
+ return {
+ "score": round(score, 4),
+ "claimed_ci": [claimed_lower, claimed_upper],
+ "computed_ci": [round(lower, 4), round(upper, 4)],
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _recompute_pvalue(table: list, claimed_pvalue: float | None) -> dict:
+ """Recompute p-value from 2x2 table using Fisher's exact or chi-square."""
+ try:
+ from scipy import stats as sp_stats
+ except ImportError:
+ return {"score": 0.5, "note": "scipy unavailable"}
+
+ try:
+ a, b = int(table[0][0]), int(table[0][1])
+ c, d = int(table[1][0]), int(table[1][1])
+
+ total = a + b + c + d
+ # Use Fisher's exact for small samples, chi-square for large
+ if total < 100:
+ _, recomputed_p = sp_stats.fisher_exact([[a, b], [c, d]])
+ else:
+ chi2, recomputed_p, _, _ = sp_stats.chi2_contingency([[a, b], [c, d]])
+
+ if claimed_pvalue is None:
+ return {"score": 0.5, "note": "No p-value claimed", "computed_p": recomputed_p}
+
+ tolerance = max(abs(recomputed_p) * 0.05, 1e-10)
+ match = abs(claimed_pvalue - recomputed_p) <= tolerance
+
+ return {
+ "score": 1.0 if match else 0.2,
+ "match": match,
+ "claimed_p": claimed_pvalue,
+ "computed_p": recomputed_p,
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ # ------------------------------------------------------------------
+ # Helpers — survival analysis
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _check_survival_data_valid(time_data: list, event_data: list) -> dict:
+ """Check that time/event arrays are well-formed."""
+ if len(time_data) != len(event_data):
+ return {"score": 0.0, "error": "time_data and event_data must have same length"}
+
+ if len(time_data) < 3:
+ return {"score": 0.0, "error": "Need at least 3 observations"}
+
+ if any(not isinstance(t, (int, float)) or t < 0 for t in time_data):
+ return {"score": 0.0, "error": "time_data must be non-negative numbers"}
+
+ if any(e not in (0, 1) for e in event_data):
+ return {"score": 0.0, "error": "event_data must be 0 or 1"}
+
+ n_events = sum(event_data)
+ return {
+ "score": 1.0,
+ "valid": True,
+ "n_observations": len(time_data),
+ "n_events": n_events,
+ "censoring_rate": round(1 - n_events / len(event_data), 3),
+ }
+
+ @staticmethod
+ def _recompute_km(
+ time_data: list, event_data: list, group_labels: list, claimed_median: float | None,
+ ) -> dict:
+ """Re-run Kaplan-Meier and compare median survival."""
+ if not LIFELINES_AVAILABLE:
+ return {"score": 0.5, "warnings": ["lifelines unavailable — KM not recomputed"]}
+
+ try:
+ import numpy as np
+
+ times = np.array(time_data, dtype=float)
+ events = np.array(event_data, dtype=int)
+
+ kmf = KaplanMeierFitter()
+ kmf.fit(times, event_observed=events)
+
+ computed_median = kmf.median_survival_time_
+ if math.isinf(computed_median) or math.isnan(computed_median):
+ computed_note = "Median not reached"
+ if claimed_median is None:
+ return {"score": 0.7, "note": computed_note}
+ return {"score": 0.5, "note": computed_note, "claimed_median": claimed_median}
+
+ if claimed_median is None:
+ return {"score": 0.5, "note": "No median claimed", "computed_median": float(computed_median)}
+
+ tolerance = max(abs(computed_median) * 0.1, 0.5)
+ match = abs(claimed_median - computed_median) <= tolerance
+
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed_median": claimed_median,
+ "computed_median": round(float(computed_median), 4),
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _recompute_logrank(
+ time_data: list, event_data: list, group_labels: list, claimed_pvalue: float | None,
+ ) -> dict:
+ """Re-run log-rank test between two groups."""
+ if not LIFELINES_AVAILABLE:
+ return {"score": 0.5, "note": "lifelines unavailable — log-rank not recomputed"}
+
+ if not group_labels or len(group_labels) != len(time_data):
+ return {"score": 0.5, "note": "group_labels required for log-rank test"}
+
+ try:
+ import numpy as np
+
+ times = np.array(time_data, dtype=float)
+ events = np.array(event_data, dtype=int)
+ groups = np.array(group_labels)
+
+ unique_groups = np.unique(groups)
+ if len(unique_groups) != 2:
+ return {"score": 0.5, "note": f"Need exactly 2 groups, got {len(unique_groups)}"}
+
+ mask1 = groups == unique_groups[0]
+ mask2 = groups == unique_groups[1]
+
+ lr_result = logrank_test(
+ times[mask1], times[mask2],
+ event_observed_A=events[mask1],
+ event_observed_B=events[mask2],
+ )
+ computed_p = lr_result.p_value
+
+ if claimed_pvalue is None:
+ return {"score": 0.5, "note": "No p-value claimed", "computed_p": computed_p}
+
+ tolerance = max(abs(computed_p) * 0.05, 1e-10)
+ match = abs(claimed_pvalue - computed_p) <= tolerance
+
+ return {
+ "score": 1.0 if match else 0.2,
+ "match": match,
+ "claimed_p": claimed_pvalue,
+ "computed_p": computed_p,
+ "test_statistic": lr_result.test_statistic,
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_hazard_ratio(hr: float | None) -> dict:
+ """Check if hazard ratio is in plausible range."""
+ if hr is None:
+ return {"score": 0.5, "note": "No hazard ratio claimed"}
+
+ if not isinstance(hr, (int, float)) or hr <= 0:
+ return {"score": 0.0, "error": "HR must be positive"}
+
+ # Typical HR range: 0.1 to 10
+ if 0.1 <= hr <= 10:
+ return {"score": 1.0, "plausible": True, "hazard_ratio": hr}
+ elif 0.01 <= hr <= 100:
+ return {"score": 0.5, "plausible": "borderline", "hazard_ratio": hr}
+ else:
+ return {"score": 0.1, "plausible": False, "hazard_ratio": hr}
diff --git a/backend/verification/genomics_adapter.py b/backend/verification/genomics_adapter.py
new file mode 100644
index 0000000..38e432c
--- /dev/null
+++ b/backend/verification/genomics_adapter.py
@@ -0,0 +1,646 @@
+"""Genomics verification: ClinVar + Ensembl VEP + MyVariant.info + GWAS Catalog.
+
+Validates variant annotations, gene expression claims, and GWAS associations
+using public genomics APIs. API-based (no Docker).
+"""
+from __future__ import annotations
+
+import asyncio
+import re
+import time
+from typing import Any
+
+import httpx
+
+from backend.logging_config import get_logger
+from backend.verification.base import (
+ VerificationAdapter,
+ VerificationResult,
+)
+
+logger = get_logger(__name__)
+
+NCBI_EUTILS = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
+ENSEMBL_REST = "https://rest.ensembl.org"
+MYVARIANT_API = "https://myvariant.info/v1"
+GWAS_CATALOG_API = "https://www.ebi.ac.uk/gwas/rest/api"
+HTTP_TIMEOUT = 30
+
+# Plausible ranges
+MAX_MAF = 0.5 # allele frequency can't exceed 0.5 by definition
+GENOME_WIDE_SIG = 5e-8
+
+
+class GenomicsAdapter(VerificationAdapter):
+ domain = "genomics"
+
+ async def verify(self, task_result: dict, task_metadata: dict) -> VerificationResult:
+ claim_type = task_result.get("claim_type", "variant_annotation")
+
+ if claim_type == "variant_annotation":
+ return await self._verify_variant_annotation(task_result)
+ elif claim_type == "gene_expression":
+ return await self._verify_gene_expression(task_result)
+ elif claim_type == "gwas_association":
+ return await self._verify_gwas_association(task_result)
+ else:
+ return VerificationResult.fail(self.domain, [f"Unknown claim_type: {claim_type}"])
+
+ # ------------------------------------------------------------------
+ # variant_annotation
+ # ------------------------------------------------------------------
+
+ async def _verify_variant_annotation(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ rsid = result.get("rsid", "")
+ hgvs = result.get("hgvs", "")
+ claimed_gene = result.get("gene", "")
+ claimed_consequence = result.get("consequence", "")
+ claimed_significance = result.get("clinical_significance", "")
+ claimed_maf = result.get("maf")
+
+ if not rsid and not hgvs:
+ return VerificationResult.fail(self.domain, ["rsid or hgvs required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "variant_annotation"}
+
+ variant_id = rsid or hgvs
+
+ # Component 1: variant_exists (0.25)
+ exists_result = await self._check_variant_exists(variant_id)
+ component_scores["variant_exists"] = exists_result["score"]
+ details["variant_exists"] = exists_result
+
+ # Component 2: consequence_match (0.25)
+ consequence_result = await self._check_consequence(variant_id, claimed_consequence)
+ component_scores["consequence_match"] = consequence_result["score"]
+ details["consequence_match"] = consequence_result
+
+ # Component 3: gene_match (0.20)
+ gene_result = await self._check_gene_match(variant_id, claimed_gene)
+ component_scores["gene_match"] = gene_result["score"]
+ details["gene_match"] = gene_result
+
+ # Component 4: clinical_significance (0.15)
+ clin_result = await self._check_clinical_significance(variant_id, claimed_significance)
+ component_scores["clinical_significance"] = clin_result["score"]
+ details["clinical_significance"] = clin_result
+
+ # Component 5: population_frequency (0.15)
+ freq_result = await self._check_population_frequency(variant_id, claimed_maf)
+ component_scores["population_frequency"] = freq_result["score"]
+ details["population_frequency"] = freq_result
+
+ weights = {
+ "variant_exists": 0.25,
+ "consequence_match": 0.25,
+ "gene_match": 0.20,
+ "clinical_significance": 0.15,
+ "population_frequency": 0.15,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # gene_expression
+ # ------------------------------------------------------------------
+
+ async def _verify_gene_expression(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ gene = result.get("gene", "")
+ dataset_accession = result.get("dataset_accession", "")
+ fold_change = result.get("fold_change")
+ p_value = result.get("p_value")
+
+ if not gene:
+ return VerificationResult.fail(self.domain, ["gene symbol required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "gene_expression"}
+
+ # Component 1: gene_exists (0.20)
+ gene_result = await self._check_gene_exists(gene)
+ component_scores["gene_exists"] = gene_result["score"]
+ details["gene_exists"] = gene_result
+
+ # Component 2: dataset_exists (0.25)
+ dataset_result = await self._check_dataset_exists(dataset_accession)
+ component_scores["dataset_exists"] = dataset_result["score"]
+ details["dataset_exists"] = dataset_result
+
+ # Component 3: expression_range (0.25)
+ expr_result = self._check_expression_range(fold_change)
+ component_scores["expression_range"] = expr_result["score"]
+ details["expression_range"] = expr_result
+
+ # Component 4: statistics_valid (0.30)
+ stats_result = await asyncio.to_thread(self._check_statistics, result)
+ component_scores["statistics_valid"] = stats_result["score"]
+ details["statistics_valid"] = stats_result
+
+ weights = {
+ "gene_exists": 0.20,
+ "dataset_exists": 0.25,
+ "expression_range": 0.25,
+ "statistics_valid": 0.30,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # gwas_association
+ # ------------------------------------------------------------------
+
+ async def _verify_gwas_association(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ rsid = result.get("rsid", "")
+ claimed_pvalue = result.get("p_value")
+ claimed_effect_size = result.get("effect_size")
+ claimed_trait = result.get("trait", "")
+
+ if not rsid:
+ return VerificationResult.fail(self.domain, ["rsid required for GWAS claims"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "gwas_association"}
+
+ # Component 1: variant_exists (0.20)
+ exists_result = await self._check_variant_exists(rsid)
+ component_scores["variant_exists"] = exists_result["score"]
+ details["variant_exists"] = exists_result
+
+ # Component 2: gwas_catalog_match (0.30)
+ gwas_result = await self._check_gwas_catalog(rsid, claimed_trait)
+ component_scores["gwas_catalog_match"] = gwas_result["score"]
+ details["gwas_catalog_match"] = gwas_result
+
+ # Component 3: pvalue_plausible (0.25)
+ pval_result = self._check_pvalue_plausible(claimed_pvalue)
+ component_scores["pvalue_plausible"] = pval_result["score"]
+ details["pvalue_plausible"] = pval_result
+
+ # Component 4: effect_size_plausible (0.25)
+ effect_result = self._check_effect_size_plausible(claimed_effect_size)
+ component_scores["effect_size_plausible"] = effect_result["score"]
+ details["effect_size_plausible"] = effect_result
+
+ weights = {
+ "variant_exists": 0.20,
+ "gwas_catalog_match": 0.30,
+ "pvalue_plausible": 0.25,
+ "effect_size_plausible": 0.25,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # API helpers
+ # ------------------------------------------------------------------
+
+ async def _check_variant_exists(self, variant_id: str) -> dict:
+ """Check if a variant (rsID or HGVS) exists in MyVariant.info."""
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{MYVARIANT_API}/variant/{variant_id}",
+ params={"fields": "dbsnp.rsid,clinvar.rcv.clinical_significance"},
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ return {"score": 1.0, "found": True, "source": "myvariant"}
+ # Try ClinVar via NCBI
+ if variant_id.startswith("rs"):
+ resp2 = await client.get(
+ f"{NCBI_EUTILS}/esearch.fcgi",
+ params={"db": "snp", "term": variant_id, "retmode": "json"},
+ )
+ if resp2.status_code == 200:
+ data2 = resp2.json()
+ count = int(data2.get("esearchresult", {}).get("count", 0))
+ if count > 0:
+ return {"score": 1.0, "found": True, "source": "dbsnp"}
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("variant_exists_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_consequence(self, variant_id: str, claimed: str) -> dict:
+ """Compare claimed consequence vs Ensembl VEP."""
+ if not claimed:
+ return {"score": 0.5, "note": "No consequence claimed"}
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ # Try VEP for rsIDs
+ if variant_id.startswith("rs"):
+ resp = await client.get(
+ f"{ENSEMBL_REST}/vep/human/id/{variant_id}",
+ headers={"Content-Type": "application/json"},
+ )
+ else:
+ resp = await client.get(
+ f"{ENSEMBL_REST}/vep/human/hgvs/{variant_id}",
+ headers={"Content-Type": "application/json"},
+ )
+
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"VEP lookup failed (HTTP {resp.status_code})"}
+
+ data = resp.json()
+ if not data:
+ return {"score": 0.3, "note": "No VEP results"}
+
+ # Collect all consequences
+ consequences: set[str] = set()
+ for entry in data:
+ for tc in entry.get("transcript_consequences", []):
+ for ct in tc.get("consequence_terms", []):
+ consequences.add(ct.lower().replace("_", " "))
+
+ claimed_lower = claimed.lower().replace("_", " ")
+ if claimed_lower in consequences:
+ return {"score": 1.0, "match": True, "vep_consequences": sorted(consequences)}
+
+ # Partial match
+ for c in consequences:
+ if claimed_lower in c or c in claimed_lower:
+ return {"score": 0.7, "partial_match": True, "vep_consequences": sorted(consequences)}
+
+ return {"score": 0.0, "match": False, "claimed": claimed, "vep_consequences": sorted(consequences)}
+
+ except Exception as e:
+ logger.warning("consequence_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_gene_match(self, variant_id: str, claimed_gene: str) -> dict:
+ """Check if variant maps to the claimed gene."""
+ if not claimed_gene:
+ return {"score": 0.5, "note": "No gene claimed"}
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{MYVARIANT_API}/variant/{variant_id}",
+ params={"fields": "dbsnp.gene.symbol,cadd.gene.genename"},
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "Gene lookup failed"}
+
+ data = resp.json()
+ genes: set[str] = set()
+
+ # Extract gene symbols from various fields
+ dbsnp_gene = data.get("dbsnp", {}).get("gene", {})
+ if isinstance(dbsnp_gene, dict):
+ sym = dbsnp_gene.get("symbol", "")
+ if sym:
+ genes.add(sym.upper())
+ elif isinstance(dbsnp_gene, list):
+ for g in dbsnp_gene:
+ sym = g.get("symbol", "") if isinstance(g, dict) else ""
+ if sym:
+ genes.add(sym.upper())
+
+ cadd_gene = data.get("cadd", {}).get("gene", {})
+ if isinstance(cadd_gene, dict):
+ gn = cadd_gene.get("genename", "")
+ if gn:
+ genes.add(gn.upper())
+
+ if not genes:
+ return {"score": 0.5, "note": "No gene info in database"}
+
+ if claimed_gene.upper() in genes:
+ return {"score": 1.0, "match": True, "db_genes": sorted(genes)}
+
+ return {"score": 0.0, "match": False, "claimed": claimed_gene, "db_genes": sorted(genes)}
+
+ except Exception as e:
+ logger.warning("gene_match_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_clinical_significance(self, variant_id: str, claimed: str) -> dict:
+ """Check claimed pathogenicity vs ClinVar."""
+ if not claimed:
+ return {"score": 0.5, "note": "No clinical significance claimed"}
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{MYVARIANT_API}/variant/{variant_id}",
+ params={"fields": "clinvar.rcv.clinical_significance"},
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "ClinVar lookup failed"}
+
+ data = resp.json()
+ clinvar = data.get("clinvar", {})
+ rcv = clinvar.get("rcv", [])
+ if isinstance(rcv, dict):
+ rcv = [rcv]
+
+ significances: set[str] = set()
+ for entry in rcv:
+ sig = entry.get("clinical_significance", "")
+ if sig:
+ significances.add(sig.lower())
+
+ if not significances:
+ return {"score": 0.5, "note": "No ClinVar classification"}
+
+ claimed_lower = claimed.lower()
+ if claimed_lower in significances:
+ return {"score": 1.0, "match": True, "clinvar": sorted(significances)}
+
+ # Partial match (e.g., "likely pathogenic" vs "pathogenic")
+ for s in significances:
+ if claimed_lower in s or s in claimed_lower:
+ return {"score": 0.7, "partial_match": True, "clinvar": sorted(significances)}
+
+ return {"score": 0.0, "match": False, "claimed": claimed, "clinvar": sorted(significances)}
+
+ except Exception as e:
+ logger.warning("clinical_significance_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_population_frequency(self, variant_id: str, claimed_maf: float | None) -> dict:
+ """Compare claimed MAF vs gnomAD data from MyVariant.info."""
+ if claimed_maf is None:
+ return {"score": 0.5, "note": "No MAF claimed"}
+
+ if not (0 <= claimed_maf <= MAX_MAF):
+ return {"score": 0.0, "error": f"MAF {claimed_maf} out of range [0, {MAX_MAF}]"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{MYVARIANT_API}/variant/{variant_id}",
+ params={"fields": "gnomad_genome.af.af,dbsnp.alleles.freq"},
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "Frequency lookup failed"}
+
+ data = resp.json()
+
+ # Try gnomAD
+ gnomad_af = data.get("gnomad_genome", {}).get("af", {}).get("af")
+ if gnomad_af is not None:
+ tolerance = max(abs(gnomad_af) * 0.20, 0.01)
+ match = abs(claimed_maf - gnomad_af) <= tolerance
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed": claimed_maf,
+ "gnomad": gnomad_af,
+ "tolerance": tolerance,
+ }
+
+ return {"score": 0.5, "note": "No frequency data available"}
+
+ except Exception as e:
+ logger.warning("frequency_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_gene_exists(self, gene: str) -> dict:
+ """Check if gene symbol is valid in Ensembl."""
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{ENSEMBL_REST}/lookup/symbol/homo_sapiens/{gene}",
+ headers={"Content-Type": "application/json"},
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ return {
+ "score": 1.0,
+ "found": True,
+ "ensembl_id": data.get("id", ""),
+ "biotype": data.get("biotype", ""),
+ }
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("gene_exists_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_dataset_exists(self, accession: str) -> dict:
+ """Check if GEO/ArrayExpress accession exists."""
+ if not accession:
+ return {"score": 0.5, "note": "No dataset accession provided"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ # GEO accession (GSE...)
+ if accession.upper().startswith("GSE"):
+ resp = await client.get(
+ f"{NCBI_EUTILS}/esearch.fcgi",
+ params={"db": "gds", "term": accession, "retmode": "json"},
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ count = int(data.get("esearchresult", {}).get("count", 0))
+ if count > 0:
+ return {"score": 1.0, "found": True, "source": "geo"}
+ # ArrayExpress (E-MTAB-...)
+ elif accession.upper().startswith("E-"):
+ resp = await client.get(
+ f"https://www.ebi.ac.uk/biostudies/api/v1/studies/{accession}",
+ )
+ if resp.status_code == 200:
+ return {"score": 1.0, "found": True, "source": "biostudies"}
+
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("dataset_exists_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_expression_range(fold_change: float | None) -> dict:
+ """Check if claimed fold change is biologically plausible."""
+ if fold_change is None:
+ return {"score": 0.5, "note": "No fold change claimed"}
+
+ abs_fc = abs(fold_change)
+ if abs_fc == 0:
+ return {"score": 0.0, "error": "Fold change cannot be zero"}
+
+ # Typical range: 0.1 to 100
+ if 0.1 <= abs_fc <= 100:
+ return {"score": 1.0, "plausible": True, "fold_change": fold_change}
+ elif 0.01 <= abs_fc <= 1000:
+ return {"score": 0.5, "plausible": "borderline", "fold_change": fold_change}
+ else:
+ return {"score": 0.0, "plausible": False, "fold_change": fold_change}
+
+ @staticmethod
+ def _check_statistics(result: dict) -> dict:
+ """Re-run p-value calculation with scipy if data provided."""
+ try:
+ from scipy import stats as sp_stats
+ except ImportError:
+ return {"score": 0.5, "note": "scipy unavailable"}
+
+ p_value = result.get("p_value")
+ test_type = result.get("test_type", "")
+ group1 = result.get("group1_data", [])
+ group2 = result.get("group2_data", [])
+
+ if p_value is None:
+ return {"score": 0.5, "note": "No p-value claimed"}
+
+ if not isinstance(p_value, (int, float)) or p_value < 0 or p_value > 1:
+ return {"score": 0.0, "error": f"Invalid p-value: {p_value}"}
+
+ # If raw data provided, re-compute
+ if group1 and group2:
+ try:
+ if test_type == "mannwhitneyu":
+ _, recomputed_p = sp_stats.mannwhitneyu(group1, group2, alternative="two-sided")
+ else:
+ _, recomputed_p = sp_stats.ttest_ind(group1, group2)
+
+ tolerance = max(abs(recomputed_p) * 0.05, 1e-10)
+ match = abs(p_value - recomputed_p) <= tolerance
+
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed_p": p_value,
+ "recomputed_p": recomputed_p,
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": f"Re-computation failed: {e}"}
+
+ # No raw data — just check plausibility
+ return {"score": 0.5, "note": "No raw data for re-computation", "p_value": p_value}
+
+ async def _check_gwas_catalog(self, rsid: str, claimed_trait: str) -> dict:
+ """Cross-reference with NHGRI-EBI GWAS Catalog."""
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{GWAS_CATALOG_API}/singleNucleotidePolymorphisms/{rsid}/associations",
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"GWAS Catalog lookup failed (HTTP {resp.status_code})"}
+
+ data = resp.json()
+ associations = data.get("_embedded", {}).get("associations", [])
+
+ if not associations:
+ return {"score": 0.3, "note": "Not found in GWAS Catalog"}
+
+ traits: list[str] = []
+ for assoc in associations:
+ for ef in assoc.get("efoTraits", []):
+ traits.append(ef.get("trait", ""))
+
+ if not claimed_trait:
+ return {"score": 0.7, "found_in_catalog": True, "n_associations": len(associations)}
+
+ claimed_lower = claimed_trait.lower()
+ for t in traits:
+ if claimed_lower in t.lower() or t.lower() in claimed_lower:
+ return {
+ "score": 1.0,
+ "trait_match": True,
+ "catalog_traits": traits[:10],
+ "n_associations": len(associations),
+ }
+
+ return {
+ "score": 0.5,
+ "trait_match": False,
+ "claimed": claimed_trait,
+ "catalog_traits": traits[:10],
+ }
+
+ except Exception as e:
+ logger.warning("gwas_catalog_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_pvalue_plausible(p_value: float | None) -> dict:
+ """Check if GWAS p-value is in expected range."""
+ if p_value is None:
+ return {"score": 0.5, "note": "No p-value claimed"}
+
+ if not isinstance(p_value, (int, float)):
+ return {"score": 0.0, "error": "p-value must be numeric"}
+
+ if p_value < 0 or p_value > 1:
+ return {"score": 0.0, "error": f"p-value {p_value} out of range [0, 1]"}
+
+ # For GWAS, genome-wide significance is typically 5e-8
+ if p_value <= GENOME_WIDE_SIG:
+ return {"score": 1.0, "genome_wide_significant": True, "p_value": p_value}
+ elif p_value <= 1e-5:
+ return {"score": 0.7, "suggestive_significance": True, "p_value": p_value}
+ elif p_value <= 0.05:
+ return {"score": 0.4, "nominally_significant": True, "p_value": p_value}
+ else:
+ return {"score": 0.2, "not_significant": True, "p_value": p_value}
+
+ @staticmethod
+ def _check_effect_size_plausible(effect_size: float | None) -> dict:
+ """Check if GWAS effect size (beta or OR) is in reasonable range."""
+ if effect_size is None:
+ return {"score": 0.5, "note": "No effect size claimed"}
+
+ if not isinstance(effect_size, (int, float)):
+ return {"score": 0.0, "error": "Effect size must be numeric"}
+
+ abs_effect = abs(effect_size)
+
+ # For odds ratios (typically 0.5 - 5.0 for common variants)
+ if effect_size > 0:
+ if 0.5 <= abs_effect <= 5.0:
+ return {"score": 1.0, "plausible": True, "effect_size": effect_size}
+ elif 0.1 <= abs_effect <= 20.0:
+ return {"score": 0.5, "plausible": "borderline", "effect_size": effect_size}
+ else:
+ return {"score": 0.1, "plausible": False, "effect_size": effect_size}
+
+ # For betas (typically -2 to 2 for common variants)
+ if abs_effect <= 2.0:
+ return {"score": 1.0, "plausible": True, "effect_size": effect_size}
+ elif abs_effect <= 10.0:
+ return {"score": 0.5, "plausible": "borderline", "effect_size": effect_size}
+ else:
+ return {"score": 0.1, "plausible": False, "effect_size": effect_size}
diff --git a/backend/verification/immunoinformatics_adapter.py b/backend/verification/immunoinformatics_adapter.py
new file mode 100644
index 0000000..5059f6d
--- /dev/null
+++ b/backend/verification/immunoinformatics_adapter.py
@@ -0,0 +1,611 @@
+"""Immunoinformatics verification: IEDB API + UniProt + IMGT/HLA.
+
+Validates epitope predictions, MHC binding affinity, and B-cell epitope
+claims using public immunology APIs. API-based (no Docker).
+"""
+from __future__ import annotations
+
+import re
+import time
+from typing import Any
+
+import httpx
+
+from backend.logging_config import get_logger
+from backend.verification.base import (
+ VerificationAdapter,
+ VerificationResult,
+)
+
+logger = get_logger(__name__)
+
+IEDB_API = "http://tools-cluster-interface.iedb.org/tools_api"
+UNIPROT_BASE = "https://rest.uniprot.org/uniprotkb"
+ENSEMBL_REST = "https://rest.ensembl.org"
+HTTP_TIMEOUT = 60 # IEDB can be slow
+
+VALID_AMINO_ACIDS = set("ACDEFGHIKLMNPQRSTVWY")
+
+# MHC-I typically binds 8-11mer, MHC-II 13-25mer
+MHC_I_LENGTH = (8, 11)
+MHC_II_LENGTH = (13, 25)
+
+# IC50 binding thresholds (nM)
+STRONG_BINDER_THRESHOLD = 50
+WEAK_BINDER_THRESHOLD = 500
+
+
+class ImmunoinformaticsAdapter(VerificationAdapter):
+ domain = "immunoinformatics"
+
+ async def verify(self, task_result: dict, task_metadata: dict) -> VerificationResult:
+ claim_type = task_result.get("claim_type", "epitope_prediction")
+
+ if claim_type == "epitope_prediction":
+ return await self._verify_epitope_prediction(task_result)
+ elif claim_type == "mhc_binding":
+ return await self._verify_mhc_binding(task_result)
+ elif claim_type == "bcell_epitope":
+ return await self._verify_bcell_epitope(task_result)
+ else:
+ return VerificationResult.fail(self.domain, [f"Unknown claim_type: {claim_type}"])
+
+ # ------------------------------------------------------------------
+ # epitope_prediction
+ # ------------------------------------------------------------------
+
+ async def _verify_epitope_prediction(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ peptide = result.get("peptide", "")
+ source_protein = result.get("source_protein", "")
+ hla_allele = result.get("hla_allele", "")
+ claimed_score = result.get("prediction_score")
+
+ if not peptide:
+ return VerificationResult.fail(self.domain, ["peptide sequence required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "epitope_prediction"}
+
+ # Component 1: peptide_valid (0.15)
+ pep_result = self._check_peptide_valid(peptide, min_len=8, max_len=15)
+ component_scores["peptide_valid"] = pep_result["score"]
+ details["peptide_valid"] = pep_result
+
+ # Component 2: source_protein_valid (0.20)
+ prot_result = await self._check_protein_valid(source_protein)
+ component_scores["source_protein_valid"] = prot_result["score"]
+ details["source_protein_valid"] = prot_result
+
+ # Component 3: peptide_in_source (0.20)
+ in_source_result = await self._check_peptide_in_source(peptide, source_protein)
+ component_scores["peptide_in_source"] = in_source_result["score"]
+ details["peptide_in_source"] = in_source_result
+
+ # Component 4: iedb_score_check (0.25)
+ iedb_result = await self._check_iedb_score(peptide, hla_allele, claimed_score)
+ component_scores["iedb_score_check"] = iedb_result["score"]
+ details["iedb_score_check"] = iedb_result
+
+ # Component 5: allele_valid (0.20)
+ allele_result = self._check_allele_valid(hla_allele)
+ component_scores["allele_valid"] = allele_result["score"]
+ details["allele_valid"] = allele_result
+
+ weights = {
+ "peptide_valid": 0.15,
+ "source_protein_valid": 0.20,
+ "peptide_in_source": 0.20,
+ "iedb_score_check": 0.25,
+ "allele_valid": 0.20,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # mhc_binding
+ # ------------------------------------------------------------------
+
+ async def _verify_mhc_binding(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ peptide = result.get("peptide", "")
+ hla_allele = result.get("hla_allele", "")
+ mhc_class = result.get("mhc_class", "I")
+ claimed_ic50 = result.get("binding_affinity")
+ claimed_classification = result.get("classification", "")
+
+ if not peptide:
+ return VerificationResult.fail(self.domain, ["peptide sequence required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "mhc_binding"}
+
+ # Component 1: allele_valid (0.20)
+ allele_result = self._check_allele_valid(hla_allele)
+ component_scores["allele_valid"] = allele_result["score"]
+ details["allele_valid"] = allele_result
+
+ # Component 2: peptide_length_valid (0.15)
+ length_range = MHC_I_LENGTH if mhc_class.upper() in ("I", "1") else MHC_II_LENGTH
+ length_result = self._check_peptide_valid(peptide, min_len=length_range[0], max_len=length_range[1])
+ component_scores["peptide_length_valid"] = length_result["score"]
+ details["peptide_length_valid"] = length_result
+
+ # Component 3: binding_affinity_recomputed (0.35)
+ binding_result = await self._check_binding_affinity(
+ peptide, hla_allele, mhc_class, claimed_ic50,
+ )
+ component_scores["binding_affinity_recomputed"] = binding_result["score"]
+ details["binding_affinity_recomputed"] = binding_result
+
+ # Component 4: classification_consistent (0.30)
+ class_result = self._check_classification_consistent(
+ claimed_ic50, claimed_classification,
+ )
+ component_scores["classification_consistent"] = class_result["score"]
+ details["classification_consistent"] = class_result
+
+ weights = {
+ "allele_valid": 0.20,
+ "peptide_length_valid": 0.15,
+ "binding_affinity_recomputed": 0.35,
+ "classification_consistent": 0.30,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # bcell_epitope
+ # ------------------------------------------------------------------
+
+ async def _verify_bcell_epitope(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ sequence = result.get("sequence", "")
+ source_protein = result.get("source_protein", "")
+ claimed_accessibility = result.get("surface_accessibility")
+
+ if not sequence:
+ return VerificationResult.fail(self.domain, ["sequence required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "bcell_epitope"}
+
+ # Component 1: sequence_valid (0.15)
+ seq_result = self._check_peptide_valid(sequence, min_len=5, max_len=50)
+ component_scores["sequence_valid"] = seq_result["score"]
+ details["sequence_valid"] = seq_result
+
+ # Component 2: source_protein_valid (0.20)
+ prot_result = await self._check_protein_valid(source_protein)
+ component_scores["source_protein_valid"] = prot_result["score"]
+ details["source_protein_valid"] = prot_result
+
+ # Component 3: surface_accessibility (0.25)
+ acc_result = self._check_surface_accessibility(sequence, claimed_accessibility)
+ component_scores["surface_accessibility"] = acc_result["score"]
+ details["surface_accessibility"] = acc_result
+
+ # Component 4: iedb_bcell_check (0.25)
+ bcell_result = await self._check_iedb_bcell(sequence)
+ component_scores["iedb_bcell_check"] = bcell_result["score"]
+ details["iedb_bcell_check"] = bcell_result
+
+ # Component 5: conservation_check (0.15)
+ conserv_result = await self._check_conservation(sequence, source_protein)
+ component_scores["conservation_check"] = conserv_result["score"]
+ details["conservation_check"] = conserv_result
+
+ weights = {
+ "sequence_valid": 0.15,
+ "source_protein_valid": 0.20,
+ "surface_accessibility": 0.25,
+ "iedb_bcell_check": 0.25,
+ "conservation_check": 0.15,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # Shared helpers
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _check_peptide_valid(peptide: str, min_len: int = 8, max_len: int = 15) -> dict:
+ """Validate peptide sequence (standard amino acids, length)."""
+ if not peptide:
+ return {"score": 0.0, "error": "Empty peptide"}
+
+ upper = peptide.upper()
+ invalid = [aa for aa in upper if aa not in VALID_AMINO_ACIDS]
+
+ if invalid:
+ return {
+ "score": 0.0,
+ "error": f"Invalid amino acids: {''.join(set(invalid))}",
+ "length": len(upper),
+ }
+
+ if len(upper) < min_len or len(upper) > max_len:
+ return {
+ "score": 0.3,
+ "note": f"Length {len(upper)} outside [{min_len}, {max_len}]",
+ "length": len(upper),
+ }
+
+ return {"score": 1.0, "valid": True, "length": len(upper)}
+
+ async def _check_protein_valid(self, protein_id: str) -> dict:
+ """Check if protein exists in UniProt."""
+ if not protein_id:
+ return {"score": 0.5, "note": "No source protein provided"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{UNIPROT_BASE}/{protein_id}",
+ headers={"Accept": "application/json"},
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ return {
+ "score": 1.0,
+ "found": True,
+ "protein_name": data.get("proteinDescription", {})
+ .get("recommendedName", {}).get("fullName", {}).get("value", ""),
+ "organism": data.get("organism", {}).get("scientificName", ""),
+ }
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("protein_valid_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_peptide_in_source(self, peptide: str, protein_id: str) -> dict:
+ """Check if peptide is a substring of the source protein."""
+ if not protein_id:
+ return {"score": 0.5, "note": "No source protein to check"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{UNIPROT_BASE}/{protein_id}.fasta",
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "Could not fetch protein sequence"}
+
+ fasta_lines = resp.text.strip().split("\n")
+ protein_seq = "".join(line for line in fasta_lines if not line.startswith(">"))
+
+ if peptide.upper() in protein_seq.upper():
+ return {"score": 1.0, "found": True}
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("peptide_in_source_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_allele_valid(hla_allele: str) -> dict:
+ """Validate HLA allele format."""
+ if not hla_allele:
+ return {"score": 0.5, "note": "No HLA allele provided"}
+
+ # Standard HLA nomenclature: HLA-A*02:01, HLA-B*07:02, etc.
+ pattern = r"^HLA-[A-Z]+\d?\*\d{2,4}:\d{2,4}$"
+ if re.match(pattern, hla_allele):
+ return {"score": 1.0, "valid": True, "allele": hla_allele}
+
+ # Relaxed format: A*02:01, B0702
+ relaxed = r"^[A-Z]+\d?\*?\d{2,4}:?\d{2,4}$"
+ if re.match(relaxed, hla_allele):
+ return {"score": 0.7, "valid": True, "note": "Non-standard format", "allele": hla_allele}
+
+ return {"score": 0.0, "valid": False, "allele": hla_allele}
+
+ async def _check_iedb_score(
+ self, peptide: str, hla_allele: str, claimed_score: float | None,
+ ) -> dict:
+ """Run IEDB MHC-I prediction and compare score."""
+ if not hla_allele:
+ return {"score": 0.5, "note": "No allele for IEDB prediction"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.post(
+ f"{IEDB_API}/mhci/",
+ data={
+ "method": "recommended",
+ "sequence_text": peptide,
+ "allele": hla_allele,
+ "length": str(len(peptide)),
+ },
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"IEDB API failed (HTTP {resp.status_code})"}
+
+ text = resp.text.strip()
+ if not text or "error" in text.lower():
+ return {"score": 0.3, "note": "IEDB returned error or empty"}
+
+ # Parse tab-delimited results
+ lines = text.strip().split("\n")
+ if len(lines) < 2:
+ return {"score": 0.5, "note": "No prediction results"}
+
+ # Last column is typically the score/IC50
+ header = lines[0].split("\t")
+ data_line = lines[-1].split("\t")
+
+ if claimed_score is None:
+ return {"score": 0.5, "note": "No score claimed", "iedb_available": True}
+
+ # Try to extract IC50 value
+ try:
+ iedb_score = float(data_line[-1])
+ tolerance = max(abs(iedb_score) * 0.2, 1.0)
+ match = abs(claimed_score - iedb_score) <= tolerance
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed": claimed_score,
+ "iedb": iedb_score,
+ }
+ except (ValueError, IndexError):
+ return {"score": 0.5, "note": "Could not parse IEDB score"}
+
+ except Exception as e:
+ logger.warning("iedb_score_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_classification_consistent(
+ ic50: float | None, classification: str,
+ ) -> dict:
+ """Check if binder classification matches IC50 thresholds."""
+ if not classification or ic50 is None:
+ return {"score": 0.5, "note": "Classification or IC50 missing"}
+
+ classification_lower = classification.lower()
+
+ if "strong" in classification_lower:
+ expected = ic50 <= STRONG_BINDER_THRESHOLD
+ elif "weak" in classification_lower:
+ expected = STRONG_BINDER_THRESHOLD < ic50 <= WEAK_BINDER_THRESHOLD
+ elif "non" in classification_lower or "no" in classification_lower:
+ expected = ic50 > WEAK_BINDER_THRESHOLD
+ else:
+ return {"score": 0.5, "note": f"Unknown classification: {classification}"}
+
+ return {
+ "score": 1.0 if expected else 0.0,
+ "consistent": expected,
+ "ic50": ic50,
+ "classification": classification,
+ "thresholds": {
+ "strong": STRONG_BINDER_THRESHOLD,
+ "weak": WEAK_BINDER_THRESHOLD,
+ },
+ }
+
+ async def _check_binding_affinity(
+ self, peptide: str, hla_allele: str, mhc_class: str, claimed_ic50: float | None,
+ ) -> dict:
+ """Re-run IEDB prediction for binding affinity."""
+ if not hla_allele:
+ return {"score": 0.5, "note": "No allele for binding prediction"}
+
+ endpoint = "mhci" if mhc_class.upper() in ("I", "1") else "mhcii"
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ data = {
+ "method": "recommended",
+ "sequence_text": peptide,
+ "allele": hla_allele,
+ "length": str(len(peptide)),
+ }
+ resp = await client.post(f"{IEDB_API}/{endpoint}/", data=data)
+
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"IEDB binding API failed (HTTP {resp.status_code})"}
+
+ text = resp.text.strip()
+ lines = text.strip().split("\n")
+ if len(lines) < 2:
+ return {"score": 0.5, "note": "No prediction results"}
+
+ data_line = lines[-1].split("\t")
+
+ if claimed_ic50 is None:
+ return {"score": 0.5, "note": "No IC50 claimed", "iedb_available": True}
+
+ try:
+ iedb_ic50 = float(data_line[-1])
+ # Compare on log scale for IC50
+ import math
+ if iedb_ic50 > 0 and claimed_ic50 > 0:
+ log_diff = abs(math.log10(iedb_ic50) - math.log10(claimed_ic50))
+ match = log_diff <= 0.5 # within half an order of magnitude
+ else:
+ match = False
+
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed_ic50": claimed_ic50,
+ "iedb_ic50": iedb_ic50,
+ }
+ except (ValueError, IndexError):
+ return {"score": 0.5, "note": "Could not parse IEDB IC50"}
+
+ except Exception as e:
+ logger.warning("binding_affinity_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_surface_accessibility(sequence: str, claimed_accessibility: float | None) -> dict:
+ """Check surface accessibility using hydrophobicity profile."""
+ if not sequence:
+ return {"score": 0.5, "note": "No sequence for accessibility check"}
+
+ # Kyte-Doolittle hydrophobicity scale
+ kd_scale = {
+ "A": 1.8, "C": 2.5, "D": -3.5, "E": -3.5, "F": 2.8,
+ "G": -0.4, "H": -3.2, "I": 4.5, "K": -3.9, "L": 3.8,
+ "M": 1.9, "N": -3.5, "P": -1.6, "Q": -3.5, "R": -4.5,
+ "S": -0.8, "T": -0.7, "V": 4.2, "W": -0.9, "Y": -1.3,
+ }
+
+ hydrophobicity = [kd_scale.get(aa, 0) for aa in sequence.upper()]
+ avg_hydrophobicity = sum(hydrophobicity) / len(hydrophobicity) if hydrophobicity else 0
+
+ # B-cell epitopes tend to be hydrophilic (negative hydrophobicity)
+ # Score higher if avg hydrophobicity is negative (more accessible)
+ if avg_hydrophobicity < -1.0:
+ access_score = 1.0
+ elif avg_hydrophobicity < 0:
+ access_score = 0.7
+ elif avg_hydrophobicity < 1.0:
+ access_score = 0.4
+ else:
+ access_score = 0.2
+
+ result: dict[str, Any] = {
+ "score": access_score,
+ "avg_hydrophobicity": round(avg_hydrophobicity, 3),
+ }
+
+ if claimed_accessibility is not None:
+ # If they claim high accessibility but peptide is hydrophobic, penalize
+ if claimed_accessibility > 0.5 and avg_hydrophobicity > 1.0:
+ result["score"] = 0.2
+ result["note"] = "Claimed accessible but hydrophobic"
+ elif claimed_accessibility < 0.3 and avg_hydrophobicity < -1.0:
+ result["score"] = 0.2
+ result["note"] = "Claimed buried but hydrophilic"
+
+ return result
+
+ async def _check_iedb_bcell(self, sequence: str) -> dict:
+ """Run IEDB B-cell epitope prediction."""
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.post(
+ f"{IEDB_API}/bcell/",
+ data={
+ "method": "Bepipred",
+ "sequence_text": sequence,
+ },
+ )
+ if resp.status_code != 200:
+ return {"score": 0.5, "note": f"IEDB B-cell API failed (HTTP {resp.status_code})"}
+
+ text = resp.text.strip()
+ if not text or "error" in text.lower():
+ return {"score": 0.5, "note": "IEDB B-cell returned error"}
+
+ # Parse results — look for positive predictions
+ lines = text.strip().split("\n")
+ if len(lines) < 2:
+ return {"score": 0.5, "note": "No B-cell prediction results"}
+
+ # Count positive predictions
+ positive = 0
+ total = 0
+ for line in lines[1:]:
+ parts = line.split("\t")
+ if len(parts) >= 2:
+ total += 1
+ try:
+ score_val = float(parts[-1])
+ if score_val > 0.5:
+ positive += 1
+ except ValueError:
+ pass
+
+ if total == 0:
+ return {"score": 0.5, "note": "Could not parse B-cell results"}
+
+ frac_positive = positive / total
+ return {
+ "score": min(1.0, frac_positive * 2), # Scale up
+ "positive_residues": positive,
+ "total_residues": total,
+ "fraction_positive": round(frac_positive, 3),
+ }
+
+ except Exception as e:
+ logger.warning("iedb_bcell_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_conservation(self, sequence: str, protein_id: str) -> dict:
+ """Check sequence conservation (simplified check via UniProt)."""
+ if not protein_id:
+ return {"score": 0.5, "note": "No protein ID for conservation check"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ # Check if the protein has orthologs noted in UniProt
+ resp = await client.get(
+ f"{UNIPROT_BASE}/{protein_id}",
+ headers={"Accept": "application/json"},
+ )
+ if resp.status_code != 200:
+ return {"score": 0.5, "note": "Could not fetch protein info"}
+
+ data = resp.json()
+ # Check for cross-references to ortholog databases
+ xrefs = data.get("uniProtKBCrossReferences", [])
+ orthodb_refs = [
+ x for x in xrefs
+ if x.get("database") in ("OrthoDB", "OMA", "InParanoid")
+ ]
+
+ if orthodb_refs:
+ return {
+ "score": 0.8,
+ "conserved": True,
+ "ortholog_databases": len(orthodb_refs),
+ }
+ return {"score": 0.5, "note": "No ortholog data available"}
+
+ except Exception as e:
+ logger.warning("conservation_check_failed", error=str(e))
+ return {"score": 0.5, "error": str(e)}
diff --git a/backend/verification/metabolomics_adapter.py b/backend/verification/metabolomics_adapter.py
new file mode 100644
index 0000000..c7b08f0
--- /dev/null
+++ b/backend/verification/metabolomics_adapter.py
@@ -0,0 +1,636 @@
+"""Metabolomics verification: HMDB + PubChem + KEGG + MassBank.
+
+Validates compound identification, pathway mapping, and spectral matching
+using public metabolomics APIs. API-based (no Docker).
+"""
+from __future__ import annotations
+
+import math
+import time
+from typing import Any
+
+import httpx
+
+from backend.logging_config import get_logger
+from backend.verification.base import (
+ VerificationAdapter,
+ VerificationResult,
+)
+
+logger = get_logger(__name__)
+
+HMDB_API = "https://hmdb.ca/metabolites"
+PUBCHEM_API = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
+KEGG_REST = "https://rest.kegg.jp"
+MASSBANK_API = "https://massbank.eu/MassBank/rest"
+HTTP_TIMEOUT = 30
+
+# Standard adduct types for mass spectrometry
+STANDARD_ADDUCTS = {
+ "[M+H]+", "[M+Na]+", "[M+K]+", "[M+NH4]+",
+ "[M-H]-", "[M+Cl]-", "[M+FA-H]-", "[M+CH3COO]-",
+ "[M+2H]2+", "[M-H2O+H]+", "[M-2H]2-",
+}
+
+
+class MetabolomicsAdapter(VerificationAdapter):
+ domain = "metabolomics"
+
+ async def verify(self, task_result: dict, task_metadata: dict) -> VerificationResult:
+ claim_type = task_result.get("claim_type", "compound_identification")
+
+ if claim_type == "compound_identification":
+ return await self._verify_compound_identification(task_result)
+ elif claim_type == "pathway_mapping":
+ return await self._verify_pathway_mapping(task_result)
+ elif claim_type == "spectral_match":
+ return await self._verify_spectral_match(task_result)
+ else:
+ return VerificationResult.fail(self.domain, [f"Unknown claim_type: {claim_type}"])
+
+ # ------------------------------------------------------------------
+ # compound_identification
+ # ------------------------------------------------------------------
+
+ async def _verify_compound_identification(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ hmdb_id = result.get("hmdb_id", "")
+ compound_name = result.get("compound_name", "")
+ claimed_mz = result.get("mz")
+ claimed_formula = result.get("formula", "")
+ inchikey = result.get("inchikey", "")
+
+ if not hmdb_id and not compound_name and not inchikey:
+ return VerificationResult.fail(
+ self.domain, ["hmdb_id, compound_name, or inchikey required"],
+ )
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "compound_identification"}
+
+ # Component 1: identifier_valid (0.20)
+ id_result = await self._check_identifier_valid(hmdb_id, inchikey)
+ component_scores["identifier_valid"] = id_result["score"]
+ details["identifier_valid"] = id_result
+
+ # Component 2: name_match (0.20)
+ name_result = await self._check_name_match(hmdb_id, compound_name)
+ component_scores["name_match"] = name_result["score"]
+ details["name_match"] = name_result
+
+ # Component 3: mass_match (0.25)
+ mass_result = await self._check_mass_match(hmdb_id, claimed_mz, result.get("adduct"))
+ component_scores["mass_match"] = mass_result["score"]
+ details["mass_match"] = mass_result
+
+ # Component 4: formula_match (0.20)
+ formula_result = await self._check_formula_match(hmdb_id, claimed_formula)
+ component_scores["formula_match"] = formula_result["score"]
+ details["formula_match"] = formula_result
+
+ # Component 5: pubchem_cross_ref (0.15)
+ pubchem_result = await self._check_pubchem_cross_ref(compound_name, inchikey)
+ component_scores["pubchem_cross_ref"] = pubchem_result["score"]
+ details["pubchem_cross_ref"] = pubchem_result
+
+ weights = {
+ "identifier_valid": 0.20,
+ "name_match": 0.20,
+ "mass_match": 0.25,
+ "formula_match": 0.20,
+ "pubchem_cross_ref": 0.15,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # pathway_mapping
+ # ------------------------------------------------------------------
+
+ async def _verify_pathway_mapping(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ compound_id = result.get("kegg_compound_id", "")
+ pathway_id = result.get("kegg_pathway_id", "")
+ claimed_enzymes = result.get("enzymes", [])
+
+ if not compound_id:
+ return VerificationResult.fail(self.domain, ["kegg_compound_id required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "pathway_mapping"}
+
+ # Component 1: compound_exists (0.20)
+ cpd_result = await self._check_kegg_compound_exists(compound_id)
+ component_scores["compound_exists"] = cpd_result["score"]
+ details["compound_exists"] = cpd_result
+
+ # Component 2: pathway_exists (0.25)
+ pw_result = await self._check_kegg_pathway_exists(pathway_id)
+ component_scores["pathway_exists"] = pw_result["score"]
+ details["pathway_exists"] = pw_result
+
+ # Component 3: compound_in_pathway (0.30)
+ in_pw_result = await self._check_compound_in_pathway(compound_id, pathway_id)
+ component_scores["compound_in_pathway"] = in_pw_result["score"]
+ details["compound_in_pathway"] = in_pw_result
+
+ # Component 4: enzyme_links (0.25)
+ enz_result = await self._check_enzyme_links(compound_id, claimed_enzymes)
+ component_scores["enzyme_links"] = enz_result["score"]
+ details["enzyme_links"] = enz_result
+
+ weights = {
+ "compound_exists": 0.20,
+ "pathway_exists": 0.25,
+ "compound_in_pathway": 0.30,
+ "enzyme_links": 0.25,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # spectral_match
+ # ------------------------------------------------------------------
+
+ async def _verify_spectral_match(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ precursor_mz = result.get("precursor_mz")
+ adduct = result.get("adduct", "")
+ spectrum_peaks = result.get("spectrum_peaks", [])
+ claimed_compound = result.get("compound_name", "")
+ ppm_tolerance = result.get("ppm_tolerance", 10)
+
+ if not spectrum_peaks:
+ return VerificationResult.fail(self.domain, ["spectrum_peaks required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "spectral_match"}
+
+ # Component 1: precursor_valid (0.15)
+ prec_result = self._check_precursor_valid(precursor_mz)
+ component_scores["precursor_valid"] = prec_result["score"]
+ details["precursor_valid"] = prec_result
+
+ # Component 2: adduct_valid (0.10)
+ adduct_result = self._check_adduct_valid(adduct)
+ component_scores["adduct_valid"] = adduct_result["score"]
+ details["adduct_valid"] = adduct_result
+
+ # Component 3: fragment_match (0.35)
+ frag_result = self._check_fragment_match(spectrum_peaks)
+ component_scores["fragment_match"] = frag_result["score"]
+ details["fragment_match"] = frag_result
+
+ # Component 4: library_hit (0.25)
+ lib_result = await self._check_library_hit(precursor_mz, spectrum_peaks)
+ component_scores["library_hit"] = lib_result["score"]
+ details["library_hit"] = lib_result
+
+ # Component 5: mass_accuracy (0.15)
+ acc_result = self._check_mass_accuracy(spectrum_peaks, ppm_tolerance)
+ component_scores["mass_accuracy"] = acc_result["score"]
+ details["mass_accuracy"] = acc_result
+
+ weights = {
+ "precursor_valid": 0.15,
+ "adduct_valid": 0.10,
+ "fragment_match": 0.35,
+ "library_hit": 0.25,
+ "mass_accuracy": 0.15,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # API helpers — compound identification
+ # ------------------------------------------------------------------
+
+ async def _check_identifier_valid(self, hmdb_id: str, inchikey: str) -> dict:
+ """Check if HMDB ID or InChIKey resolves."""
+ if hmdb_id:
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{HMDB_API}/{hmdb_id}.xml",
+ follow_redirects=True,
+ )
+ if resp.status_code == 200:
+ return {"score": 1.0, "found": True, "source": "hmdb"}
+ except Exception as e:
+ logger.warning("hmdb_id_check_failed", error=str(e))
+
+ if inchikey:
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{PUBCHEM_API}/compound/inchikey/{inchikey}/property/MolecularFormula/JSON",
+ )
+ if resp.status_code == 200:
+ return {"score": 1.0, "found": True, "source": "pubchem"}
+ except Exception as e:
+ logger.warning("inchikey_check_failed", error=str(e))
+
+ return {"score": 0.0, "found": False}
+
+ async def _check_name_match(self, hmdb_id: str, compound_name: str) -> dict:
+ """Check if compound name matches HMDB record."""
+ if not hmdb_id or not compound_name:
+ return {"score": 0.5, "note": "HMDB ID or name missing for comparison"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{HMDB_API}/{hmdb_id}.xml",
+ follow_redirects=True,
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "HMDB lookup failed"}
+
+ content = resp.text
+ # Simple XML name extraction
+ import re
+ name_match = re.search(r"(.*?)", content)
+ if name_match:
+ db_name = name_match.group(1).lower()
+ if compound_name.lower() == db_name:
+ return {"score": 1.0, "match": True, "db_name": name_match.group(1)}
+ if compound_name.lower() in db_name or db_name in compound_name.lower():
+ return {"score": 0.7, "partial_match": True, "db_name": name_match.group(1)}
+ return {"score": 0.0, "match": False, "db_name": name_match.group(1)}
+
+ return {"score": 0.3, "note": "Could not extract name from HMDB"}
+ except Exception as e:
+ logger.warning("name_match_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_mass_match(
+ self, hmdb_id: str, claimed_mz: float | None, adduct: str | None,
+ ) -> dict:
+ """Check if claimed m/z matches exact mass within instrument tolerance."""
+ if claimed_mz is None:
+ return {"score": 0.5, "note": "No m/z claimed"}
+
+ if not hmdb_id:
+ # Check plausibility only
+ if 50 <= claimed_mz <= 2000:
+ return {"score": 0.5, "note": "No HMDB ID for mass comparison", "mz_plausible": True}
+ return {"score": 0.2, "note": "m/z outside typical range"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{HMDB_API}/{hmdb_id}.xml",
+ follow_redirects=True,
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "HMDB lookup failed"}
+
+ content = resp.text
+ import re
+ mass_match = re.search(r"([\d.]+)", content)
+ if not mass_match:
+ mass_match = re.search(r"([\d.]+)", content)
+
+ if mass_match:
+ exact_mass = float(mass_match.group(1))
+ # Account for adduct mass shift
+ adduct_shift = 1.00728 # [M+H]+ by default
+ if adduct == "[M-H]-":
+ adduct_shift = -1.00728
+ elif adduct == "[M+Na]+":
+ adduct_shift = 22.9892
+ elif adduct == "[M+K]+":
+ adduct_shift = 38.9632
+
+ expected_mz = exact_mass + adduct_shift
+ ppm_error = abs(claimed_mz - expected_mz) / expected_mz * 1e6
+ match = ppm_error <= 10 # 10 ppm tolerance
+
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed_mz": claimed_mz,
+ "expected_mz": round(expected_mz, 4),
+ "ppm_error": round(ppm_error, 2),
+ }
+
+ return {"score": 0.3, "note": "Could not extract mass from HMDB"}
+ except Exception as e:
+ logger.warning("mass_match_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_formula_match(self, hmdb_id: str, claimed_formula: str) -> dict:
+ """Check if molecular formula matches HMDB."""
+ if not hmdb_id or not claimed_formula:
+ return {"score": 0.5, "note": "HMDB ID or formula missing"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(
+ f"{HMDB_API}/{hmdb_id}.xml",
+ follow_redirects=True,
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "HMDB lookup failed"}
+
+ content = resp.text
+ import re
+ formula_match = re.search(r"(.*?)", content)
+ if formula_match:
+ db_formula = formula_match.group(1).strip()
+ if claimed_formula.replace(" ", "") == db_formula.replace(" ", ""):
+ return {"score": 1.0, "match": True, "db_formula": db_formula}
+ return {"score": 0.0, "match": False, "claimed": claimed_formula, "db_formula": db_formula}
+
+ return {"score": 0.3, "note": "Could not extract formula from HMDB"}
+ except Exception as e:
+ logger.warning("formula_match_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_pubchem_cross_ref(self, compound_name: str, inchikey: str) -> dict:
+ """Cross-reference with PubChem."""
+ identifier = inchikey or compound_name
+ if not identifier:
+ return {"score": 0.5, "note": "No identifier for PubChem lookup"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ if inchikey:
+ resp = await client.get(
+ f"{PUBCHEM_API}/compound/inchikey/{inchikey}/property/MolecularFormula,MolecularWeight/JSON",
+ )
+ else:
+ resp = await client.get(
+ f"{PUBCHEM_API}/compound/name/{compound_name}/property/MolecularFormula,MolecularWeight/JSON",
+ )
+
+ if resp.status_code == 200:
+ data = resp.json()
+ props = data.get("PropertyTable", {}).get("Properties", [{}])[0]
+ return {
+ "score": 0.8,
+ "found": True,
+ "pubchem_formula": props.get("MolecularFormula"),
+ "pubchem_weight": props.get("MolecularWeight"),
+ }
+ return {"score": 0.3, "note": "Not found in PubChem"}
+ except Exception as e:
+ logger.warning("pubchem_cross_ref_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ # ------------------------------------------------------------------
+ # API helpers — pathway mapping
+ # ------------------------------------------------------------------
+
+ async def _check_kegg_compound_exists(self, compound_id: str) -> dict:
+ """Check if KEGG compound ID is valid."""
+ if not compound_id:
+ return {"score": 0.5, "note": "No compound ID"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(f"{KEGG_REST}/get/{compound_id}")
+ if resp.status_code == 200:
+ return {"score": 1.0, "found": True}
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("kegg_compound_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_kegg_pathway_exists(self, pathway_id: str) -> dict:
+ """Check if KEGG pathway ID is valid."""
+ if not pathway_id:
+ return {"score": 0.5, "note": "No pathway ID"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(f"{KEGG_REST}/get/{pathway_id}")
+ if resp.status_code == 200:
+ return {"score": 1.0, "found": True}
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("kegg_pathway_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_compound_in_pathway(self, compound_id: str, pathway_id: str) -> dict:
+ """Check if compound is a participant in the claimed pathway."""
+ if not compound_id or not pathway_id:
+ return {"score": 0.5, "note": "Both compound and pathway IDs needed"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(f"{KEGG_REST}/link/compound/{pathway_id}")
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"KEGG link query failed (HTTP {resp.status_code})"}
+
+ compounds_in_pathway = resp.text.strip().split("\n")
+ compound_ids = set()
+ for line in compounds_in_pathway:
+ parts = line.strip().split("\t")
+ if len(parts) >= 2:
+ compound_ids.add(parts[1].strip())
+
+ # Normalize compound_id format
+ normalized = compound_id if ":" in compound_id else f"cpd:{compound_id}"
+ if normalized in compound_ids or compound_id in compound_ids:
+ return {"score": 1.0, "found": True, "n_compounds": len(compound_ids)}
+ return {
+ "score": 0.0,
+ "found": False,
+ "n_compounds_in_pathway": len(compound_ids),
+ }
+ except Exception as e:
+ logger.warning("compound_in_pathway_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_enzyme_links(self, compound_id: str, claimed_enzymes: list) -> dict:
+ """Check if claimed enzymes link to compound in KEGG."""
+ if not claimed_enzymes:
+ return {"score": 0.5, "note": "No enzymes claimed"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.get(f"{KEGG_REST}/link/enzyme/{compound_id}")
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": "KEGG enzyme link failed"}
+
+ db_enzymes: set[str] = set()
+ for line in resp.text.strip().split("\n"):
+ parts = line.strip().split("\t")
+ if len(parts) >= 2:
+ db_enzymes.add(parts[1].strip().replace("ec:", ""))
+
+ if not db_enzymes:
+ return {"score": 0.5, "note": "No enzyme links in KEGG"}
+
+ matched = sum(1 for e in claimed_enzymes if e in db_enzymes)
+ score = matched / len(claimed_enzymes) if claimed_enzymes else 0.0
+ return {
+ "score": round(score, 4),
+ "matched": matched,
+ "total_claimed": len(claimed_enzymes),
+ "db_enzymes": sorted(list(db_enzymes))[:10],
+ }
+ except Exception as e:
+ logger.warning("enzyme_links_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ # ------------------------------------------------------------------
+ # Helpers — spectral match
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _check_precursor_valid(precursor_mz: float | None) -> dict:
+ """Check if precursor m/z is in expected range."""
+ if precursor_mz is None:
+ return {"score": 0.5, "note": "No precursor m/z"}
+
+ if not isinstance(precursor_mz, (int, float)):
+ return {"score": 0.0, "error": "precursor_mz must be numeric"}
+
+ # Typical metabolomics range: 50-2000 m/z
+ if 50 <= precursor_mz <= 2000:
+ return {"score": 1.0, "valid": True, "precursor_mz": precursor_mz}
+ elif 20 <= precursor_mz <= 5000:
+ return {"score": 0.5, "note": "Unusual precursor range", "precursor_mz": precursor_mz}
+ return {"score": 0.0, "error": "precursor_mz outside plausible range"}
+
+ @staticmethod
+ def _check_adduct_valid(adduct: str) -> dict:
+ """Check if adduct type is standard."""
+ if not adduct:
+ return {"score": 0.5, "note": "No adduct specified"}
+
+ if adduct in STANDARD_ADDUCTS:
+ return {"score": 1.0, "valid": True, "adduct": adduct}
+ return {"score": 0.3, "valid": False, "adduct": adduct, "known_adducts": sorted(STANDARD_ADDUCTS)}
+
+ @staticmethod
+ def _check_fragment_match(spectrum_peaks: list) -> dict:
+ """Validate spectrum peak list structure and compute self-consistency."""
+ if not spectrum_peaks:
+ return {"score": 0.0, "error": "No peaks"}
+
+ valid_peaks = 0
+ invalid_peaks = 0
+
+ for peak in spectrum_peaks:
+ if isinstance(peak, (list, tuple)) and len(peak) >= 2:
+ mz, intensity = peak[0], peak[1]
+ if isinstance(mz, (int, float)) and isinstance(intensity, (int, float)):
+ if mz > 0 and intensity >= 0:
+ valid_peaks += 1
+ continue
+ invalid_peaks += 1
+
+ total = valid_peaks + invalid_peaks
+ score = valid_peaks / total if total > 0 else 0.0
+
+ # Check that peaks are sorted by m/z (expected)
+ mz_values = [
+ p[0] for p in spectrum_peaks
+ if isinstance(p, (list, tuple)) and len(p) >= 2
+ ]
+ is_sorted = all(mz_values[i] <= mz_values[i + 1] for i in range(len(mz_values) - 1))
+
+ return {
+ "score": round(score, 4),
+ "valid_peaks": valid_peaks,
+ "invalid_peaks": invalid_peaks,
+ "is_sorted": is_sorted,
+ "n_peaks": total,
+ }
+
+ async def _check_library_hit(
+ self, precursor_mz: float | None, spectrum_peaks: list,
+ ) -> dict:
+ """Search MassBank for spectral matches."""
+ if precursor_mz is None:
+ return {"score": 0.5, "note": "No precursor for library search"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ # Search MassBank by precursor m/z
+ resp = await client.get(
+ f"{MASSBANK_API}/searchspectrum",
+ params={
+ "mz": str(precursor_mz),
+ "tol": "0.5",
+ "unit": "Da",
+ "limit": "5",
+ },
+ )
+ if resp.status_code != 200:
+ return {"score": 0.5, "note": f"MassBank search failed (HTTP {resp.status_code})"}
+
+ data = resp.json()
+ if not data:
+ return {"score": 0.3, "note": "No MassBank hits"}
+
+ n_hits = len(data) if isinstance(data, list) else 0
+ return {
+ "score": min(1.0, 0.5 + n_hits * 0.1),
+ "n_hits": n_hits,
+ "source": "massbank",
+ }
+ except Exception as e:
+ logger.warning("library_hit_check_failed", error=str(e))
+ return {"score": 0.5, "error": str(e)}
+
+ @staticmethod
+ def _check_mass_accuracy(spectrum_peaks: list, ppm_tolerance: float) -> dict:
+ """Check if mass accuracy is within declared tolerance."""
+ if not spectrum_peaks or not isinstance(ppm_tolerance, (int, float)):
+ return {"score": 0.5, "note": "Insufficient data for accuracy check"}
+
+ # Check that ppm tolerance is reasonable (0.1 to 100 ppm)
+ if ppm_tolerance < 0.1:
+ return {"score": 0.3, "note": "PPM tolerance unrealistically low"}
+ if ppm_tolerance > 100:
+ return {"score": 0.3, "note": "PPM tolerance unrealistically high"}
+
+ # For a well-calibrated instrument, 1-20 ppm is typical
+ if 1 <= ppm_tolerance <= 20:
+ return {"score": 1.0, "plausible": True, "ppm_tolerance": ppm_tolerance}
+ elif 0.1 <= ppm_tolerance <= 50:
+ return {"score": 0.7, "plausible": True, "ppm_tolerance": ppm_tolerance}
+ return {"score": 0.3, "plausible": False, "ppm_tolerance": ppm_tolerance}
diff --git a/backend/verification/systems_biology_adapter.py b/backend/verification/systems_biology_adapter.py
new file mode 100644
index 0000000..6ad44e6
--- /dev/null
+++ b/backend/verification/systems_biology_adapter.py
@@ -0,0 +1,667 @@
+"""Systems biology verification: Reactome + STRING + KEGG + scipy/numpy.
+
+Validates pathway enrichment, network topology, and flux balance claims.
+API-based (no Docker) with optional networkx for topology metrics.
+"""
+from __future__ import annotations
+
+import asyncio
+import time
+from typing import Any
+
+import httpx
+
+from backend.logging_config import get_logger
+from backend.verification.base import (
+ VerificationAdapter,
+ VerificationResult,
+)
+
+logger = get_logger(__name__)
+
+REACTOME_API = "https://reactome.org/ContentService"
+STRING_API = "https://string-db.org/api"
+KEGG_REST = "https://rest.kegg.jp"
+ENSEMBL_REST = "https://rest.ensembl.org"
+HTTP_TIMEOUT = 30
+
+# Graceful degradation for networkx
+try:
+ import networkx as nx
+ NETWORKX_AVAILABLE = True
+except ImportError:
+ NETWORKX_AVAILABLE = False
+ logger.warning("networkx_not_available", note="Network topology metrics will use neutral scores")
+
+
+class SystemsBiologyAdapter(VerificationAdapter):
+ domain = "systems_biology"
+
+ async def verify(self, task_result: dict, task_metadata: dict) -> VerificationResult:
+ claim_type = task_result.get("claim_type", "pathway_enrichment")
+
+ if claim_type == "pathway_enrichment":
+ return await self._verify_pathway_enrichment(task_result)
+ elif claim_type == "network_topology":
+ return await self._verify_network_topology(task_result)
+ elif claim_type == "flux_balance":
+ return await self._verify_flux_balance(task_result)
+ else:
+ return VerificationResult.fail(self.domain, [f"Unknown claim_type: {claim_type}"])
+
+ # ------------------------------------------------------------------
+ # pathway_enrichment
+ # ------------------------------------------------------------------
+
+ async def _verify_pathway_enrichment(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ gene_set = result.get("gene_set", [])
+ pathway_id = result.get("pathway_id", "")
+ claimed_pvalue = result.get("p_value")
+ claimed_fdr = result.get("fdr")
+ background_size = result.get("background_size", 20000)
+ pathway_size = result.get("pathway_size")
+
+ if not gene_set:
+ return VerificationResult.fail(self.domain, ["gene_set required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "pathway_enrichment"}
+
+ # Component 1: gene_set_valid (0.20)
+ genes_result = await self._check_gene_set_valid(gene_set)
+ component_scores["gene_set_valid"] = genes_result["score"]
+ details["gene_set_valid"] = genes_result
+
+ # Component 2: pathway_exists (0.20)
+ pw_result = await self._check_pathway_exists(pathway_id)
+ component_scores["pathway_exists"] = pw_result["score"]
+ details["pathway_exists"] = pw_result
+
+ # Component 3: enrichment_recomputed (0.30)
+ enrich_result = await asyncio.to_thread(
+ self._recompute_enrichment,
+ len(gene_set), pathway_size, background_size, claimed_pvalue,
+ )
+ component_scores["enrichment_recomputed"] = enrich_result["score"]
+ details["enrichment_recomputed"] = enrich_result
+
+ # Component 4: fdr_correction (0.30)
+ fdr_result = await asyncio.to_thread(
+ self._check_fdr_correction, claimed_pvalue, claimed_fdr, result.get("n_tests"),
+ )
+ component_scores["fdr_correction"] = fdr_result["score"]
+ details["fdr_correction"] = fdr_result
+
+ weights = {
+ "gene_set_valid": 0.20,
+ "pathway_exists": 0.20,
+ "enrichment_recomputed": 0.30,
+ "fdr_correction": 0.30,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # network_topology
+ # ------------------------------------------------------------------
+
+ async def _verify_network_topology(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ protein_ids = result.get("protein_ids", [])
+ claimed_edges = result.get("edges", [])
+ claimed_hubs = result.get("hub_proteins", [])
+ claimed_metrics = result.get("topology_metrics", {})
+
+ if not protein_ids:
+ return VerificationResult.fail(self.domain, ["protein_ids required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "network_topology"}
+
+ # Component 1: proteins_exist (0.20)
+ proteins_result = await self._check_proteins_exist(protein_ids)
+ component_scores["proteins_exist"] = proteins_result["score"]
+ details["proteins_exist"] = proteins_result
+
+ # Component 2: interactions_verified (0.25)
+ interactions_result = await self._check_interactions(protein_ids, claimed_edges)
+ component_scores["interactions_verified"] = interactions_result["score"]
+ details["interactions_verified"] = interactions_result
+
+ # Component 3: metrics_recomputed (0.30)
+ metrics_result = await asyncio.to_thread(
+ self._recompute_topology_metrics, protein_ids, claimed_edges, claimed_metrics,
+ )
+ component_scores["metrics_recomputed"] = metrics_result["score"]
+ details["metrics_recomputed"] = metrics_result
+
+ # Component 4: hub_identification (0.25)
+ hub_result = await asyncio.to_thread(
+ self._check_hub_identification, protein_ids, claimed_edges, claimed_hubs,
+ )
+ component_scores["hub_identification"] = hub_result["score"]
+ details["hub_identification"] = hub_result
+
+ weights = {
+ "proteins_exist": 0.20,
+ "interactions_verified": 0.25,
+ "metrics_recomputed": 0.30,
+ "hub_identification": 0.25,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # flux_balance
+ # ------------------------------------------------------------------
+
+ async def _verify_flux_balance(self, result: dict) -> VerificationResult:
+ start = time.monotonic()
+
+ stoichiometry_matrix = result.get("stoichiometry_matrix", [])
+ claimed_fluxes = result.get("fluxes", [])
+ flux_bounds = result.get("flux_bounds", [])
+ objective = result.get("objective", [])
+
+ if not stoichiometry_matrix:
+ return VerificationResult.fail(self.domain, ["stoichiometry_matrix required"])
+
+ component_scores: dict[str, float] = {}
+ details: dict[str, Any] = {"claim_type": "flux_balance"}
+
+ # Component 1: model_valid (0.20)
+ model_result = self._check_model_valid(stoichiometry_matrix, flux_bounds)
+ component_scores["model_valid"] = model_result["score"]
+ details["model_valid"] = model_result
+
+ # Component 2: stoichiometry_consistent (0.25)
+ stoich_result = await asyncio.to_thread(
+ self._check_stoichiometry_consistent, stoichiometry_matrix,
+ )
+ component_scores["stoichiometry_consistent"] = stoich_result["score"]
+ details["stoichiometry_consistent"] = stoich_result
+
+ # Component 3: objective_feasible (0.25)
+ feas_result = await asyncio.to_thread(
+ self._check_objective_feasible,
+ stoichiometry_matrix, flux_bounds, objective,
+ )
+ component_scores["objective_feasible"] = feas_result["score"]
+ details["objective_feasible"] = feas_result
+
+ # Component 4: flux_bounds_respected (0.30)
+ bounds_result = self._check_flux_bounds_respected(claimed_fluxes, flux_bounds)
+ component_scores["flux_bounds_respected"] = bounds_result["score"]
+ details["flux_bounds_respected"] = bounds_result
+
+ weights = {
+ "model_valid": 0.20,
+ "stoichiometry_consistent": 0.25,
+ "objective_feasible": 0.25,
+ "flux_bounds_respected": 0.30,
+ }
+ score = sum(weights[k] * component_scores[k] for k in weights)
+ score = min(1.0, round(score, 4))
+
+ elapsed = time.monotonic() - start
+ details["component_scores"] = component_scores
+
+ return VerificationResult(
+ passed=score >= 0.5,
+ score=score,
+ badge=VerificationResult.score_to_badge(score),
+ domain=self.domain,
+ details=details,
+ compute_time_seconds=elapsed,
+ )
+
+ # ------------------------------------------------------------------
+ # API helpers — pathway enrichment
+ # ------------------------------------------------------------------
+
+ async def _check_gene_set_valid(self, gene_set: list[str]) -> dict:
+ """Check that gene symbols resolve in Ensembl."""
+ try:
+ valid = 0
+ invalid: list[str] = []
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ # Check a sample (up to 10 genes) to avoid API rate limiting
+ sample = gene_set[:10]
+ for gene in sample:
+ resp = await client.get(
+ f"{ENSEMBL_REST}/lookup/symbol/homo_sapiens/{gene}",
+ headers={"Content-Type": "application/json"},
+ )
+ if resp.status_code == 200:
+ valid += 1
+ else:
+ invalid.append(gene)
+
+ score = valid / len(sample) if sample else 0.0
+ return {
+ "score": round(score, 4),
+ "valid": valid,
+ "checked": len(sample),
+ "total": len(gene_set),
+ "invalid": invalid[:5],
+ }
+ except Exception as e:
+ logger.warning("gene_set_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_pathway_exists(self, pathway_id: str) -> dict:
+ """Check if pathway ID is valid in Reactome or KEGG."""
+ if not pathway_id:
+ return {"score": 0.5, "note": "No pathway_id provided"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ # Try Reactome
+ if pathway_id.startswith("R-"):
+ resp = await client.get(f"{REACTOME_API}/data/pathway/{pathway_id}")
+ if resp.status_code == 200:
+ data = resp.json()
+ return {
+ "score": 1.0,
+ "found": True,
+ "source": "reactome",
+ "name": data.get("displayName", ""),
+ }
+
+ # Try KEGG
+ resp = await client.get(f"{KEGG_REST}/get/{pathway_id}")
+ if resp.status_code == 200:
+ return {"score": 1.0, "found": True, "source": "kegg"}
+
+ return {"score": 0.0, "found": False}
+ except Exception as e:
+ logger.warning("pathway_exists_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _recompute_enrichment(
+ overlap: int,
+ pathway_size: int | None,
+ background_size: int,
+ claimed_pvalue: float | None,
+ ) -> dict:
+ """Re-run hypergeometric test for enrichment."""
+ try:
+ from scipy import stats as sp_stats
+ except ImportError:
+ return {"score": 0.5, "note": "scipy unavailable"}
+
+ if pathway_size is None:
+ return {"score": 0.5, "note": "pathway_size needed for re-computation"}
+
+ if pathway_size <= 0 or background_size <= 0:
+ return {"score": 0.0, "error": "pathway_size and background_size must be positive"}
+
+ try:
+ # Hypergeometric test: P(X >= overlap)
+ computed_p = sp_stats.hypergeom.sf(
+ overlap - 1, # sf gives P(X > k), so k-1 for P(X >= k)
+ background_size,
+ pathway_size,
+ overlap + 10, # sample size (approximate)
+ )
+
+ if claimed_pvalue is None:
+ return {"score": 0.5, "note": "No p-value claimed", "computed_p": computed_p}
+
+ # Order-of-magnitude comparison for enrichment p-values
+ import math
+ if computed_p > 0 and claimed_pvalue > 0:
+ log_diff = abs(math.log10(computed_p) - math.log10(claimed_pvalue))
+ match = log_diff <= 2 # within 2 orders of magnitude
+ else:
+ match = False
+
+ return {
+ "score": 1.0 if match else 0.3,
+ "match": match,
+ "claimed_p": claimed_pvalue,
+ "computed_p": computed_p,
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_fdr_correction(
+ raw_pvalue: float | None, claimed_fdr: float | None, n_tests: int | None,
+ ) -> dict:
+ """Check FDR (Benjamini-Hochberg) correction plausibility."""
+ if claimed_fdr is None:
+ return {"score": 0.5, "note": "No FDR value claimed"}
+
+ if raw_pvalue is None:
+ return {"score": 0.5, "note": "No raw p-value to compare"}
+
+ if not isinstance(claimed_fdr, (int, float)) or claimed_fdr < 0 or claimed_fdr > 1:
+ return {"score": 0.0, "error": f"FDR {claimed_fdr} out of range [0, 1]"}
+
+ # FDR should be >= raw p-value (BH correction inflates p-values)
+ if claimed_fdr < raw_pvalue:
+ return {"score": 0.3, "error": "FDR should be >= raw p-value"}
+
+ # If n_tests provided, approximate BH correction
+ if n_tests and n_tests > 0:
+ # Rough upper bound: fdr <= p * n_tests
+ upper_bound = min(raw_pvalue * n_tests, 1.0)
+ if claimed_fdr <= upper_bound:
+ return {"score": 1.0, "plausible": True, "upper_bound": upper_bound}
+ return {"score": 0.5, "note": "FDR above expected upper bound"}
+
+ return {"score": 0.7, "note": "FDR >= raw p-value (correct direction)"}
+
+ # ------------------------------------------------------------------
+ # API helpers — network topology
+ # ------------------------------------------------------------------
+
+ async def _check_proteins_exist(self, protein_ids: list[str]) -> dict:
+ """Check protein IDs in STRING DB."""
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.post(
+ f"{STRING_API}/json/resolve",
+ data={
+ "identifiers": "\r".join(protein_ids[:20]),
+ "species": 9606,
+ },
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"STRING resolve failed (HTTP {resp.status_code})"}
+
+ data = resp.json()
+ resolved = len(data)
+ score = resolved / min(len(protein_ids), 20) if protein_ids else 0.0
+ return {
+ "score": round(score, 4),
+ "resolved": resolved,
+ "total": len(protein_ids),
+ }
+ except Exception as e:
+ logger.warning("proteins_exist_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ async def _check_interactions(self, protein_ids: list[str], claimed_edges: list) -> dict:
+ """Verify claimed edges exist in STRING above confidence threshold."""
+ if not claimed_edges:
+ return {"score": 0.5, "note": "No edges claimed"}
+
+ try:
+ async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
+ resp = await client.post(
+ f"{STRING_API}/json/network",
+ data={
+ "identifiers": "\r".join(protein_ids[:50]),
+ "species": 9606,
+ "required_score": 400,
+ },
+ )
+ if resp.status_code != 200:
+ return {"score": 0.3, "note": f"STRING network failed (HTTP {resp.status_code})"}
+
+ data = resp.json()
+ db_edges: set[tuple[str, str]] = set()
+ for interaction in data:
+ a = interaction.get("preferredName_A", "")
+ b = interaction.get("preferredName_B", "")
+ if a and b:
+ db_edges.add((a.upper(), b.upper()))
+ db_edges.add((b.upper(), a.upper()))
+
+ verified = 0
+ for edge in claimed_edges:
+ if isinstance(edge, (list, tuple)) and len(edge) >= 2:
+ if (str(edge[0]).upper(), str(edge[1]).upper()) in db_edges:
+ verified += 1
+
+ score = verified / len(claimed_edges) if claimed_edges else 0.0
+ return {
+ "score": round(score, 4),
+ "verified": verified,
+ "total_claimed": len(claimed_edges),
+ "db_edges": len(db_edges),
+ }
+ except Exception as e:
+ logger.warning("interactions_check_failed", error=str(e))
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _recompute_topology_metrics(
+ protein_ids: list[str], edges: list, claimed_metrics: dict,
+ ) -> dict:
+ """Re-compute degree centrality and betweenness with networkx."""
+ if not NETWORKX_AVAILABLE:
+ return {"score": 0.5, "note": "networkx unavailable — metrics not recomputed"}
+
+ if not edges:
+ return {"score": 0.5, "note": "No edges to build network"}
+
+ try:
+ G = nx.Graph()
+ G.add_nodes_from(protein_ids)
+ for edge in edges:
+ if isinstance(edge, (list, tuple)) and len(edge) >= 2:
+ G.add_edge(str(edge[0]), str(edge[1]))
+
+ computed = {
+ "n_nodes": G.number_of_nodes(),
+ "n_edges": G.number_of_edges(),
+ "density": round(nx.density(G), 4),
+ "avg_degree": round(sum(dict(G.degree()).values()) / max(G.number_of_nodes(), 1), 4),
+ }
+
+ if not claimed_metrics:
+ return {"score": 0.5, "computed": computed}
+
+ matches = 0
+ total = 0
+ for key in ["n_nodes", "n_edges", "density", "avg_degree"]:
+ if key in claimed_metrics:
+ total += 1
+ claimed_val = claimed_metrics[key]
+ computed_val = computed[key]
+ tolerance = max(abs(computed_val) * 0.1, 0.01)
+ if abs(claimed_val - computed_val) <= tolerance:
+ matches += 1
+
+ score = matches / total if total > 0 else 0.5
+ return {
+ "score": round(score, 4),
+ "matches": matches,
+ "total_compared": total,
+ "computed": computed,
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_hub_identification(
+ protein_ids: list[str], edges: list, claimed_hubs: list[str],
+ ) -> dict:
+ """Check if claimed hub proteins match top centrality nodes."""
+ if not NETWORKX_AVAILABLE:
+ return {"score": 0.5, "note": "networkx unavailable"}
+
+ if not claimed_hubs:
+ return {"score": 0.5, "note": "No hub proteins claimed"}
+
+ if not edges:
+ return {"score": 0.3, "note": "No edges to compute centrality"}
+
+ try:
+ G = nx.Graph()
+ G.add_nodes_from(protein_ids)
+ for edge in edges:
+ if isinstance(edge, (list, tuple)) and len(edge) >= 2:
+ G.add_edge(str(edge[0]), str(edge[1]))
+
+ degree_centrality = nx.degree_centrality(G)
+ # Top N hubs (N = number of claimed hubs)
+ sorted_by_centrality = sorted(
+ degree_centrality.items(), key=lambda x: x[1], reverse=True,
+ )
+ top_n = {node for node, _ in sorted_by_centrality[:max(len(claimed_hubs) * 2, 5)]}
+
+ matches = sum(1 for h in claimed_hubs if h in top_n)
+ score = matches / len(claimed_hubs) if claimed_hubs else 0.0
+
+ return {
+ "score": round(score, 4),
+ "matches": matches,
+ "claimed_hubs": claimed_hubs,
+ "top_centrality": [node for node, _ in sorted_by_centrality[:5]],
+ }
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ # ------------------------------------------------------------------
+ # Helpers — flux balance
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _check_model_valid(matrix: list, bounds: list) -> dict:
+ """Check if stoichiometry matrix and bounds are well-formed."""
+ if not matrix:
+ return {"score": 0.0, "error": "Empty stoichiometry matrix"}
+
+ n_metabolites = len(matrix)
+ n_reactions = len(matrix[0]) if matrix else 0
+
+ if n_reactions == 0:
+ return {"score": 0.0, "error": "No reactions in matrix"}
+
+ # Check all rows have same length
+ if any(len(row) != n_reactions for row in matrix):
+ return {"score": 0.0, "error": "Inconsistent row lengths in matrix"}
+
+ if bounds and len(bounds) != n_reactions:
+ return {"score": 0.5, "note": "bounds length doesn't match n_reactions"}
+
+ return {
+ "score": 1.0,
+ "valid": True,
+ "n_metabolites": n_metabolites,
+ "n_reactions": n_reactions,
+ }
+
+ @staticmethod
+ def _check_stoichiometry_consistent(matrix: list) -> dict:
+ """Check S matrix rank for consistency."""
+ try:
+ import numpy as np
+ S = np.array(matrix, dtype=float)
+ rank = int(np.linalg.matrix_rank(S))
+ n_metabolites, n_reactions = S.shape
+
+ # System has solutions if rank < n_reactions
+ has_null_space = rank < n_reactions
+ return {
+ "score": 1.0 if has_null_space else 0.3,
+ "consistent": has_null_space,
+ "rank": rank,
+ "n_metabolites": n_metabolites,
+ "n_reactions": n_reactions,
+ }
+ except ImportError:
+ return {"score": 0.5, "note": "numpy unavailable"}
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_objective_feasible(matrix: list, bounds: list, objective: list) -> dict:
+ """Check LP feasibility with scipy.optimize.linprog."""
+ try:
+ import numpy as np
+ from scipy.optimize import linprog
+ except ImportError:
+ return {"score": 0.5, "note": "scipy unavailable"}
+
+ try:
+ S = np.array(matrix, dtype=float)
+ n_metabolites, n_reactions = S.shape
+ b_eq = np.zeros(n_metabolites)
+
+ # Parse bounds
+ if bounds:
+ lb = [b[0] if isinstance(b, (list, tuple)) else -1000 for b in bounds]
+ ub = [b[1] if isinstance(b, (list, tuple)) else 1000 for b in bounds]
+ var_bounds = list(zip(lb, ub))
+ else:
+ var_bounds = [(-1000, 1000)] * n_reactions
+
+ # Objective
+ c = np.zeros(n_reactions)
+ if objective and len(objective) == n_reactions:
+ c = -np.array(objective, dtype=float) # negate for maximization
+ elif n_reactions > 0:
+ c[-1] = -1 # Default: maximize last reaction
+
+ result = linprog(c, A_eq=S, b_eq=b_eq, bounds=var_bounds, method="highs")
+
+ if result.success:
+ return {
+ "score": 1.0,
+ "feasible": True,
+ "optimal_value": round(-result.fun, 6),
+ }
+ return {"score": 0.3, "feasible": False, "status": result.message}
+ except Exception as e:
+ return {"score": 0.3, "error": str(e)}
+
+ @staticmethod
+ def _check_flux_bounds_respected(claimed_fluxes: list, bounds: list) -> dict:
+ """Check if claimed fluxes are within declared bounds."""
+ if not claimed_fluxes:
+ return {"score": 0.5, "note": "No fluxes claimed"}
+
+ if not bounds:
+ return {"score": 0.5, "note": "No bounds declared"}
+
+ if len(claimed_fluxes) != len(bounds):
+ return {"score": 0.3, "error": "Flux count doesn't match bounds count"}
+
+ violations = 0
+ for i, (flux, bound) in enumerate(zip(claimed_fluxes, bounds)):
+ if not isinstance(flux, (int, float)):
+ continue
+ if isinstance(bound, (list, tuple)) and len(bound) >= 2:
+ lb, ub = bound[0], bound[1]
+ if flux < lb - 1e-6 or flux > ub + 1e-6:
+ violations += 1
+
+ score = max(0.0, 1.0 - violations / len(claimed_fluxes)) if claimed_fluxes else 0.0
+ return {
+ "score": round(score, 4),
+ "violations": violations,
+ "total_fluxes": len(claimed_fluxes),
+ }