diff --git a/2.0/problems/vector_db_ann/config.yaml b/2.0/problems/vector_db_ann/config.yaml index e6d48943..56f99c37 100644 --- a/2.0/problems/vector_db_ann/config.yaml +++ b/2.0/problems/vector_db_ann/config.yaml @@ -19,8 +19,11 @@ runtime: docker: image: ubuntu:24.04 environment: - cpus: 4 - memory_mb: 8192 + # If these resource limits change, also update the resource budget text in + # readme and harbor/app/README.md so agents can design parallel algorithms + # for the actual CPU and memory budget. + cpus: 8 + memory_mb: 16384 storage_mb: 8192 build_timeout_seconds: 3600 submission: diff --git a/2.0/problems/vector_db_ann/harbor/app/README.md b/2.0/problems/vector_db_ann/harbor/app/README.md index c87b189f..06c74801 100644 --- a/2.0/problems/vector_db_ann/harbor/app/README.md +++ b/2.0/problems/vector_db_ann/harbor/app/README.md @@ -20,6 +20,14 @@ cargo 1.75 Pin crate versions if newer transitive dependencies require a newer Rust compiler. +The Harbor task provides the following resource budget: + +```text +vCPUs: 8 +memory: 16 GiB +query concurrency: 4 +``` + ## Attribution This starter skeleton is adapted from KCORES/vector-db-bench, licensed under diff --git a/2.0/problems/vector_db_ann/readme b/2.0/problems/vector_db_ann/readme index 59cf88f1..4fb77cc0 100644 --- a/2.0/problems/vector_db_ann/readme +++ b/2.0/problems/vector_db_ann/readme @@ -36,6 +36,15 @@ cargo 1.75 If you add crates, choose versions compatible with this toolchain or pin transitive dependencies accordingly. +The service and judge run with the task resource limits below. Design your +parallel search and indexing strategy for this budget: + +```text +vCPUs: 8 +memory: 16 GiB +query concurrency: 4 +``` + The service must listen on `PORT` and implement these endpoints: ```text @@ -92,15 +101,21 @@ A submission is valid if: ## Scoring -The evaluator runs an official exact-search reference HTTP service once per -judge process on the same hidden benchmark and through the same `/bulk_insert` -and `/search` client harnesses to measure: +At trial startup, the Harbor judge sidecar prepares the hidden benchmark and +runs an exact-search reference HTTP service through the same `/bulk_insert` +and `/search` client harnesses to produce ground-truth nearest neighbors and +the trial-local scoring baseline: ```text baseline_qps +baseline_effective_qps baseline_load_seconds ``` +Interactive submissions and the final verifier both score through this same +judge sidecar, so the baseline and runtime environment are shared within a +trial while still letting different machines measure their own local baseline. + Each submission is then timed independently. The load phase includes all `/bulk_insert` calls and any index construction performed by the service before queries begin. The query phase measures only `/search` throughput: diff --git a/adapters/frontier-cs-2.0/README.md b/adapters/frontier-cs-2.0/README.md index 342e3b41..0aa08d3b 100644 --- a/adapters/frontier-cs-2.0/README.md +++ b/adapters/frontier-cs-2.0/README.md @@ -48,9 +48,11 @@ uv run harbor trial start -p datasets/frontier-cs-2.0/frontier-cs-2-0-erdos-demo ## Task Contract -The agent works in `/app` and must create `/app/solution.py`. The final -verifier runs the original Frontier-CS `2.0` evaluator and writes a normalized -reward in `/logs/verifier/reward.txt`. +The agent works in `/app` and must create `/app/solution.py` unless the task +declares a directory submission. A judge sidecar prepares the task evaluator +once per trial; both iterative submissions and the final verifier score +through that same sidecar. The final verifier writes a normalized reward in +`/logs/verifier/reward.txt`. During the trial, the agent can call: @@ -61,11 +63,12 @@ bash /app/submit.sh This submits the current `/app/solution.py` to a black-box judge service, prints the score and feedback, and records each attempt in `/logs/agent/submissions.jsonl`. The evaluator source is not mounted into the -agent workspace. The final verifier mirrors that log to -`/logs/verifier/submissions.jsonl` for process-reward analysis. The reported -reward is the maximum of the final `/app/solution.py` score and the best -successful iterative submission, so a timed-out agent can keep its best -submitted solution. +agent workspace. The judge owns the authoritative submission log at +`/logs/judge/submissions.jsonl`; the final verifier filters iterative agent +submissions into `/logs/verifier/submissions.jsonl` for process-reward +analysis. The reported reward is the maximum of the final submission score and +the best successful iterative submission, so a timed-out agent can keep its +best submitted solution. Some Harbor CLI versions print the timeout/error summary before rewards; in that case inspect `result.json`, `verifier/reward.json`, and diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py index 8fed62bf..31461308 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/adapter.py @@ -233,8 +233,24 @@ def _write_environment(self, task_paths: "TaskPaths", problem: FrontierCS20Probl ), encoding="utf-8", ) - for name in ("docker-compose.yaml", "judge_server.py", "submit.py"): - shutil.copy2(self.template_dir / "environment" / name, env_dir / name) + environment = problem.config.get("environment", {}) or {} + compose = ( + self.template_dir / "environment" / "docker-compose.yaml" + ).read_text(encoding="utf-8") + (env_dir / "docker-compose.yaml").write_text( + compose.format( + judge_cpus=int(environment.get("cpus", 2)), + judge_memory_mb=int(environment.get("memory_mb", 4096)), + ), + encoding="utf-8", + ) + shutil.copy2( + self.template_dir / "environment" / "judge_server.py", + env_dir / "judge_server.py", + ) + shutil.copy2( + self.template_dir / "environment" / "submit.py", env_dir / "submit.py" + ) # Kept in the build context for the judge image only; the main agent # image's Dockerfile does not copy this into /app. shutil.copy2(problem.problem_dir / "evaluator.py", env_dir / "problem_evaluator.py") @@ -254,7 +270,9 @@ def _write_submission_config(self, env_dir: Path, problem: FrontierCS20Problem) def _write_tests(self, task_paths: "TaskPaths", problem: FrontierCS20Problem) -> None: tests_dir = task_paths.tests_dir shutil.copy2(self.template_dir / "tests" / "test.sh", tests_dir / "test.sh") - shutil.copy2(self.template_dir / "tests" / "evaluate.py", tests_dir / "evaluate.py") + shutil.copy2( + self.template_dir / "tests" / "evaluate.py", tests_dir / "evaluate.py" + ) shutil.copy2(problem.problem_dir / "evaluator.py", tests_dir / "problem_evaluator.py") (tests_dir / "test.sh").chmod(0o755) diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml index c70b9b33..7e53a65e 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/docker-compose.yaml @@ -8,8 +8,16 @@ services: judge: build: - context: ${CONTEXT_DIR} + context: ${{CONTEXT_DIR}} dockerfile: Dockerfile.judge + deploy: + resources: + limits: + cpus: "{judge_cpus}" + memory: "{judge_memory_mb}M" + reservations: + cpus: "{judge_cpus}" + memory: "{judge_memory_mb}M" command: ["python3", "/judge/judge_server.py"] expose: - "8082" diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py index e5cc7647..2647c9f7 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/judge_server.py @@ -14,11 +14,14 @@ import time import traceback import threading +from datetime import datetime, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any PROBLEM_EVALUATOR_PATH = Path("/judge/problem_evaluator.py") +JUDGE_READY_LOG = Path("/logs/judge/judge_ready.json") +JUDGE_SUBMISSIONS_LOG = Path("/logs/judge/submissions.jsonl") MAX_SUBMISSION_BYTES = 30_000_000 MAX_ARCHIVE_BYTES = 20_000_000 @@ -40,6 +43,44 @@ def load_problem_evaluator(): READY_PAYLOAD: dict[str, Any] = {"status": "starting"} +def now_iso() -> str: + return ( + datetime.now(timezone.utc) + .isoformat(timespec="milliseconds") + .replace("+00:00", "Z") + ) + + +def write_judge_ready(payload: dict[str, Any]) -> None: + try: + JUDGE_READY_LOG.parent.mkdir(parents=True, exist_ok=True) + JUDGE_READY_LOG.write_text(json.dumps(payload, indent=2), encoding="utf-8") + except OSError as exc: + print(f"WARN: failed to write judge_ready.json: {exc}", flush=True) + + +def log_submission(record: dict[str, Any]) -> None: + JUDGE_SUBMISSIONS_LOG.parent.mkdir(parents=True, exist_ok=True) + with JUDGE_SUBMISSIONS_LOG.open("a", encoding="utf-8") as f: + f.write(json.dumps({"ts": now_iso(), **record}, ensure_ascii=False) + "\n") + + +def read_submissions() -> list[dict[str, Any]]: + if not JUDGE_SUBMISSIONS_LOG.exists(): + return [] + records: list[dict[str, Any]] = [] + for line in JUDGE_SUBMISSIONS_LOG.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(record, dict): + records.append(record) + return records + + def prepare_evaluator() -> None: global EVALUATOR, READY, READY_PAYLOAD start = time.time() @@ -60,6 +101,7 @@ def prepare_evaluator() -> None: "prepare_seconds": elapsed, **payload, } + write_judge_ready(READY_PAYLOAD) READY = True print( "[frontier judge] ready " @@ -144,6 +186,9 @@ def do_GET(self) -> None: if self.path == "/health": self._write_json(200 if READY else 503, READY_PAYLOAD) return + if self.path == "/submissions": + self._write_json(200, {"status": "ok", "submissions": read_submissions()}) + return self._write_json(404, {"status": "error", "error": "not found"}) def do_POST(self) -> None: @@ -176,8 +221,13 @@ def do_POST(self) -> None: self._write_json(413, {"status": "error", "error": "submission too large"}) return + submission_uuid = "" + submission_role = "agent" + submission_kind = "file" try: payload = json.loads(self.rfile.read(content_length).decode("utf-8")) + submission_uuid = str(payload.get("submission_uuid") or "") + submission_role = str(payload.get("submission_role") or "agent") submission_kind = payload.get("submission_kind", "file") if submission_kind == "directory": archive_b64 = payload.get("archive_b64") @@ -185,25 +235,49 @@ def do_POST(self) -> None: raise ValueError( "directory submission must include archive_b64" ) - self._write_json(200, evaluate_archive(archive_b64)) + result = evaluate_archive(archive_b64) + log_submission( + { + "submission_uuid": submission_uuid, + "submission_role": submission_role, + "submission_kind": submission_kind, + **result, + } + ) + self._write_json(200, result) return code = payload.get("code") if not isinstance(code, str) or not code.strip(): raise ValueError( "file submission must include non-empty string field 'code'" ) - self._write_json(200, evaluate_code(code)) + result = evaluate_code(code) + log_submission( + { + "submission_uuid": submission_uuid, + "submission_role": submission_role, + "submission_kind": submission_kind, + **result, + } + ) + self._write_json(200, result) except Exception: print(traceback.format_exc(), flush=True) - self._write_json( - 200, + result = { + "status": "error", + "score": 0.0, + "score_unbounded": 0.0, + "message": "evaluation failed", + } + log_submission( { - "status": "error", - "score": 0.0, - "score_unbounded": 0.0, - "message": "evaluation failed", - }, + "submission_uuid": submission_uuid, + "submission_role": submission_role, + "submission_kind": submission_kind, + **result, + } ) + self._write_json(200, result) def log_message(self, fmt: str, *args: object) -> None: return diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py index ad236d89..2e0cc44c 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/environment/submit.py @@ -106,7 +106,7 @@ def make_directory_archive(root: Path, exclude: list[str]) -> tuple[str, int]: return base64.b64encode(buf.getvalue()).decode("ascii"), file_count -def evaluate_with_judge(payload: dict) -> tuple[float, float, str, dict]: +def evaluate_with_judge(payload: dict) -> dict: wait_for_judge() response = requests.post( f"{JUDGE_URL}/evaluate", @@ -117,12 +117,7 @@ def evaluate_with_judge(payload: dict) -> tuple[float, float, str, dict]: payload = response.json() if payload.get("status") != "done": raise RuntimeError(str(payload.get("message") or payload.get("error") or payload)) - return ( - float(payload.get("score", 0.0)), - float(payload.get("score_unbounded", payload.get("score", 0.0))), - str(payload.get("message", "")), - dict(payload.get("metrics", {}) or {}), - ) + return payload def main() -> int: @@ -181,6 +176,7 @@ def main() -> int: ) judge_payload = { "submission_kind": "directory", + "submission_uuid": sub_uuid, "archive_b64": archive_b64, } else: @@ -199,11 +195,19 @@ def main() -> int: } ) return 2 - judge_payload = {"submission_kind": "file", "code": code} + judge_payload = { + "submission_kind": "file", + "submission_uuid": sub_uuid, + "code": code, + } try: start = time.time() - score, score_unbounded, message, metrics = evaluate_with_judge(judge_payload) + judge_result = evaluate_with_judge(judge_payload) + score = float(judge_result.get("score", 0.0)) + score_unbounded = float(judge_result.get("score_unbounded", score)) + message = str(judge_result.get("message", "")) + metrics = dict(judge_result.get("metrics", {}) or {}) elapsed_seconds = time.time() - start reward = float(score) / 100.0 diff --git a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py index 4e5ada80..2807a349 100644 --- a/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py +++ b/adapters/frontier-cs-2.0/src/frontier_cs_2_0/task-template/tests/evaluate.py @@ -3,40 +3,65 @@ from __future__ import annotations -import importlib.util +import base64 +import io import json +import os import shutil -import sys +import tarfile +import time import traceback +import uuid from pathlib import Path +from urllib import error, request SOLUTION_PATH = Path("/app/solution.py") APP_PATH = Path("/app") SUBMISSION_CONFIG_PATH = Path("/app/submission_config.json") -PROBLEM_EVALUATOR_PATH = Path("/tests/problem_evaluator.py") REWARD_TXT = Path("/logs/verifier/reward.txt") REWARD_JSON = Path("/logs/verifier/reward.json") -AGENT_SUBMISSIONS_LOG = Path("/logs/agent/submissions.jsonl") +JUDGE_SUBMISSIONS_LOG = Path("/logs/judge/submissions.jsonl") +JUDGE_READY_LOG = Path("/logs/judge/judge_ready.json") VERIFIER_SUBMISSIONS_LOG = Path("/logs/verifier/submissions.jsonl") +VERIFIER_JUDGE_READY_LOG = Path("/logs/verifier/judge_ready.json") EVALUATION_JSON = Path("/logs/verifier/evaluation_result.json") +JUDGE_URL = os.environ.get("JUDGE_URL", "http://judge:8082").rstrip("/") +JUDGE_TIMEOUT_SECONDS = int(os.environ.get("JUDGE_TIMEOUT_SECONDS", "10800")) + + +def submission_reward(record: dict) -> float | None: + try: + return float(record.get("score", 0.0)) / 100.0 + except (TypeError, ValueError): + return None def best_submission() -> dict | None: - if not AGENT_SUBMISSIONS_LOG.exists(): + submissions_log = ( + VERIFIER_SUBMISSIONS_LOG + if VERIFIER_SUBMISSIONS_LOG.exists() + else JUDGE_SUBMISSIONS_LOG + ) + if not submissions_log.exists(): return None best: dict | None = None - for line in AGENT_SUBMISSIONS_LOG.read_text(encoding="utf-8").splitlines(): + for line in submissions_log.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: record = json.loads(line) - reward = float(record.get("score", 0.0)) - except (TypeError, ValueError, json.JSONDecodeError): + reward = submission_reward(record) + if reward is None: + continue + except json.JSONDecodeError: + continue + if record.get("submission_role", "agent") != "agent": continue if record.get("status") != "done": continue - if best is None or reward > float(best.get("score", 0.0)): + best_reward = submission_reward(best) if best is not None else None + if best is None or best_reward is None or reward > best_reward: best = record return best @@ -56,65 +81,201 @@ def write_reward(reward: float, detail: str = "", extra: dict | None = None) -> EVALUATION_JSON.write_text(json.dumps(sidecar, indent=2), encoding="utf-8") -def copy_submissions_log() -> None: - if AGENT_SUBMISSIONS_LOG.exists(): - VERIFIER_SUBMISSIONS_LOG.parent.mkdir(parents=True, exist_ok=True) +def copy_judge_artifacts() -> None: + records: list[dict] = [] + try: + with request.urlopen(f"{JUDGE_URL}/submissions", timeout=5) as response: + payload = json.loads(response.read().decode("utf-8")) + http_records = payload.get("submissions", []) + if isinstance(http_records, list): + records.extend(record for record in http_records if isinstance(record, dict)) + except Exception as exc: + print(f"WARN: failed to fetch judge submissions: {exc}") + + if JUDGE_SUBMISSIONS_LOG.exists(): try: - shutil.copy2(AGENT_SUBMISSIONS_LOG, VERIFIER_SUBMISSIONS_LOG) + with JUDGE_SUBMISSIONS_LOG.open("r", encoding="utf-8") as src: + for line in src: + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(record, dict): + records.append(record) except OSError as exc: print(f"WARN: failed to copy submissions.jsonl: {exc}") + seen: set[str] = set() + VERIFIER_SUBMISSIONS_LOG.parent.mkdir(parents=True, exist_ok=True) + with VERIFIER_SUBMISSIONS_LOG.open("w", encoding="utf-8") as dst: + for record in records: + if record.get("submission_role", "agent") != "agent": + continue + key = str(record.get("submission_uuid") or json.dumps(record, sort_keys=True)) + if key in seen: + continue + seen.add(key) + dst.write(json.dumps(record, ensure_ascii=False) + "\n") -def load_problem_evaluator(): - spec = importlib.util.spec_from_file_location( - "frontier_cs_2_0_problem_evaluator", PROBLEM_EVALUATOR_PATH - ) - if spec is None or spec.loader is None: - raise RuntimeError(f"could not load evaluator from {PROBLEM_EVALUATOR_PATH}") - module = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = module - spec.loader.exec_module(module) - return module - - -def normalize_result(result): - if not isinstance(result, tuple) or len(result) not in (3, 4): - raise TypeError("evaluator must return (score, score_unbounded, message[, metrics])") - score = float(result[0]) - score_unbounded = float(result[1]) - message = str(result[2]) - metrics = result[3] if len(result) == 4 else {} - if not isinstance(metrics, dict): - raise TypeError("evaluator metrics must be a dict") - return score, score_unbounded, message, metrics + if JUDGE_READY_LOG.exists(): + VERIFIER_JUDGE_READY_LOG.parent.mkdir(parents=True, exist_ok=True) + try: + shutil.copy2(JUDGE_READY_LOG, VERIFIER_JUDGE_READY_LOG) + except OSError as exc: + print(f"WARN: failed to copy judge_ready.json: {exc}") -def load_submission_path() -> Path: +def load_submission_config() -> dict: if not SUBMISSION_CONFIG_PATH.exists(): - return SOLUTION_PATH - config = json.loads(SUBMISSION_CONFIG_PATH.read_text(encoding="utf-8")) + return {"kind": "file", "path": str(SOLUTION_PATH), "exclude": []} + return json.loads(SUBMISSION_CONFIG_PATH.read_text(encoding="utf-8")) + + +def should_exclude(path: Path, root: Path, exclude: list[str]) -> bool: + rel = path.relative_to(root).as_posix() + parts = set(path.relative_to(root).parts) + for pattern in exclude: + pattern = str(pattern).strip("/") + if not pattern: + continue + if rel == pattern or rel.startswith(pattern + "/") or pattern in parts: + return True + return False + + +def make_directory_archive(root: Path, exclude: list[str]) -> tuple[str, int]: + buf = io.BytesIO() + file_count = 0 + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for path in sorted(root.rglob("*")): + if should_exclude(path, root, exclude): + continue + if path.is_file(): + tar.add(path, arcname=path.relative_to(root).as_posix()) + file_count += 1 + return base64.b64encode(buf.getvalue()).decode("ascii"), file_count + + +def wait_for_judge() -> None: + deadline = time.time() + JUDGE_TIMEOUT_SECONDS + last_error: Exception | None = None + while time.time() < deadline: + try: + with request.urlopen(f"{JUDGE_URL}/health", timeout=5) as response: + payload = json.loads(response.read().decode("utf-8")) + if payload.get("status") == "error": + raise RuntimeError(f"judge setup failed: {payload.get('error') or payload}") + return + except error.HTTPError as exc: + try: + payload = json.loads(exc.read().decode("utf-8")) + except Exception: + payload = {} + if payload.get("status") == "error": + raise RuntimeError(f"judge setup failed: {payload.get('error') or payload}") + last_error = exc + time.sleep(1) + except RuntimeError: + raise + except Exception as exc: + last_error = exc + time.sleep(1) + raise RuntimeError(f"judge service is not ready at {JUDGE_URL}: {last_error}") + + +def post_json(url: str, payload: dict) -> dict: + body = json.dumps(payload).encode("utf-8") + req = request.Request( + url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with request.urlopen(req, timeout=JUDGE_TIMEOUT_SECONDS) as response: + return json.loads(response.read().decode("utf-8")) + except error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + raise RuntimeError(f"judge HTTP {exc.code}: {detail}") from exc + + +def build_judge_payload(solution_path: Path, config: dict) -> dict: + submission_kind = str(config.get("kind", "file")) + exclude = list(config.get("exclude", []) or []) + submission_uuid = str(uuid.uuid4()) + + if submission_kind == "directory": + archive_b64, _ = make_directory_archive(solution_path, exclude) + return { + "submission_kind": "directory", + "submission_uuid": submission_uuid, + "submission_role": "final", + "archive_b64": archive_b64, + } + + return { + "submission_kind": "file", + "submission_uuid": submission_uuid, + "submission_role": "final", + "code": solution_path.read_text(encoding="utf-8"), + } + + +def evaluate_with_judge(payload: dict) -> dict: + wait_for_judge() + result = post_json(f"{JUDGE_URL}/evaluate", payload) + if result.get("status") != "done": + raise RuntimeError(str(result.get("message") or result.get("error") or result)) + return result + + +def load_submission_path(config: dict) -> Path: if config.get("kind") == "directory": return Path(config.get("path") or APP_PATH) return Path(config.get("path") or SOLUTION_PATH) +def write_result(result: dict) -> None: + score = float(result.get("score", 0.0)) + score_unbounded = float(result.get("score_unbounded", score)) + message = str(result.get("message", "")) + metrics = result.get("metrics", {}) + if not isinstance(metrics, dict): + metrics = {} + reward = score / 100.0 + print(message) + print(f"Score: {score}/100 (reward: {reward:.4f})") + if score_unbounded != score: + print(f"Unbounded score: {score_unbounded}") + write_reward( + reward, + message, + { + "score": score, + "score_unbounded": score_unbounded, + "raw_result": result, + **metrics, + }, + ) + + def main() -> None: - copy_submissions_log() + copy_judge_artifacts() best = best_submission() def write_best_submission_reward(reason: str) -> bool: if best is None: return False - reward = float(best.get("score", 0.0)) - score_raw = best.get("score_raw", reward * 100.0) - score_unbounded = best.get("score_unbounded", score_raw) + score_raw = float(best.get("score", 0.0)) + reward = score_raw / 100.0 + score_unbounded = float(best.get("score_unbounded", score_raw)) metrics = best.get("metrics", {}) if not isinstance(metrics, dict): metrics = {} print(f"Using best iterative submission after {reason}: reward={reward:.4f}") write_reward( reward, - f"Using best iterative submission after {reason}: {best.get('detail', '')}", + f"Using best iterative submission after {reason}: {best.get('message', '')}", { "score": score_raw, "score_unbounded": score_unbounded, @@ -125,44 +286,33 @@ def write_best_submission_reward(reason: str) -> bool: ) return True - solution_path = load_submission_path() - if not solution_path.exists(): - print(f"ERROR: {solution_path} not found") - if write_best_submission_reward(f"{solution_path} not found"): + try: + config = load_submission_config() + solution_path = load_submission_path(config) + if not solution_path.exists(): + print(f"ERROR: {solution_path} not found") + if write_best_submission_reward(f"{solution_path} not found"): + return + write_reward(0.0, f"{solution_path} not found") return - write_reward(0.0, f"{solution_path} not found") - return - if solution_path.is_file() and not solution_path.read_text(encoding="utf-8").strip(): - print("ERROR: /app/solution.py is empty") - if write_best_submission_reward("solution.py is empty"): + if solution_path.is_file() and not solution_path.read_text(encoding="utf-8").strip(): + print(f"ERROR: {solution_path} is empty") + if write_best_submission_reward(f"{solution_path} is empty"): + return + write_reward(0.0, f"{solution_path} is empty") return - write_reward(0.0, "solution.py is empty") - return - try: - evaluator = load_problem_evaluator() - score, score_unbounded, message, metrics = normalize_result( - evaluator.evaluate(str(solution_path)) - ) - reward = float(score) / 100.0 - if best is not None and float(best.get("score", 0.0)) > reward: + final_result = evaluate_with_judge(build_judge_payload(solution_path, config)) + copy_judge_artifacts() + final_reward = float(final_result.get("score", 0.0)) / 100.0 + best_reward = submission_reward(best) if best is not None else None + if best_reward is not None and best_reward > final_reward: write_best_submission_reward("final solution scored below best submission") return - print(message) - print(f"Score: {score}/100 (reward: {reward:.4f})") - if score_unbounded != score: - print(f"Unbounded score: {score_unbounded}") - write_reward( - reward, - message, - { - "score": score, - "score_unbounded": score_unbounded, - **metrics, - }, - ) + write_result(final_result) except Exception as exc: print(traceback.format_exc()) + copy_judge_artifacts() if write_best_submission_reward(f"final evaluation failed: {exc}"): return write_reward(0.0, f"Evaluation failed: {exc}") diff --git a/adapters/frontier-cs-algorithm/README.md b/adapters/frontier-cs-algorithm/README.md index 767e5e64..01814d1b 100644 --- a/adapters/frontier-cs-algorithm/README.md +++ b/adapters/frontier-cs-algorithm/README.md @@ -57,9 +57,9 @@ the average of per-case scores normalized to `[0, 1]` and written to final `/app/solution.cpp` score and the best successful iterative submission. This preserves progress when an agent times out after submitting a working solution but before leaving a better final file. -- **Process reward artifacts**: every interactive submission is recorded in - `/logs/agent/submissions.jsonl` and mirrored to - `/logs/verifier/submissions.jsonl` by the final verifier. +- **Process reward artifacts**: `/logs/agent/submissions.jsonl` is used for + live progress display; final scoring rebuilds `/logs/verifier/submissions.jsonl` + from judge-owned artifacts under `/logs/artifacts/judge/submissions`. - **Per-task verifier timeout**: scaled as `max(120, n_cases * time_limit_seconds * 5 + 60)` so harder problems with many cases do not time out before the judge finishes. diff --git a/adapters/frontier-cs-algorithm/src/frontier_cs_algorithm/task-template/tests/evaluate.py b/adapters/frontier-cs-algorithm/src/frontier_cs_algorithm/task-template/tests/evaluate.py index 97f8f8b2..924edabc 100644 --- a/adapters/frontier-cs-algorithm/src/frontier_cs_algorithm/task-template/tests/evaluate.py +++ b/adapters/frontier-cs-algorithm/src/frontier_cs_algorithm/task-template/tests/evaluate.py @@ -6,7 +6,6 @@ import json import os -import shutil import time from pathlib import Path @@ -17,41 +16,82 @@ SOLUTION_PATH = Path("/app/solution.cpp") REWARD_TXT = Path("/logs/verifier/reward.txt") REWARD_JSON = Path("/logs/verifier/reward.json") -AGENT_SUBMISSIONS_LOG = Path("/logs/agent/submissions.jsonl") +JUDGE_SUBMISSIONS_DIR = Path("/logs/artifacts/judge/submissions") VERIFIER_SUBMISSIONS_LOG = Path("/logs/verifier/submissions.jsonl") JUDGE_RESULT_JSON = Path("/logs/verifier/judge_result.json") POLL_INTERVAL = 2 # seconds MAX_POLL_TIME = int(os.environ.get("MAX_POLL_TIME", "600")) # seconds -def copy_submissions_log() -> None: - """Mirror the agent's in-trace submissions log into verifier artifacts. +def _submission_sort_key(path: Path) -> tuple[int, str]: + try: + return (int(path.parent.name), path.parent.name) + except ValueError: + return (10**18, path.parent.name) - The verifier dir is always present in the trial output; copying the log - here means the post-hoc analysis only has to look in one place even if - /logs/agent/ collection layout shifts. - """ - if AGENT_SUBMISSIONS_LOG.exists(): - VERIFIER_SUBMISSIONS_LOG.parent.mkdir(parents=True, exist_ok=True) + +def judge_submission_records() -> list[dict]: + """Rebuild the iterative submission log from judge-owned artifacts.""" + problem_dir = JUDGE_SUBMISSIONS_DIR / str(PROBLEM_ID) + if not problem_dir.exists(): + return [] + + records: list[dict] = [] + for result_path in sorted( + problem_dir.glob("*/result.json"), key=_submission_sort_key + ): + try: + result = json.loads(result_path.read_text()) + except (OSError, json.JSONDecodeError): + continue + + meta: dict = {} + meta_path = result_path.parent / "meta.json" + if meta_path.exists(): + try: + meta = json.loads(meta_path.read_text()) + except (OSError, json.JSONDecodeError): + meta = {} + + score_raw = result.get("score") or 0.0 try: - shutil.copy2(AGENT_SUBMISSIONS_LOG, VERIFIER_SUBMISSIONS_LOG) - except OSError as exc: - print(f"WARN: failed to copy submissions.jsonl: {exc}") + reward = float(score_raw) / 100.0 + except (TypeError, ValueError): + reward = 0.0 + + records.append( + { + "ts": meta.get("ts"), + "status": result.get("status", "unknown"), + "sid": meta.get("sid") or result_path.parent.name, + "problem_id": meta.get("pid") or PROBLEM_ID, + "score": reward, + "score_raw": score_raw, + "score_unbounded": result.get("scoreUnbounded"), + "detail": result.get("message") + or result.get("detail") + or result.get("result") + or "", + "raw_result": result, + } + ) + return records -def best_submission() -> dict | None: - if not AGENT_SUBMISSIONS_LOG.exists(): - return None +def write_submissions_log(records: list[dict]) -> None: + VERIFIER_SUBMISSIONS_LOG.parent.mkdir(parents=True, exist_ok=True) + with VERIFIER_SUBMISSIONS_LOG.open("w") as f: + for record in records: + f.write(json.dumps(record, ensure_ascii=False) + "\n") + +def best_submission(records: list[dict]) -> dict | None: best: dict | None = None - for line in AGENT_SUBMISSIONS_LOG.read_text().splitlines(): - if not line.strip(): - continue + for record in records: try: - record = json.loads(line) reward = float(record.get("score", 0.0)) - except (TypeError, ValueError, json.JSONDecodeError): - continue + except (TypeError, ValueError): + reward = 0.0 if record.get("status") != "done": continue if best is None or reward > float(best.get("score", 0.0)): @@ -71,11 +111,15 @@ def write_reward( """ REWARD_TXT.parent.mkdir(parents=True, exist_ok=True) REWARD_TXT.write_text(str(score)) - REWARD_JSON.write_text(json.dumps({"reward": score}, indent=2)) + numeric_rewards: dict[str, float | int] = {"reward": score} sidecar: dict[str, object] = {"reward": score, "detail": detail} if extra: - sidecar.update(extra) + for key, value in extra.items(): + if isinstance(value, (int, float)) and not isinstance(value, bool): + numeric_rewards[key] = value + sidecar[key] = value + REWARD_JSON.write_text(json.dumps(numeric_rewards, indent=2)) JUDGE_RESULT_JSON.write_text(json.dumps(sidecar, indent=2)) @@ -83,9 +127,9 @@ def main(): print(f"Frontier-CS Problem {PROBLEM_ID}") print(f"Judge: {JUDGE_URL}") - # Always mirror the agent's in-trace submission log into verifier artifacts. - copy_submissions_log() - best = best_submission() + records = judge_submission_records() + write_submissions_log(records) + best = best_submission(records) def write_best_submission_reward(reason: str) -> bool: if best is None: diff --git a/src/frontier_cs/cli.py b/src/frontier_cs/cli.py index 92ae0afc..8a9dece2 100644 --- a/src/frontier_cs/cli.py +++ b/src/frontier_cs/cli.py @@ -1168,6 +1168,23 @@ def _count_successful_submissions(trial_dir: Path) -> tuple[int, float | None]: if not submissions.exists(): return 0, None + def record_reward(record: dict) -> float | None: + if "reward" in record: + try: + return float(record["reward"]) + except (TypeError, ValueError): + return None + if "score_raw" in record: + try: + return float(record.get("score_raw", 0.0)) / 100.0 + except (TypeError, ValueError): + return None + try: + score = float(record.get("score", 0.0)) + except (TypeError, ValueError): + return None + return score / 100.0 if score > 1.0 else score + successful = 0 best = None for line in submissions.read_text(encoding="utf-8", errors="replace").splitlines(): @@ -1175,12 +1192,14 @@ def _count_successful_submissions(trial_dir: Path) -> tuple[int, float | None]: continue try: record = json.loads(line) - score = float(record.get("score", 0.0)) except (TypeError, ValueError, json.JSONDecodeError): continue if record.get("status") == "done": + reward = record_reward(record) + if reward is None: + continue successful += 1 - best = score if best is None else max(best, score) + best = reward if best is None else max(best, reward) return successful, best