diff --git a/README.md b/README.md index 01774ff73..dd053cb2e 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,38 @@ uv run vf-eval wordle -m gpt-5-nano For advanced evaluation configurations with the `prime` [CLI](https://github.com/PrimeIntellect-ai/prime-cli), see [here](https://docs.primeintellect.ai/tutorials-environments/evaluating) +## Prompt Optimization with GEPA + +Automatically improve your environment's prompts using GEPA (Genetic-Pareto): + +```bash +# Install GEPA extras +uv add 'verifiers[gepa]' + +# Optimize system prompt +vf-gepa wordle --budget medium + +# Optimize system prompt + tool descriptions +vf-gepa wiki-search --budget heavy --components system_prompt tool_descriptions +``` + +GEPA analyzes your rubric's feedback and iteratively refines prompts. Works best when reward functions return rich textual feedback. See the [GEPA documentation](docs/source/gepa.md) for details. + +After a run completes, apply the saved components to an environment instance: + +```python +import json +import verifiers as vf + +with open("gepa_results/wordle//wordle_optimized.json") as f: + optimized = json.load(f) + +env = vf.load_environment("wordle") +env.system_prompt = optimized["system_prompt"] +if "tool_0_description" in optimized and hasattr(env, "oai_tools"): + env.oai_tools[0]["function"]["description"] = optimized["tool_0_description"] +``` + ## RL Training ### `prime-rl` diff --git a/docs/source/gepa.md b/docs/source/gepa.md new file mode 100644 index 000000000..d44aac2ab --- /dev/null +++ b/docs/source/gepa.md @@ -0,0 +1,435 @@ +# GEPA: Prompt Optimization + +GEPA (Genetic-Pareto) is an automatic prompt optimization system that improves your environment's system prompts and tool descriptions based on rubric feedback. + +## Overview + +GEPA works by: +1. Testing your current prompts on examples +2. Analyzing failures using rubric feedback +3. Generating improved prompts through reflection +4. Iteratively refining until convergence + +This is particularly effective when combined with `FeedbackRubric`, which provides rich textual feedback explaining why rollouts succeeded or failed. + +## Installation + +GEPA is available as an optional dependency: + +```bash +uv add 'verifiers[gepa]' +``` + +This installs the `gepa` optimization engine. 
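To confirm the optional dependency resolved correctly before starting a run, a quick sanity check along these lines works (the printed version will depend on what got installed):

```bash
# Sanity check: the gepa distribution should be importable after installing the extra
python -c "import importlib.metadata as m; print(m.version('gepa'))"
```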
+ +## Quick Start + +Optimize the system prompt for an environment: + +```bash +vf-gepa wordle --budget medium +``` + +This will: +- Load the `wordle` environment +- Use medium budget (~12 candidate prompts) +- Optimize the `system_prompt` component +- Save results to `./gepa_results/wordle//` + +## Budget Modes + +GEPA offers three budget presets: + +### Light (~6 candidates) +Fast iteration for testing: +```bash +vf-gepa my-env --budget light +``` +- Best for: Quick experiments, initial testing +- Time: ~30-60 minutes for typical environments +- Use when: Testing GEPA setup, first optimization runs + +### Medium (~12 candidates) +Balanced optimization: +```bash +vf-gepa my-env --budget medium +``` +- Best for: Most use cases, good improvements +- Time: ~1-2 hours for typical environments +- Use when: Standard optimization runs + +### Heavy (~18 candidates) +Thorough exploration: +```bash +vf-gepa my-env --budget heavy +``` +- Best for: Final production prompts, critical environments +- Time: ~2-4 hours for typical environments +- Use when: You need the best possible prompt + +### Custom Budget + +For fine control, specify exact metric calls: +```bash +vf-gepa my-env --max-metric-calls 1000 +``` + +### Faster Iteration + +For quicker feedback cycles (at the cost of potentially noisier signals), reduce the minibatch size: +```bash +vf-gepa my-env --budget light --reflection-minibatch-size 10 +``` + +The default minibatch size is 35 examples per reflection step. Smaller values (5-15) trade stability for speed, useful during initial experimentation. + +## Component Selection + +By default, GEPA optimizes `system_prompt`. You can specify multiple components: + +### System Prompt Only +```bash +vf-gepa my-env --budget medium --components system_prompt +``` + +### Tool Descriptions +For environments with tools, optimize their descriptions: +```bash +vf-gepa wiki-search --budget medium --components tool_descriptions +``` + +### Both System Prompt and Tool Descriptions +```bash +vf-gepa wiki-search --budget heavy --components system_prompt tool_descriptions +``` + +When optimizing `tool_descriptions`, GEPA: +1. Extracts each tool's description from `oai_tools` +2. Treats each as a separate component to optimize +3. Uses separate reflection for each tool +4. Injects optimized descriptions back into tools + +## Model Configuration + +### Task Model +The model being optimized (default: `gpt-5-mini`): +```bash +vf-gepa my-env --budget medium -m gpt-5-mini +``` + +### Reflection Model +The model generating improved prompts (default: `gpt-5-mini`): +```bash +vf-gepa my-env --budget medium --reflection-model gpt-5-mini +``` + +### Sampling Parameters +```bash +vf-gepa my-env --budget medium \ + -T 0.7 \ # Temperature for task model + -t 2048 \ # Max tokens + --reflection-temperature 1.0 # Temperature for reflection +``` + +## Dataset Configuration + +Control train/validation split sizes: + +```bash +vf-gepa my-env --budget medium \ + -n 100 \ # 100 training examples + --num-val 30 # 30 validation examples +``` + +**Guidelines**: +- Training: 50-100 examples (more = slower but potentially better) +- Validation: 20-30 examples (for measuring improvement) +- Use representative examples that cover your task's diversity + +## Output + +GEPA saves three files to `./gepa_results///`: + +### 1. `_optimized.json` +The optimized components: +```json +{ + "system_prompt": "You are a competitive Wordle player...", + "tool_0_description": "Search Wikipedia for..." +} +``` + +### 2. 
`_original.json` +The original components for comparison. + +### 3. `_metrics.json` +Optimization metrics: +```json +{ + "best_val_score": 0.85, + "initial_val_score": 0.62, + "improvement": 0.23, + "num_candidates": 12, + "candidates_history": [...] +} +``` + +## Rubric Feedback Support + +For best results, have your reward functions return feedback: + +```python +import verifiers as vf + +def accuracy_with_feedback(parser, completion, answer, **kwargs): + """Reward function that returns score + feedback.""" + guess = parser.parse_answer(completion) + correct = (guess == answer) + + return { + "score": 1.0 if correct else 0.0, + "feedback": ( + f"{'✓' if correct else '✗'} " + f"Expected: {answer}, Got: {guess}" + ) + } + +rubric = vf.Rubric(parser=parser) +rubric.add_reward_func(accuracy_with_feedback) +``` + +The `feedback` field is used by GEPA to understand *why* completions failed, enabling better prompt improvements. The base `Rubric` class automatically collects feedback via its `get_feedback()` method. + +## Advanced Usage + +### Multiple Rollouts Per Example +Increase robustness with multiple rollouts: +```bash +vf-gepa my-env --budget medium --rollouts-per-example 3 +``` + +### Custom Log Directory +```bash +vf-gepa my-env --budget medium --log-dir ./my_optimization_runs +``` + +### Track Detailed Statistics +Save full outputs for analysis: +```bash +vf-gepa my-env --budget medium --track-stats +``` + +### Verbose Logging +Debug optimization process: +```bash +vf-gepa my-env --budget medium -v +``` + +## Experiment Tracking + +GEPA supports integration with popular experiment tracking platforms to monitor and analyze optimization runs. + +### Weights & Biases (wandb) + +Track GEPA runs in wandb: + +```bash +vf-gepa my-env --budget medium \ + --use-wandb \ + --wandb-project my-project \ + --wandb-entity my-team \ + --wandb-name "wordle-optimization" +``` + +**Configuration options**: +- `--use-wandb`: Enable wandb logging +- `--wandb-project PROJECT`: Wandb project name +- `--wandb-entity ENTITY`: Wandb entity/team name +- `--wandb-name NAME`: Run name (default: auto-generated from env_id) +- `--wandb-api-key-var VAR`: Environment variable containing API key (default: `WANDB_API_KEY`) +- `--wandb-init-kwargs JSON`: Additional `wandb.init()` kwargs as JSON + +**Example with additional kwargs**: +```bash +vf-gepa my-env --budget medium \ + --use-wandb \ + --wandb-project gepa-experiments \ + --wandb-init-kwargs '{"tags": ["baseline", "system-prompt"], "mode": "online"}' +``` + +**Logged metrics**: +- Validation scores per candidate +- Training scores per reflection step +- Component-level improvements +- Optimization progress over time +- Final best candidate components + +### MLflow + +Track GEPA runs in MLflow: + +```bash +vf-gepa my-env --budget medium \ + --use-mlflow \ + --mlflow-tracking-uri http://localhost:5000 \ + --mlflow-experiment-name gepa-wordle +``` + +**Configuration options**: +- `--use-mlflow`: Enable MLflow logging +- `--mlflow-tracking-uri URI`: MLflow tracking server URI +- `--mlflow-experiment-name NAME`: Experiment name + +**Logged data**: +- Parameters: model, budget, dataset sizes, components +- Metrics: validation scores, improvements +- Artifacts: optimized components, metrics JSON + +### Using Both Simultaneously + +You can enable both wandb and MLflow tracking in the same run: + +```bash +vf-gepa my-env --budget medium \ + --use-wandb --wandb-project my-project \ + --use-mlflow --mlflow-tracking-uri http://localhost:5000 +``` + +## Best Practices + +### 
1. Provide Rich Feedback +GEPA works best when reward functions return textual feedback explaining scores. If your functions only return numbers, GEPA has less to work with. + +**Good**: +```python +return { + "score": 0.5, + "feedback": "Partially correct. Got step 1 right but step 2 is missing." +} +``` + +**OK but less effective**: +```python +return 0.5 # GEPA will only see the number +``` + +### 2. Use Representative Examples +Ensure your training and validation sets cover the full range of task difficulty and variety. + +### 3. Start Light, Then Scale Up +Begin with `--budget light` to verify everything works, then use `medium` or `heavy` for production. + +### 4. Iterate on Feedback Quality +If GEPA improvements are small, review your rubric's feedback. More specific feedback = better improvements. + +### 5. Version Control Prompts +Save optimized prompts in your repo and track which version is in production. + +## Troubleshooting + +### "Error: GEPA is not installed" +```bash +uv add 'verifiers[gepa]' +``` + +### "Environment does not have component 'X'" +Check that your environment exposes the component you're trying to optimize. Use `--components system_prompt` (default) if unsure. + +## Limitations + +### Unsupported Environment Types +- **EnvGroup**: GEPA operates on a single environment at a time. Optimize each member separately, then compose them with `EnvGroup`. +- **Dynamic tools**: Environments that mutate their tool list during `__init__` or per rollout may not preserve those changes across candidate reconstruction. + +### Requirements +- Components you optimize must be attributes on the environment object (e.g., `system_prompt`). +- `tool_descriptions` optimization requires `oai_tools` to be defined up front. +- Reward functions should emit textual feedback to unlock GEPA's reflection step. + +### Operational Constraints +- Multiple rollouts per example scale linearly in cost—start small before increasing `--rollouts-per-example`. +- Heavy budgets require high-quality validation datasets; under-sized eval sets can hide regressions. +- GEPA expects deterministic environment construction. Expensive setup code will re-run for every candidate. 
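One way to work around the `EnvGroup` limitation above is to run `vf-gepa` once per member environment and compose the optimized members afterwards. The sketch below is illustrative only: the result paths are placeholders for the run directories printed by each `vf-gepa` run, and the exact `EnvGroup` constructor arguments are an assumption, so check its signature in your installed version.

```python
import json

import verifiers as vf


def load_with_optimized_prompt(env_id: str, optimized_json: str) -> vf.Environment:
    """Load an environment and swap in the system prompt saved by vf-gepa."""
    env = vf.load_environment(env_id)
    with open(optimized_json) as f:
        env.system_prompt = json.load(f)["system_prompt"]
    return env


# The "..." segments stand in for each run's output directory.
members = [
    load_with_optimized_prompt("wordle", "gepa_results/wordle/.../wordle_optimized.json"),
    load_with_optimized_prompt("wiki-search", "gepa_results/wiki-search/.../wiki-search_optimized.json"),
]
group = vf.EnvGroup(members)  # assumed constructor call; see the EnvGroup docs
```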
+ +### Low Improvement +- Increase budget: Use `--budget heavy` or `--max-metric-calls 2000` +- Improve feedback: Make your rubric's feedback more specific +- Add more examples: Use `-n 100 --num-val 30` +- Check dataset quality: Ensure examples are representative + +### Out of Memory +- Reduce batch sizes: `--reflection-minibatch-size 2` +- Reduce examples: `-n 30 --num-val 10` +- Use smaller models: `-m gpt-5-mini` + +## Examples + +### Basic Optimization +```bash +vf-gepa wordle --budget medium +``` + +### Tool-Using Environment +```bash +vf-gepa wiki-search --budget heavy \ + --components system_prompt tool_descriptions \ + -m gpt-5-mini +``` + +### Large-Scale Optimization +```bash +vf-gepa my-env --max-metric-calls 2000 \ + -n 200 --num-val 50 \ + --rollouts-per-example 3 \ + --track-stats +``` + +### Custom Models +```bash +vf-gepa my-env --budget medium \ + -m claude-3-5-sonnet-20241022 \ + --reflection-model gpt-5-mini +``` + +## API Usage + +For programmatic use: + +```python +import verifiers as vf +from verifiers.gepa import GEPAAdapter +from gepa import optimize + +# Load environment +env = vf.load_environment("wordle") + +# Create adapter +adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-5-mini", + sampling_args={"temperature": 1.0, "max_tokens": 8096}, + components_to_optimize=["system_prompt"], +) + +# Run optimization +result = optimize( + seed_candidate={"system_prompt": env.system_prompt}, + trainset=trainset, + valset=valset, + adapter=adapter, + max_metric_calls=500, + reflection_lm=reflection_function, +) + +# Access results +best_prompt = result.best_candidate["system_prompt"] +improvement = max(result.val_aggregate_scores) - result.val_aggregate_scores[0] +``` + +## Further Reading + +- [GEPA Paper](https://arxiv.org/abs/2507.19457) +- [GEPA Documentation](https://dspy.ai/api/optimizers/GEPA/overview/) +- [Creating Environments](environments.md) + diff --git a/environments/gsm8k/gsm8k.py b/environments/gsm8k/gsm8k.py index dd8ac79e0..f77f52f85 100644 --- a/environments/gsm8k/gsm8k.py +++ b/environments/gsm8k/gsm8k.py @@ -1,4 +1,5 @@ import verifiers as vf +from verifiers.types import RewardResult from verifiers.utils.data_utils import ( BOXED_SYSTEM_PROMPT, extract_boxed_answer, @@ -20,9 +21,25 @@ def load_environment( parser = vf.Parser(extract_fn=extract_boxed_answer) - def correct_answer_reward_func(parser, completion, answer, **kwargs): + def correct_answer_reward_func( + parser, completion, answer, **kwargs + ) -> RewardResult: response = parser.parse_answer(completion) or "" - return 1.0 if response == answer else 0.0 + is_correct = response == answer + + # Build feedback for GEPA optimization + if is_correct: + feedback = f"Correct! The model correctly computed {answer}." + else: + if not response: + feedback = ( + f"Incorrect. The model did not provide an answer in \\boxed{{}}. " + f"Expected: {answer}" + ) + else: + feedback = f"Incorrect. The model answered {response} but the correct answer is {answer}." 
+ + return {"score": 1.0 if is_correct else 0.0, "feedback": feedback} rubric = vf.Rubric( parser=parser, diff --git a/environments/tool_test/tool_test.py b/environments/tool_test/tool_test.py index b3f958b1b..61ec8c1d8 100644 --- a/environments/tool_test/tool_test.py +++ b/environments/tool_test/tool_test.py @@ -65,7 +65,7 @@ def tool_D(x: bool) -> bool: def tool_call_reward_func(completion, info): # check if completion tool calls exactly matches info tool calls tool_calls = completion[-1].get("tool_calls", []) - called_tool_names = sorted([call.function.name for call in tool_calls]) + called_tool_names = sorted([call["function"]["name"] for call in tool_calls]) expected_tool_names = sorted(info["tool_names"]) if called_tool_names == expected_tool_names: return 1.0 diff --git a/environments/wordle/wordle.py b/environments/wordle/wordle.py index a1d7052c3..d14e63404 100644 --- a/environments/wordle/wordle.py +++ b/environments/wordle/wordle.py @@ -1,5 +1,6 @@ import verifiers as vf from verifiers.envs.textarena_env import TextArenaEnv +from verifiers.types import RewardResult ### prompt @@ -18,15 +19,26 @@ def wordle_feedback_fn(observation: str) -> str: ### reward functions -def check_answer_reward_func(parser, completion, answer, **kwargs) -> float: +def check_answer_reward_func(parser, completion, answer, **kwargs) -> RewardResult: + """Check if the guess is correct and provide feedback.""" guess = parser.parse_answer(completion) - return 1.0 if guess == "[" + answer + "]" else 0.0 + correct = guess == "[" + answer + "]" + + # Return dict with score and feedback (for GEPA optimization) + return { + "score": 1.0 if correct else 0.0, + "feedback": ( + f"{'✓ Correct!' if correct else '✗ Incorrect.'} " + f"Expected: {answer}, Got: {guess}" + ), + } def count_turns_reward_func(parser, completion, answer, **kwargs) -> float: num_turns = len([x for x in completion if x["role"] == "assistant"]) - is_correct = check_answer_reward_func(parser, completion, answer, **kwargs) - return is_correct / (num_turns + 1) + result = check_answer_reward_func(parser, completion, answer, **kwargs) + score = result["score"] if isinstance(result, dict) else result + return score / (num_turns + 1) def partial_credit_reward_func(parser, completion, answer, **kwargs) -> float: diff --git a/integrations/gepa/README.md b/integrations/gepa/README.md new file mode 100644 index 000000000..1d56c7401 --- /dev/null +++ b/integrations/gepa/README.md @@ -0,0 +1,253 @@ +# GEPA Integration + +GEPA (Genetic-Pareto) integration for Verifiers environments. + +## Overview + +This integration enables automatic prompt optimization using GEPA, a reflection-based optimization system that improves prompts by analyzing rubric feedback. GEPA works by: + +1. Running your environment with current prompts +2. Collecting rich feedback from rubric evaluations +3. Using an LLM to reflect on failures and propose improvements +4. Iteratively refining prompts until convergence + +## Installation + +```bash +uv sync --extra gepa +``` + +This installs the `gepa` package (>=0.0.22). 
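After syncing the extra, the `vf-gepa` entry point declared in `pyproject.toml` should be on your path; printing its help text is a quick smoke test:

```bash
vf-gepa --help
```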
+ +## Quick Start + +Optimize a system prompt: + +```bash +vf-gepa wordle --budget medium +``` + +Optimize system prompt + tool descriptions: + +```bash +vf-gepa wiki-search --budget heavy --components system_prompt tool_descriptions +``` + +## Components + +### `adapter.py` + +The `GEPAAdapter` class bridges Verifiers environments to GEPA's optimization protocol: + +- **Component management**: Extracts and injects optimizable components (system prompts, tool descriptions) +- **Evaluation**: Runs rollouts and collects scores +- **Feedback generation**: Converts rubric feedback into reflection data +- **Tool optimization**: Splits tool descriptions into separate optimizable components + +### Key Methods + +```python +from verifiers.gepa import GEPAAdapter + +adapter = GEPAAdapter( + env=vf_env, + client=async_client, + model="gpt-5-mini", + sampling_args={"temperature": 1.0}, + components_to_optimize=["system_prompt"], +) + +# Build new environment with optimized components +new_env = adapter.build_program({"system_prompt": "Optimized prompt..."}) + +# Evaluate candidate prompts (sync wrapper) +results = adapter.evaluate(batch, candidate, capture_traces=True) + +# Evaluate candidate prompts (async - preferred in async contexts) +results = await adapter.evaluate_async(batch, candidate, capture_traces=True) + +# Generate reflection dataset for GEPA +reflective_data = adapter.make_reflective_dataset(candidate, results, components) +``` + +**Note**: Use `evaluate_async()` when you're already in an async context (e.g., notebooks, async services). The sync `evaluate()` method is a convenience wrapper that manages the event loop for you. + +## Rubric Feedback + +GEPA works best when reward functions return structured feedback: + +```python +def accuracy_with_feedback(parser, completion, answer, **kwargs): + guess = parser.parse_answer(completion) + correct = (guess == answer) + + return { + "score": 1.0 if correct else 0.0, + "feedback": f"Expected: {answer}, Got: {guess}. {explain_why(...)}" + } + +rubric = vf.Rubric(parser=parser) +rubric.add_reward_func(accuracy_with_feedback) +``` + +The `feedback` field provides context GEPA uses to understand failures and generate better prompts. Without it, GEPA only sees numeric scores. + +## Tool Description Optimization + +When optimizing `tool_descriptions`, the adapter: + +1. Extracts each tool's description from `env.oai_tools` +2. Creates separate components: `tool_0_description`, `tool_1_description`, etc. +3. Optimizes each independently through GEPA's reflection process +4. 
Reconstructs `oai_tools` with improved descriptions + +Example: + +```bash +vf-gepa my-env --components tool_descriptions --budget medium +``` + +## Architecture + +``` +┌─────────────────┐ +│ GEPA Engine │ +│ (reflection + │ +│ proposals) │ +└────────┬────────┘ + │ + ├─ evaluate() + ├─ make_reflective_dataset() + └─ build_program() + │ +┌────────▼────────┐ +│ GEPAAdapter │ +│ (integrations/ │ +│ gepa) │ +└────────┬────────┘ + │ + ├─ rollout() + ├─ score_rollout() + └─ get_feedback() + │ +┌────────▼────────┐ +│ Verifiers Env │ +│ (dataset + │ +│ rubric) │ +└─────────────────┘ +``` + +## Configuration + +### Budget Modes + +- **light** (~6 candidates): Fast iteration, ~5-10 min +- **medium** (~12 candidates): Balanced, ~15-30 min +- **heavy** (~18 candidates): Thorough, ~30-60 min + +### Dataset Sizes + +- Training: 50-100 examples (more = slower but potentially better) +- Validation: 20-30 examples (for measuring improvement) + +### Models + +- **Task model** (being optimized): `gpt-5-mini`, or custom +- **Reflection model** (generating proposals): `gpt-5-mini` (default) + +## Output + +GEPA saves results to `./gepa_results///`: + +- `_optimized.json` - Optimized components +- `_original.json` - Original components (for comparison) +- `_metrics.json` - Optimization metrics and history + +## Experiment Tracking + +GEPA supports integration with Weights & Biases (wandb) and MLflow for tracking optimization runs: + +```bash +# Track with wandb +vf-gepa my-env --budget medium \ + --use-wandb \ + --wandb-project gepa-experiments + +# Track with MLflow +vf-gepa my-env --budget medium \ + --use-mlflow \ + --mlflow-tracking-uri http://localhost:5000 + +# Use both simultaneously +vf-gepa my-env --budget medium \ + --use-wandb --wandb-project my-project \ + --use-mlflow --mlflow-tracking-uri http://localhost:5000 +``` + +These integrations automatically log: +- Validation and training scores +- Component-level improvements +- Optimization configuration +- Final optimized components + +For detailed documentation on experiment tracking options, see [GEPA Documentation](../../docs/source/gepa.md#experiment-tracking). + +## Implementation Notes + +### Packaging + +The GEPA adapter ships inside the `verifiers.gepa` package so it is available to `pip install verifiers` users. The `integrations/gepa` directory contains additional documentation and examples for reference. + +### Feedback Collection + +The base `Rubric` class automatically collects feedback when reward functions return dicts with `"feedback"` keys. The adapter checks for `rubric.get_feedback(state)` to retrieve combined feedback from all functions. 
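As a rough sketch of that flow (mirroring the shapes used in `tests/test_gepa.py`, not the adapter's exact internals), a scored state carries the aggregate reward plus any collected feedback strings, and `get_feedback` combines them into a single text block for reflection:

```python
import verifiers as vf
from verifiers.types import State

rubric = vf.Rubric()

# State as it looks after score_rollout(): aggregate reward plus the
# per-function feedback strings gathered from dict-returning reward funcs.
state = State(input={})
state["reward"] = 0.75
state["feedbacks"] = ["accuracy_with_feedback: Expected: 42, Got: 41"]

combined = rubric.get_feedback(state)  # single human-readable summary for GEPA
print(combined)
```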
+ +### Error Handling + +The adapter validates: +- Environment has requested components (`system_prompt`, `oai_tools`) +- Tool descriptions can only be optimized if environment has tools +- Reflection datasets require `capture_traces=True` + +## CLI Reference + +Full documentation: [`docs/source/gepa.md`](../../docs/source/gepa.md) + +```bash +# Basic +vf-gepa ENV_ID --budget light|medium|heavy + +# Advanced +vf-gepa ENV_ID \ + --max-metric-calls 1000 \ + -n 100 --num-val 30 \ + --components system_prompt tool_descriptions \ + -m gpt-5-mini \ + --reflection-model gpt-5-mini \ + --rollouts-per-example 3 + +# Options + -n, --num-examples Training examples (default: 50) + --num-val Validation examples (default: 20) + --budget Budget preset: light/medium/heavy + --max-metric-calls Custom budget (total metric calls) + --components What to optimize (default: system_prompt) + -m, --model Task model (default: gpt-5-mini) + --reflection-model Reflection model (default: gpt-5-mini) + -T, --temperature Task model temperature (default: 1.0) + -t, --max-tokens Max tokens (default: 8096) + --track-stats Save detailed statistics + -v, --verbose Verbose logging + --use-wandb Enable wandb logging + --wandb-project Wandb project name + --wandb-entity Wandb entity/team name + --use-mlflow Enable MLflow logging + --mlflow-tracking-uri MLflow tracking server URI +``` + +## Links + +- [GEPA Documentation](../../docs/source/gepa.md) - Complete usage guide +- [GEPA Paper](https://arxiv.org/abs/2507.19457) - Original research +- [GEPA API Docs](https://dspy.ai/api/optimizers/GEPA/overview/) - DSPy reference +- [Creating Environments](../../docs/source/environments.md) - Build custom environments diff --git a/pyproject.toml b/pyproject.toml index 662a4b0d3..5ffb9f45b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,9 @@ envs = [ "nltk", "textarena", ] +gepa = [ + "gepa>=0.0.22", +] all = [ "torch>=2.8.0", "transformers", @@ -95,6 +98,7 @@ all = [ "brave-search", "nltk", "textarena", + "gepa>=0.0.22", ] docs = [ "sphinx", @@ -110,6 +114,7 @@ flash-attn = { FLASH_ATTENTION_SKIP_CUDA_BUILD = "TRUE" } [project.scripts] vf-eval = "verifiers.scripts.eval:main" +vf-gepa = "verifiers.scripts.gepa:main" vf-init = "verifiers.scripts.init:main" vf-install = "verifiers.scripts.install:main" vf-setup = "verifiers.scripts.setup:main" diff --git a/tests/test_gepa.py b/tests/test_gepa.py new file mode 100644 index 000000000..099e434dd --- /dev/null +++ b/tests/test_gepa.py @@ -0,0 +1,727 @@ +""" +Tests for GEPA integration: Rubric feedback support and GEPAAdapter. 
+""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +import verifiers as vf +from verifiers.types import RewardResult, State + + +def require_gepa_adapter(): + """Import GEPAAdapter or skip tests if the module is unavailable.""" + module = pytest.importorskip("verifiers.gepa.adapter") + return module.GEPAAdapter + + +class TestRubricFeedback: + """Tests for Rubric class feedback support.""" + + def test_rubric_with_dict_return(self): + """Test Rubric with reward function returning dict.""" + + def reward_with_feedback(completion, answer, **kwargs) -> RewardResult: + correct = completion == answer + return { + "score": 1.0 if correct else 0.0, + "feedback": f"Expected: {answer}, Got: {completion}", + } + + rubric = vf.Rubric() + rubric.add_reward_func(reward_with_feedback) + + assert len(rubric.funcs) == 1 + assert rubric.funcs[0] == reward_with_feedback + + def test_rubric_with_float_return(self): + """Test Rubric with reward function returning float (backward compat).""" + + def simple_reward(completion, answer, **kwargs) -> float: + return 1.0 if completion == answer else 0.0 + + rubric = vf.Rubric() + rubric.add_reward_func(simple_reward) + + assert len(rubric.funcs) == 1 + assert rubric.funcs[0] == simple_reward + + def test_rubric_mixed_functions(self): + """Test Rubric with mix of dict and float returning functions.""" + + def reward_with_feedback(completion, answer, **kwargs) -> RewardResult: + return { + "score": 1.0 if completion == answer else 0.0, + "feedback": "Detailed feedback", + } + + def simple_reward(completion, **kwargs) -> float: + return 0.5 + + rubric = vf.Rubric() + rubric.add_reward_func(reward_with_feedback, weight=1.0) + rubric.add_reward_func(simple_reward, weight=0.5) + + assert len(rubric.funcs) == 2 + + @pytest.mark.asyncio + async def test_get_feedback_with_feedbacks(self): + """Test get_feedback when state has feedbacks.""" + rubric = vf.Rubric() + + state = State(input={}) + state["reward"] = 0.75 + state["feedbacks"] = [ + "reward_1: Good job!", + "reward_2: Could be better", + ] + + feedback = rubric.get_feedback(state) + + assert "0.75" in feedback or "75" in feedback # Score percentage + assert "Good job!" 
in feedback + assert "Could be better" in feedback + + @pytest.mark.asyncio + async def test_get_feedback_without_feedbacks(self): + """Test get_feedback when state has no feedbacks (fallback).""" + rubric = vf.Rubric() + + state = State(input={}) + state["reward"] = 0.5 + + feedback = rubric.get_feedback(state) + + assert "0.5" in feedback or "50" in feedback + assert "no detailed feedback" in feedback.lower() + + +class TestGEPAAdapter: + """Tests for GEPAAdapter class.""" + + def test_gepa_adapter_initialization(self): + """Test GEPAAdapter initializes correctly.""" + GEPAAdapter = require_gepa_adapter() + + # Create mock environment + env = MagicMock(spec=vf.SingleTurnEnv) + env.system_prompt = "Test prompt" + env.dataset = None + env.eval_dataset = None + env.parser = vf.Parser() + env.rubric = vf.Rubric() + env.sampling_args = {} + env.message_type = "chat" + env.max_workers = 512 + + client = AsyncMock() + + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={"temperature": 1.0}, + components_to_optimize=["system_prompt"], + ) + + assert adapter.base_env == env + assert adapter.model == "gpt-4o-mini" + assert "system_prompt" in adapter.components_to_optimize + + def test_gepa_adapter_tool_descriptions_validation(self): + """Test GEPAAdapter validates tool_descriptions component.""" + GEPAAdapter = require_gepa_adapter() + + # Create mock environment WITHOUT tools + env = MagicMock(spec=vf.SingleTurnEnv) + env.system_prompt = "Test prompt" + env.oai_tools = None + + client = AsyncMock() + + # Should raise error when trying to optimize tool_descriptions without tools + with pytest.raises(ValueError, match="no tools"): + GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["tool_descriptions"], + ) + + def test_gepa_adapter_build_program(self): + """Test GEPAAdapter.build_program creates new environment with updated components. + + Important: datasets are shared (not copied) for efficiency via shallow copy. + The adapter provides inputs directly via _build_rollout_inputs. 
+ """ + GEPAAdapter = require_gepa_adapter() + + # Create real environment + dataset = vf.load_example_dataset(n=5) + env = vf.SingleTurnEnv( + dataset=dataset, + system_prompt="Original prompt", + rubric=vf.Rubric(), + ) + + client = AsyncMock() + + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["system_prompt"], + ) + + # Build new program with updated system_prompt + candidate = {"system_prompt": "Optimized prompt"} + new_env = adapter.build_program(candidate) + + # Verify component was updated + assert new_env.system_prompt == "Optimized prompt" + assert new_env.system_prompt != env.system_prompt + + # Verify dataset is shared (shallow copy - most efficient) + assert new_env.dataset is not None + assert new_env.dataset is env.dataset # Same reference (shared) + + # Verify rubric is also shared (preserves feedback functions) + assert new_env.rubric is env.rubric + + def test_gepa_adapter_build_program_multiturn_env(self): + """Test build_program with MultiTurnEnv (uses **kwargs).""" + GEPAAdapter = require_gepa_adapter() + + # Create a simple MultiTurnEnv + dataset = vf.load_example_dataset(n=5) + + class TestMultiTurnEnv(vf.MultiTurnEnv): + async def env_response(self, messages, state, **kwargs): + return [{"role": "user", "content": "test"}] + + env = TestMultiTurnEnv( + dataset=dataset, + system_prompt="Original prompt", + rubric=vf.Rubric(), + max_turns=3, + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["system_prompt"], + ) + + candidate = {"system_prompt": "Optimized prompt"} + new_env = adapter.build_program(candidate) + + # Verify component was updated + assert new_env.system_prompt == "Optimized prompt" + # Verify dataset is shared (shallow copy) + assert new_env.dataset is not None + assert new_env.dataset is env.dataset + + def test_gepa_adapter_build_program_tool_env(self): + """Test build_program with ToolEnv.""" + GEPAAdapter = require_gepa_adapter() + + def example_tool(x: int) -> int: + return x * 2 + + dataset = vf.load_example_dataset(n=5) + + class TestToolEnv(vf.ToolEnv): + def __init__(self, **kwargs): + super().__init__(tools=[example_tool], **kwargs) + + env = TestToolEnv( + dataset=dataset, + system_prompt="Use the tool", + rubric=vf.Rubric(), + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["system_prompt"], + ) + + candidate = {"system_prompt": "Use the tool wisely"} + new_env = adapter.build_program(candidate) + + # Verify component was updated + assert new_env.system_prompt == "Use the tool wisely" + # Verify dataset is shared (shallow copy) + assert new_env.dataset is not None + assert new_env.dataset is env.dataset + assert new_env.oai_tools is not None # Tools preserved + + def test_gepa_adapter_build_program_stateful_tool_env(self): + """Test build_program with StatefulToolEnv.""" + GEPAAdapter = require_gepa_adapter() + + def stateful_tool(x: int, state_val: int) -> int: + return x + state_val + + dataset = vf.load_example_dataset(n=5) + + class TestStatefulToolEnv(vf.StatefulToolEnv): + def __init__(self, **kwargs): + super().__init__(tools=[stateful_tool], **kwargs) + + def update_tool_args(self, tool_name, tool_args, messages, state, **kwargs): + return {**tool_args, "state_val": 10} + + env = TestStatefulToolEnv( + dataset=dataset, + system_prompt="Stateful tool env", + 
rubric=vf.Rubric(), + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["system_prompt"], + ) + + candidate = {"system_prompt": "Updated stateful prompt"} + new_env = adapter.build_program(candidate) + + # Verify component was updated + assert new_env.system_prompt == "Updated stateful prompt" + # Verify dataset is shared (shallow copy) + assert new_env.dataset is not None + assert new_env.dataset is env.dataset + + def test_gepa_adapter_build_program_internal_dataset_env(self): + """Test build_program with env that creates dataset internally.""" + GEPAAdapter = require_gepa_adapter() + + class InternalDatasetEnv(vf.SingleTurnEnv): + """Mock env that creates dataset internally like TextArenaEnv.""" + + def __init__( + self, + num_train_examples: int = 10, + num_eval_examples: int = 0, + system_prompt: str | None = None, + **kwargs, + ): + # Create dataset internally (like TextArenaEnv does) + from datasets import Dataset + + rows = [ + {"question": f"q{i}", "answer": f"a{i}"} + for i in range(num_train_examples) + ] + dataset = Dataset.from_list(rows) + + self.num_train_examples = num_train_examples + self.num_eval_examples = num_eval_examples + + super().__init__( + dataset=dataset, + system_prompt=system_prompt, + rubric=vf.Rubric(), + **kwargs, + ) + + env = InternalDatasetEnv( + num_train_examples=100, + system_prompt="Internal dataset env", + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["system_prompt"], + ) + + candidate = {"system_prompt": "Updated internal prompt"} + new_env = adapter.build_program(candidate) + + # Verify component was updated + assert new_env.system_prompt == "Updated internal prompt" + # Verify dataset is shared (shallow copy preserves all attributes) + assert new_env.dataset is not None + assert new_env.dataset is env.dataset # Shared reference + assert len(new_env.dataset) == 100 # Original dataset preserved + assert new_env.num_train_examples == 100 + + def test_gepa_adapter_extract_seed_candidate(self): + """Test extracting seed candidate from environment.""" + dataset = vf.load_example_dataset(n=5) + env = vf.SingleTurnEnv( + dataset=dataset, + system_prompt="Test prompt", + rubric=vf.Rubric(), + ) + + # Verify we can extract the system_prompt + assert hasattr(env, "system_prompt") + assert env.system_prompt == "Test prompt" + + def test_gepa_adapter_evaluate_uses_generate(self): + """Integration test ensuring evaluate() calls env.generate correctly.""" + GEPAAdapter = require_gepa_adapter() + + base_env = MagicMock(spec=vf.Environment) + base_env.dataset = None + base_env.eval_dataset = None + base_env.parser = vf.Parser() + base_env.rubric = vf.Rubric() + base_env.sampling_args = {} + base_env.message_type = "chat" + base_env.max_workers = 1 + base_env.system_prompt = "Base system" + base_env.few_shot = None + base_env.env_id = "stub-env" + base_env.oai_tools = [] + + adapter = GEPAAdapter( + env=base_env, + client=AsyncMock(), + model="stub-model", + sampling_args={"temperature": 0.1}, + components_to_optimize=["system_prompt"], + num_rollouts_per_example=1, + ) + + class StubEnv: + def __init__(self): + self.dataset = None + self.eval_dataset = None + self.parser = base_env.parser + self.rubric = base_env.rubric + self.sampling_args = {} + self.message_type = "chat" + self.system_prompt = "Stub system" + self.few_shot = None + self.env_id = "stub-env" + 
self.max_workers = 1 + self.oai_tools = [] + self.last_inputs = None + + async def generate( + self, + inputs, + client, + model, + sampling_args=None, + max_concurrent=-1, + use_tqdm=True, + ): + self.last_inputs = inputs + return { + "completion": [[{"role": "assistant", "content": "42"}]], + "state": [ + { + "prompt": [ + {"role": "system", "content": "Stub system"}, + {"role": "user", "content": "What is 6*7?"}, + ], + "completion": [{"role": "assistant", "content": "42"}], + "reward": 0.9, + } + ], + "reward": [0.9], + } + + stub_env = StubEnv() + batch = [ + { + "question": "What is 6*7?", + "answer": "42", + "task": "math", + "info": {}, + } + ] + + with patch.object(adapter, "build_program", return_value=stub_env): + result = adapter.evaluate( + batch, candidate={"system_prompt": "Stub system"}, capture_traces=True + ) + + assert stub_env.last_inputs is not None + assert stub_env.last_inputs[0]["task"] == "math" + # Prompt should include system + user messages + assert isinstance(stub_env.last_inputs[0]["prompt"], list) + assert stub_env.last_inputs[0]["prompt"][-1]["content"] == "What is 6*7?" + + assert result.scores == [0.9] + assert result.outputs == [[{"role": "assistant", "content": "42"}]] + assert result.trajectories is not None + assert result.trajectories[0]["score"] == 0.9 + + def test_gepa_adapter_tool_metadata_extraction(self): + """Test that GEPAAdapter extracts tool metadata for tool_descriptions.""" + GEPAAdapter = require_gepa_adapter() + + def search_tool(query: str, max_results: int = 10) -> str: + """Search for information about a query. + + Args: + query: The search query string + max_results: Maximum number of results to return + """ + return f"Results for: {query}" + + dataset = vf.load_example_dataset(n=5) + env = vf.ToolEnv( + dataset=dataset, + tools=[search_tool], + system_prompt="Use the search tool", + rubric=vf.Rubric(), + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["tool_descriptions"], + ) + + # Verify tool metadata was extracted + assert "tool_0_description" in adapter._tool_metadata + assert adapter._tool_metadata["tool_0_description"]["name"] == "search_tool" + assert "parameters" in adapter._tool_metadata["tool_0_description"] + + # Verify parameters include the function arguments + params = adapter._tool_metadata["tool_0_description"]["parameters"] + assert "properties" in params + assert "query" in params["properties"] + assert "max_results" in params["properties"] + + def test_gepa_adapter_propose_new_texts_tool_descriptions(self): + """Test that propose_new_texts uses tool-specific template for tool descriptions.""" + GEPAAdapter = require_gepa_adapter() + + def calculate(x: int, y: int) -> int: + """Add two numbers together.""" + return x + y + + dataset = vf.load_example_dataset(n=5) + env = vf.ToolEnv( + dataset=dataset, + tools=[calculate], + system_prompt="Use the calculator", + rubric=vf.Rubric(), + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["tool_descriptions"], + ) + + # Mock reflection_lm + reflection_output = "```\nImproved tool description that adds two numbers with better clarity.\n```" + adapter.reflection_lm = MagicMock(return_value=reflection_output) + + # Create mock candidate and reflective dataset + candidate = {"tool_0_description": "Add two numbers together."} + reflective_dataset = { + "tool_0_description": [ + { + 
"Inputs": {"Task": "Calculate 2 + 3"}, + "Generated Outputs": "Tool Call: calculate(x=2, y=3)", + "Feedback": "Correct usage", + } + ] + } + + # Call propose_new_texts + new_texts = adapter.propose_new_texts( + candidate=candidate, + reflective_dataset=reflective_dataset, + components_to_update=["tool_0_description"], + ) + + # Verify the reflection_lm was called + assert adapter.reflection_lm.called + called_prompt = adapter.reflection_lm.call_args[0][0] + + # Verify tool name is in the prompt + assert "calculate" in called_prompt + + # Verify tool parameters are in the prompt (JSON schema) + assert "parameters" in called_prompt.lower() + assert '"x"' in called_prompt or "'x'" in called_prompt + assert '"y"' in called_prompt or "'y'" in called_prompt + + # Verify current description is in the prompt + assert "Add two numbers together" in called_prompt + + # Verify new text was extracted correctly + assert "tool_0_description" in new_texts + assert "Improved tool description" in new_texts["tool_0_description"] + + def test_gepa_adapter_propose_new_texts_system_prompt(self): + """Test that propose_new_texts uses default GEPA template for system_prompt.""" + GEPAAdapter = require_gepa_adapter() + + dataset = vf.load_example_dataset(n=5) + env = vf.SingleTurnEnv( + dataset=dataset, + system_prompt="Original system prompt", + rubric=vf.Rubric(), + ) + + client = AsyncMock() + adapter = GEPAAdapter( + env=env, + client=client, + model="gpt-4o-mini", + sampling_args={}, + components_to_optimize=["system_prompt"], + ) + + # Mock reflection_lm + reflection_output = "```\nImproved system prompt with better instructions.\n```" + adapter.reflection_lm = MagicMock(return_value=reflection_output) + + # Create mock candidate and reflective dataset + candidate = {"system_prompt": "Original system prompt"} + reflective_dataset = { + "system_prompt": [ + { + "Inputs": {"Task": "Solve this problem"}, + "Generated Outputs": "Here's the solution", + "Feedback": "Good response", + } + ] + } + + # Call propose_new_texts + new_texts = adapter.propose_new_texts( + candidate=candidate, + reflective_dataset=reflective_dataset, + components_to_update=["system_prompt"], + ) + + # Verify the reflection_lm was called + assert adapter.reflection_lm.called + called_prompt = adapter.reflection_lm.call_args[0][0] + + # Verify it uses the default GEPA template (should NOT contain tool-specific language) + assert "TOOL NAME" not in called_prompt + assert "TOOL PARAMETERS" not in called_prompt + + # Should contain the default GEPA language about "assistant" and "instructions" + assert ( + "assistant" in called_prompt.lower() + or "instruction" in called_prompt.lower() + ) + + # Verify new text was extracted correctly + assert "system_prompt" in new_texts + + +class TestRubricDictSupport: + """Tests for base Rubric class dict return support.""" + + @pytest.mark.asyncio + async def test_rubric_score_rollout_with_dict_return(self): + """Test that score_rollout handles dict returns from reward functions.""" + + def reward_with_feedback(completion, answer, **kwargs) -> RewardResult: + return { + "score": 0.8, + "feedback": "Good answer", + } + + rubric = vf.Rubric() + rubric.add_reward_func(reward_with_feedback) + + # Create minimal state + state = State( + input={ + "prompt": [{"role": "user", "content": "test"}], + "example_id": 0, + "task": "test", + "answer": "correct", + } + ) + state["prompt"] = [{"role": "user", "content": "test"}] + state["completion"] = [{"role": "assistant", "content": "response"}] + state["task"] = 
"test" + state["timing"] = {"scoring_ms": 0.0, "total_ms": 0.0} + + # Mock score_sem + from contextlib import asynccontextmanager + + @asynccontextmanager + async def mock_sem(): + yield + + await rubric.score_rollout(state, score_sem=mock_sem()) + + # Check that reward was extracted correctly + assert state["reward"] == 0.8 + assert "reward_with_feedback" in state["metrics"] + assert state["metrics"]["reward_with_feedback"] == 0.8 + + # Check that feedback was stored + assert "feedbacks" in state + assert len(state["feedbacks"]) == 1 + assert "Good answer" in state["feedbacks"][0] + + @pytest.mark.asyncio + async def test_rubric_score_rollout_with_float_return(self): + """Test that score_rollout still handles float returns (backward compat).""" + + def simple_reward(completion, answer, **kwargs) -> float: + return 0.5 + + rubric = vf.Rubric() + rubric.add_reward_func(simple_reward) + + # Create minimal state + state = State( + input={ + "prompt": [{"role": "user", "content": "test"}], + "example_id": 0, + "task": "test", + "answer": "correct", + } + ) + state["prompt"] = [{"role": "user", "content": "test"}] + state["completion"] = [{"role": "assistant", "content": "response"}] + state["task"] = "test" + state["timing"] = {"scoring_ms": 0.0, "total_ms": 0.0} + + from contextlib import asynccontextmanager + + @asynccontextmanager + async def mock_sem(): + yield + + await rubric.score_rollout(state, score_sem=mock_sem()) + + # Check that reward was extracted correctly + assert state["reward"] == 0.5 + assert "simple_reward" in state["metrics"] + assert state["metrics"]["simple_reward"] == 0.5 + + # Feedbacks should be empty for float returns + assert "feedbacks" in state + assert len(state["feedbacks"]) == 0 diff --git a/tests/test_gepa_cli.py b/tests/test_gepa_cli.py new file mode 100644 index 000000000..7fb802143 --- /dev/null +++ b/tests/test_gepa_cli.py @@ -0,0 +1,391 @@ +"""Tests for vf-gepa CLI argument parsing and configuration.""" + +import argparse +import os +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +import verifiers as vf + + +def require_gepa_script(): + """Import gepa script or skip tests if module is unavailable.""" + return pytest.importorskip("verifiers.scripts.gepa") + + +def _make_mock_env(): + """Create a mock environment for testing.""" + env = MagicMock(spec=vf.Environment) + env.system_prompt = "Test system prompt" + env.eval_dataset = None + env.env_id = "test-env" + env.oai_tools = None + + # Mock dataset methods - return enough items for all tests + # Most tests use num_examples=10 and num_val=5, so we need at least 15 items + mock_dataset = MagicMock() + mock_dataset.to_list.return_value = [ + {"question": f"q{i}", "answer": f"a{i}", "task": "test", "info": {}} + for i in range(50) # Plenty of items for all tests + ] + env.get_dataset.return_value = mock_dataset + env.get_eval_dataset.return_value = mock_dataset + + return env + + +def _run_cli(monkeypatch, overrides, custom_env=None): + """ + Helper to run vf-gepa CLI with mocked dependencies. 
+ + Args: + monkeypatch: pytest monkeypatch fixture + overrides: dict of CLI args to override + custom_env: optional custom mock environment (default: _make_mock_env()) + + Returns: + dict containing captured GEPAConfig passed to run_gepa_optimization + """ + gepa_script = require_gepa_script() + + base_args = { + "env_id": "test-env", + "env_args": "{}", + "env_dir_path": "./environments", + "num_examples": 10, + "num_val": 5, + "endpoints_path": "./configs/endpoints.py", + "model": "gpt-4o-mini", + "api_key_var": "OPENAI_API_KEY", + "api_base_url": "https://api.openai.com/v1", + "headers": None, + "temperature": 1.0, + "max_tokens": None, + "sampling_args": None, # Will be parsed by json.loads if not None + "rollouts_per_example": 1, + "max_concurrent": 32, + "budget": "light", # Required - mutually exclusive with max_metric_calls + "max_metric_calls": None, + "components": ["system_prompt"], + "reflection_model": "gpt-4o", + "reflection_temperature": 1.0, + "reflection_base_url": None, + "reflection_api_key_var": "OPENAI_API_KEY", + "reflection_max_tokens": 8000, + "reflection_minibatch_size": 35, + "save_results": False, + "save_every": -1, + "track_stats": False, + "verbose": False, + "seed": 42, + "use_wandb": False, + "wandb_project": None, + "wandb_entity": None, + "wandb_name": None, + "wandb_api_key_var": "WANDB_API_KEY", + "wandb_init_kwargs": None, + "use_mlflow": False, + "mlflow_tracking_uri": None, + "mlflow_experiment_name": None, + } + base_args.update(overrides) + args_namespace = SimpleNamespace(**base_args) + + captured = {} + + # Mock argparse + monkeypatch.setattr( + argparse.ArgumentParser, + "parse_args", + lambda self: args_namespace, + ) + + # Mock setup_logging + monkeypatch.setattr(vf, "setup_logging", lambda *_, **__: None) + + # Mock load_endpoints + from verifiers.utils import eval_utils + + monkeypatch.setattr(eval_utils, "load_endpoints", lambda *_: {}) + + # Mock get_env_gepa_defaults + from verifiers import gepa as gepa_utils + + monkeypatch.setattr(gepa_utils, "get_env_gepa_defaults", lambda *_: {}) + + # Mock load_environment + mock_env = custom_env if custom_env is not None else _make_mock_env() + monkeypatch.setattr(vf, "load_environment", lambda **kwargs: mock_env) + + # Mock os.getenv for reflection API key + def mock_getenv(key, default=None): + if key in ("OPENAI_API_KEY", "WANDB_API_KEY"): + return "fake-api-key" + return default + + monkeypatch.setattr(os, "getenv", mock_getenv) + + # Mock prepare_gepa_dataset to return non-empty datasets + def mock_prepare_gepa_dataset(dataset): + if dataset is None: + raise ValueError("dataset cannot be None") + # Return hardcoded examples instead of relying on the mock dataset + # This ensures we always have data for the tests + return [ + { + "question": f"Question {i}", + "answer": f"Answer {i}", + "task": "test", + "info": {}, + } + for i in range(10) + ] + + monkeypatch.setattr( + gepa_utils, + "prepare_gepa_dataset", + mock_prepare_gepa_dataset, + ) + + # Mock run_gepa_optimization to capture config + # Must patch in the gepa script's namespace since it's imported at module level + async def fake_run_gepa_optimization(config): + captured["config"] = config + # Return immediately without running optimization + return None + + monkeypatch.setattr( + gepa_script, + "run_gepa_optimization", + fake_run_gepa_optimization, + ) + + # Run the CLI + gepa_script.main() + + return captured + + +def test_cli_sampling_args_precedence_over_flags(monkeypatch): + """Test that --sampling-args takes precedence over 
--temperature and --max-tokens.""" + captured = _run_cli( + monkeypatch, + { + "sampling_args": {"temperature": 0.5, "max_tokens": 100}, + "temperature": 0.9, + "max_tokens": 500, + }, + ) + + config = captured["config"] + assert config.sampling_args["temperature"] == 0.5 + assert config.sampling_args["max_tokens"] == 100 + + +def test_cli_sampling_args_fill_from_flags_when_missing(monkeypatch): + """Test that flags fill in when --sampling-args doesn't specify them.""" + captured = _run_cli( + monkeypatch, + { + "sampling_args": {"enable_thinking": True}, + "temperature": 0.7, + "max_tokens": 200, + }, + ) + + config = captured["config"] + assert config.sampling_args["temperature"] == 0.7 + assert config.sampling_args["max_tokens"] == 200 + assert config.sampling_args["enable_thinking"] is True + + +def test_cli_budget_light_conversion(monkeypatch): + """Test that --budget light converts to expected max_metric_calls.""" + captured = _run_cli( + monkeypatch, + { + "budget": "light", + "max_metric_calls": None, + "num_examples": 10, + "num_val": 5, + }, + ) + + config = captured["config"] + # Light budget should result in a positive number of metric calls + assert config.max_metric_calls > 0 + # Light budget (~6 candidates) should be in a reasonable range + assert config.max_metric_calls >= 300 # At least 300 + assert config.max_metric_calls <= 500 # At most 500 + + +def test_cli_budget_medium_conversion(monkeypatch): + """Test that --budget medium converts correctly.""" + captured = _run_cli( + monkeypatch, + { + "budget": "medium", + "max_metric_calls": None, + "num_examples": 10, + "num_val": 5, + }, + ) + + config = captured["config"] + # Medium budget should result in more calls than light (~12 candidates) + assert config.max_metric_calls >= 500 # At least 500 + assert config.max_metric_calls <= 1000 # At most 1000 + + +def test_cli_budget_heavy_conversion(monkeypatch): + """Test that --budget heavy converts correctly.""" + captured = _run_cli( + monkeypatch, + { + "budget": "heavy", + "max_metric_calls": None, + "num_examples": 10, + "num_val": 5, + }, + ) + + config = captured["config"] + # Heavy budget should result in the most calls + assert config.max_metric_calls > 200 + + +def test_cli_max_metric_calls_direct(monkeypatch): + """Test that --max-metric-calls is used directly when provided.""" + captured = _run_cli( + monkeypatch, + { + "budget": None, + "max_metric_calls": 1234, + }, + ) + + config = captured["config"] + assert config.max_metric_calls == 1234 + + +def test_cli_seed_candidate_extraction(monkeypatch): + """Test that seed_candidate is extracted from env's system_prompt.""" + captured = _run_cli( + monkeypatch, + { + "components": ["system_prompt"], + }, + ) + + config = captured["config"] + assert "system_prompt" in config.seed_candidate + assert config.seed_candidate["system_prompt"] == "Test system prompt" + assert config.components_to_optimize == ["system_prompt"] + + +def test_cli_defaults_fallback(monkeypatch): + """Test that CLI args are used when provided (not overridden by defaults).""" + captured = _run_cli( + monkeypatch, + { + "num_examples": 25, + "num_val": 10, + "rollouts_per_example": 3, + }, + ) + + config = captured["config"] + assert config.num_examples == 25 + assert config.num_val == 10 + assert config.rollouts_per_example == 3 + + +def test_cli_reflection_model_config(monkeypatch): + """Test that reflection model configuration is captured correctly.""" + captured = _run_cli( + monkeypatch, + { + "reflection_model": "gpt-4o", + 
"reflection_temperature": 0.8, + "reflection_max_tokens": 4000, + "reflection_minibatch_size": 20, + }, + ) + + config = captured["config"] + assert config.reflection_model == "gpt-4o" + assert config.reflection_temperature == 0.8 + assert config.reflection_max_tokens == 4000 + assert config.reflection_minibatch_size == 20 + + +def test_cli_experiment_tracking_config(monkeypatch): + """Test that experiment tracking (wandb/mlflow) configuration is captured.""" + captured = _run_cli( + monkeypatch, + { + "use_wandb": True, + "wandb_project": "test-project", + "wandb_entity": "test-entity", + "wandb_name": "test-run", + "use_mlflow": True, + "mlflow_tracking_uri": "http://localhost:5000", + "mlflow_experiment_name": "test-experiment", + }, + ) + + config = captured["config"] + assert config.use_wandb is True + assert config.wandb_project == "test-project" + assert config.wandb_entity == "test-entity" + assert config.wandb_name == "test-run" + assert config.use_mlflow is True + assert config.mlflow_tracking_uri == "http://localhost:5000" + assert config.mlflow_experiment_name == "test-experiment" + + +def test_cli_env_args_parsing(monkeypatch): + """Test that --env-args is a string that gets parsed to dict correctly.""" + # Note: env_args stays as a string in the CLI args, then gets parsed by json.loads + # But since we're passing through SimpleNamespace, we just verify the config receives it + captured = _run_cli( + monkeypatch, + { + "env_args": '{"custom_arg": "value", "num": 42}', + }, + ) + + config = captured["config"] + assert config.env_args["custom_arg"] == "value" + assert config.env_args["num"] == 42 + + +def test_cli_components_multiple(monkeypatch): + """Test that multiple components can be specified.""" + # Create a mock env with oai_tools + env_with_tools = _make_mock_env() + env_with_tools.oai_tools = [ + { + "function": { + "name": "test_tool", + "description": "A test tool", + "parameters": {}, + } + } + ] + + captured = _run_cli( + monkeypatch, + { + "components": ["system_prompt", "tool_descriptions"], + }, + custom_env=env_with_tools, + ) + + config = captured["config"] + assert config.components_to_optimize == ["system_prompt", "tool_descriptions"] + # Should have both system_prompt and tool descriptions in seed_candidate + assert "system_prompt" in config.seed_candidate + assert "tool_0_description" in config.seed_candidate diff --git a/tests/test_rubric.py b/tests/test_rubric.py index a58b3c064..5d4d8f806 100644 --- a/tests/test_rubric.py +++ b/tests/test_rubric.py @@ -218,6 +218,36 @@ def list_func(completion, **kwargs): assert state["metrics"]["list_func"] == 2.0 # Length of completion list assert state["reward"] == 2.0 + @pytest.mark.asyncio + async def test_reward_result_missing_score_raises(self): + """RewardResult dicts must include a score key.""" + + def bad_reward(completion, **kwargs): + return {"feedback": "oops"} + + rubric = Rubric(funcs=[bad_reward]) + + state = State( + input=RolloutInput( + prompt="prompt", + answer="answer", + task="task", + example_id=0, + ) + ) + state["completion"] = "prediction" + state["trajectory"] = [] + state["timing"] = { + "generation_ms": 0.0, + "scoring_ms": 0.0, + "total_ms": 0.0, + "start_time": 0.0, + } + score_sem = NullAsyncContext() + + with pytest.raises(ValueError, match="missing required 'score'"): + await rubric.score_rollout(state, score_sem) + @pytest.mark.asyncio async def test_score_rollouts_multiple(self): """Test scoring multiple rollouts using score_group.""" @@ -276,6 +306,83 @@ def length_func(completion, 
**kwargs): assert states[1]["metrics"]["length_func"] == 7.0 assert states[2]["metrics"]["length_func"] == 5.0 + @pytest.mark.asyncio + async def test_score_group_handles_reward_result_dicts(self): + """Ensure score_group handles RewardResult outputs from individual funcs.""" + + def reward_with_feedback(completion, **kwargs): + return {"score": 0.25, "feedback": "ok"} + + rubric = Rubric(funcs=[reward_with_feedback], weights=[2.0]) + + state = State( + input=RolloutInput( + prompt="prompt", + answer="answer", + task="task", + example_id=0, + ) + ) + state["completion"] = "prediction" + state["trajectory"] = [] + state["timing"] = { + "generation_ms": 0.0, + "scoring_ms": 0.0, + "total_ms": 0.0, + "start_time": 0.0, + } + score_sem = NullAsyncContext() + + await rubric.score_group([state], score_sem) + + assert state["metrics"]["reward_with_feedback"] == pytest.approx(0.25) + assert state["reward"] == pytest.approx(0.5) + + @pytest.mark.asyncio + async def test_group_reward_func_handles_dict_scores(self): + """Ensure group-level reward functions can emit RewardResult dicts.""" + + def group_reward(states, **kwargs): + return [{"score": 0.1}, {"score": 0.2}] + + rubric = Rubric(funcs=[group_reward], weights=[1.0]) + + states = [ + State( + input=RolloutInput( + prompt="p1", + answer="a1", + task="t1", + example_id=0, + ) + ), + State( + input=RolloutInput( + prompt="p2", + answer="a2", + task="t2", + example_id=1, + ) + ), + ] + for state in states: + state["completion"] = "resp" + state["trajectory"] = [] + state["timing"] = { + "generation_ms": 0.0, + "scoring_ms": 0.0, + "total_ms": 0.0, + "start_time": 0.0, + } + + score_sem = NullAsyncContext() + await rubric.score_group(states, score_sem) + + assert states[0]["metrics"]["group_reward"] == pytest.approx(0.1) + assert states[1]["metrics"]["group_reward"] == pytest.approx(0.2) + assert states[0]["reward"] == pytest.approx(0.1) + assert states[1]["reward"] == pytest.approx(0.2) + @pytest.mark.asyncio async def test_score_rollouts_with_apply_weights(self): """Test scoring rollouts - weights always applied via score_group.""" diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index db1f80399..f43c90d2a 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -431,6 +431,7 @@ async def init_state( total_ms=0.0, start_time=time.time(), ) + state["feedbacks"] = [] return state @abstractmethod diff --git a/verifiers/gepa/__init__.py b/verifiers/gepa/__init__.py new file mode 100644 index 000000000..aa2b4d99b --- /dev/null +++ b/verifiers/gepa/__init__.py @@ -0,0 +1,45 @@ +""" +GEPA (Genetic-Pareto) integration for Verifiers. + +This module provides adapter, utilities, and templates for optimizing +Verifiers environments using the GEPA reflection-based optimization algorithm. 
+ +Main components: +- GEPAAdapter: Bridges Verifiers environments with GEPA optimization +- run_gepa_optimization: High-level function to run GEPA on an environment +- TOOL_DESCRIPTION_PROMPT_TEMPLATE: Template for tool description optimization +""" + +from .adapter import GEPAAdapter +from .templates import TOOL_DESCRIPTION_PROMPT_TEMPLATE +from .utils import ( + auto_budget_to_metric_calls, + call_reflection_model, + ensure_env_dir_on_path, + get_env_gepa_defaults, + prepare_gepa_dataset, + print_optimization_results, + run_gepa_optimization, + save_candidate_rollouts, + save_optimized_components, + save_optimization_metrics, +) + +__all__ = [ + # Core adapter + "GEPAAdapter", + # Templates + "TOOL_DESCRIPTION_PROMPT_TEMPLATE", + # Main optimization function + "run_gepa_optimization", + # Utility functions + "auto_budget_to_metric_calls", + "call_reflection_model", + "ensure_env_dir_on_path", + "get_env_gepa_defaults", + "prepare_gepa_dataset", + "print_optimization_results", + "save_candidate_rollouts", + "save_optimized_components", + "save_optimization_metrics", +] diff --git a/verifiers/gepa/adapter.py b/verifiers/gepa/adapter.py new file mode 100644 index 000000000..62e012a02 --- /dev/null +++ b/verifiers/gepa/adapter.py @@ -0,0 +1,715 @@ +""" +GEPAAdapter: Bridge between Verifiers Environment API and GEPA optimization. + +This adapter implements the GEPAAdapter protocol from the gepa package, +enabling automatic optimization of environment text components (system_prompt, +tool descriptions, etc.) through reflection-based evolution. +""" + +import asyncio +import json +import logging +from collections.abc import Mapping, Sequence +from concurrent.futures import ThreadPoolExecutor +from copy import deepcopy +from typing import Any + +from statistics import fmean +from gepa import EvaluationBatch, GEPAAdapter as BaseGEPAAdapter +from openai import AsyncOpenAI + +import verifiers as vf +from verifiers.gepa.templates import TOOL_DESCRIPTION_PROMPT_TEMPLATE +from verifiers.types import Messages, RolloutInput + +logger = logging.getLogger(__name__) + + +class GEPAAdapter(BaseGEPAAdapter): + """ + Adapter bridging Verifiers Environment API to GEPA optimization. + + Key responsibilities: + - Component management: Extract/inject text components (system_prompt, tool descriptions) + - Evaluation: Run rollouts and collect scores + - Feedback generation: Convert rubric scores + state to GEPA feedback + - Dataset conversion: HF Dataset → GEPA format + + Args: + env: Base Verifiers Environment to optimize + client: AsyncOpenAI client for model inference + model: Model name to optimize + sampling_args: Sampling configuration (temperature, max_tokens, etc.) 
+ components_to_optimize: List of component names (e.g., ["system_prompt", "tool_descriptions"]) + num_rollouts_per_example: Number of rollouts per example for evaluation + max_concurrent: Maximum concurrent rollout evaluations + """ + + def __init__( + self, + env: vf.Environment, + client: AsyncOpenAI, + model: str, + sampling_args: dict[str, Any], + components_to_optimize: list[str] | None = None, + num_rollouts_per_example: int = 1, + max_concurrent: int = 32, + ): + self.base_env = env + self.client = client + self.model = model + self.sampling_args = sampling_args + self.components_to_optimize = components_to_optimize or ["system_prompt"] + self.num_rollouts_per_example = num_rollouts_per_example + self.max_concurrent = max_concurrent + self._candidate_build_count = 0 # Track candidate environment builds + self._tool_metadata: dict[ + str, dict[str, Any] + ] = {} # Maps tool_N_description -> {name, parameters} + self.reflection_lm = None # Will be set before optimization starts + + if self.num_rollouts_per_example < 1: + raise ValueError("num_rollouts_per_example must be at least 1") + if self.num_rollouts_per_example > 10: + logger.warning( + "num_rollouts_per_example=%s may be costly; " + "expect roughly %sx more rollouts per batch", + self.num_rollouts_per_example, + self.num_rollouts_per_example, + ) + + # Validate components and extract tool metadata + if "tool_descriptions" in self.components_to_optimize: + if not hasattr(env, "oai_tools") or not env.oai_tools: + raise ValueError( + "Cannot optimize tool_descriptions: environment has no tools" + ) + # Build metadata mapping for tool descriptions + for i, tool in enumerate(env.oai_tools): + comp_name = f"tool_{i}_description" + self._tool_metadata[comp_name] = { + "name": tool["function"]["name"], + "parameters": tool["function"].get("parameters", {}), + } + + for comp in self.components_to_optimize: + if comp not in ["system_prompt", "tool_descriptions"]: + if not hasattr(env, comp): + raise ValueError( + f"Environment does not have component '{comp}'. " + f"Available: system_prompt, tool_descriptions" + ) + + logger.info( + f"Initialized GEPAAdapter for {len(self.components_to_optimize)} components: " + f"{self.components_to_optimize}" + ) + + def build_program(self, candidate: dict[str, str]) -> vf.Environment: + """Create a candidate environment with updated components using shallow copy. + + Why shallow copy instead of deep copy? + - Efficiency: Datasets can be large (100s of MB). Shallow copy shares the dataset + reference across all candidate environments, avoiding memory bloat and copy overhead. + - Safety: String attributes like system_prompt are immutable. Assignment (e.g., + new_env.system_prompt = "...") creates a new reference without affecting the original. + - Shared state: Rubric and parser objects are also shared, which is fine since they + don't get mutated during evaluation. + + Special case for oai_tools: + - When optimizing tool_descriptions, we need to mutate nested dicts in oai_tools + - We deep copy oai_tools in this case to avoid mutating the base environment's tools + """ + import copy + + self._candidate_build_count += 1 + logger.debug( + f"Building candidate environment #{self._candidate_build_count} " + f"with components: {list(candidate.keys())}" + ) + + # Create shallow copy - shares dataset, rubric, parser, etc. + # This is safe because we only replace immutable string attributes, + # not mutate shared objects (except oai_tools, handled below). 
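+        # For example, a candidate like {"system_prompt": "..."} only rebinds the
+        # attribute on the copy; self.base_env.system_prompt stays untouched.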
+ new_env = copy.copy(self.base_env) + + # Update system_prompt (assignment replaces reference, doesn't mutate original) + if "system_prompt" in candidate: + new_env.system_prompt = candidate["system_prompt"] + + # Update tool descriptions (need deep copy since we mutate nested dicts) + # We ONLY deep copy when actually updating tools to avoid unnecessary overhead + if hasattr(self.base_env, "oai_tools") and self.base_env.oai_tools: + tool_updates = { + k: v + for k, v in candidate.items() + if k.startswith("tool_") and k.endswith("_description") + } + if tool_updates: + new_env.oai_tools = copy.deepcopy(self.base_env.oai_tools) + for i, tool in enumerate(new_env.oai_tools): + key = f"tool_{i}_description" + if key in tool_updates: + tool["function"]["description"] = tool_updates[key] + + logger.debug( + f"Successfully built {new_env.__class__.__name__} candidate #{self._candidate_build_count}" + ) + return new_env + + def evaluate( + self, + batch: list[dict], + candidate: dict[str, str], + capture_traces: bool = False, + ) -> EvaluationBatch: + """ + Evaluate candidate on batch of examples. + + This method provides a synchronous interface to evaluation, required by GEPA's + optimization loop. Since the verifiers Environment API is async, we bridge the gap: + - If no event loop is running: Use asyncio.run() to create one + - If already in an event loop: Use ThreadPoolExecutor to avoid blocking + + This allows GEPA to work in both sync contexts (normal scripts) and async contexts + (notebooks, services) without requiring callers to manage event loops. + + Args: + batch: List of examples (dicts with 'question', 'answer', 'info', 'task') + candidate: Dict of component values to evaluate + capture_traces: Whether to capture detailed execution traces + + Returns: + EvaluationBatch with outputs, scores, and optional trajectories + """ + # Build environment with candidate components + env = self.build_program(candidate) + + logger.debug( + f"Evaluating candidate on batch of {len(batch)} examples " + f"({self.num_rollouts_per_example} rollouts/example = {len(batch) * self.num_rollouts_per_example} total rollouts)" + ) + + # Run evaluation using Environment's evaluate method + # Note: We cannot simply await here because GEPA's optimize() expects a + # synchronous evaluate() method. We handle both sync and async contexts: + evaluation = self._evaluate_async(env, batch, capture_traces) + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop - create one and run the async evaluation + return asyncio.run(evaluation) + + # Already in an event loop - run in a thread pool to avoid blocking + # This happens when GEPA is called from an already-async context + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(asyncio.run, evaluation) + return future.result() + + async def evaluate_async( + self, + batch: list[dict], + candidate: dict[str, str], + capture_traces: bool = False, + ) -> EvaluationBatch: + """ + Evaluate candidate asynchronously. + + Preferred when the caller already manages an asyncio loop (e.g., notebooks, + services). Mirrors the synchronous evaluate() contract. 
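+
+        Example (illustrative; assumes an adapter instance and a batch that
+        follows the GEPA example schema):
+
+            batch = [{"question": "2+2?", "answer": "4", "task": "default", "info": {}}]
+            candidate = {"system_prompt": "You are a concise math tutor."}
+            result = await adapter.evaluate_async(batch, candidate, capture_traces=True)
+            print(result.scores)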
+ """ + env = self.build_program(candidate) + return await self._evaluate_async(env, batch, capture_traces) + + async def _evaluate_async( + self, env: vf.Environment, batch: list[dict], capture_traces: bool + ) -> EvaluationBatch: + """Async helper for evaluation.""" + rollout_inputs = self._build_rollout_inputs(env, batch) + if not rollout_inputs: + raise ValueError( + "Empty evaluation batch - no rollout inputs generated from batch" + ) + + generate_outputs = await env.generate( + inputs=rollout_inputs, + client=self.client, + model=self.model, + sampling_args=self.sampling_args, + max_concurrent=self.max_concurrent, + use_tqdm=False, + ) + + completions = generate_outputs["completion"] + states = generate_outputs["state"] + rewards = generate_outputs["reward"] + + if any(r is None for r in rewards): + raise ValueError( + "Received None reward from environment - check rubric configuration" + ) + scores = [float(score) for score in rewards] + trajectories = [] if capture_traces else None + + if capture_traces: + for completion, state, score in zip(completions, states, scores): + trajectories.append( + { + "completion": completion, + "state": state, + "score": score, + } + ) + + mean_score = fmean(scores) if scores else 0.0 + logger.debug( + f"Evaluation complete: {len(scores)} rollouts, " + f"mean={mean_score:.4f}, min={min(scores) if scores else 0:.4f}, " + f"max={max(scores) if scores else 0:.4f}" + ) + + return EvaluationBatch( + outputs=completions, + scores=scores, + trajectories=trajectories, + ) + + def _build_rollout_inputs( + self, env: vf.Environment, batch: list[dict] + ) -> list[RolloutInput]: + """ + Convert GEPA batch examples into Verifiers RolloutInput objects. + + GEPA uses a different schema than verifiers: + - GEPA: {"question": str, "answer": Any, "task": str, "info": dict, "example_id": int} + - Verifiers: {"prompt": Messages, "answer": Any, "task": str, "info": dict, "example_id": int} + + This method: + 1. Maps "question" -> "prompt" (with format normalization via _format_prompt) + 2. Preserves "answer", "task", "info" fields + 3. Ensures "example_id" is an integer (falls back to index) + 4. Duplicates each input num_rollouts_per_example times for multiple evaluations + + Why deepcopy for each rollout? 
+ - Each rollout needs an independent RolloutInput to avoid state contamination + - Without deepcopy, modifying one rollout's state would affect all copies + """ + rollout_inputs: list[RolloutInput] = [] + + for example_idx, example in enumerate(batch): + # Extract prompt - GEPA uses "question", verifiers uses "prompt" + raw_prompt = example.get("prompt") or example.get("question") or "" + formatted_prompt = self._format_prompt(env, raw_prompt) + task = str(example.get("task") or env.env_id or "default") + + # Ensure example_id is an integer (GEPA may pass strings) + example_id_value = example.get("example_id", example_idx) + try: + example_id = int(example_id_value) + except (TypeError, ValueError): + example_id = example_idx + + base_input: RolloutInput = { + "prompt": formatted_prompt, + "task": task, + "example_id": example_id, + } + + if "answer" in example and example["answer"] is not None: + base_input["answer"] = example["answer"] + + info = example.get("info") + if info is not None: + base_input["info"] = deepcopy(info) + + # Create independent copies for each rollout to avoid state contamination + for _ in range(self.num_rollouts_per_example): + rollout_inputs.append(deepcopy(base_input)) + + return rollout_inputs + + def _format_prompt(self, env: vf.Environment, prompt: str | Messages) -> Messages: + """ + Ensure prompts match the environment's declared message_type. + + Environments can be either "completion" (raw text) or "chat" (message lists). + We need to normalize GEPA's prompts (which can be either format) to match: + + For completion environments (message_type == "completion"): + - String prompts: Pass through as-is + - List prompts: Flatten message contents into a single string + + For chat environments (message_type == "chat"): + - List prompts: Pass through as-is + - String prompts: Wrap in chat structure with system prompt + few-shot examples + + This ensures the environment receives prompts in the format it expects, + regardless of how GEPA provides them. + """ + # Completion environment: flatten everything to a string + if env.message_type == "completion": + if isinstance(prompt, str): + return prompt + if isinstance(prompt, list): + # Extract content from all messages and join + content_parts: list[str] = [] + for message in prompt: + if isinstance(message, dict): + content = message.get("content") + if isinstance(content, str): + content_parts.append(content) + return " ".join(content_parts) if content_parts else str(prompt) + return str(prompt) + + # Chat environment: ensure we have a message list + if isinstance(prompt, list): + return prompt + + # String prompt for chat env: wrap with system prompt + few-shot + messages: list[dict[str, str]] = [] + if env.system_prompt: + messages.append({"role": "system", "content": env.system_prompt}) + if env.few_shot: + messages.extend(deepcopy(env.few_shot)) + messages.append({"role": "user", "content": str(prompt)}) + return messages + + def _format_tool_calls_text(self, tool_calls: list[dict]) -> str: + """Format tool calls as readable text for GEPA reflection.""" + parts = [] + for tc in tool_calls: + func = tc.get("function", {}) + name = func.get("name", "unknown") + args_str = func.get("arguments", "{}") + parts.append(f"Tool Call: {name}({args_str})") + return "\n".join(parts) + + def make_reflective_dataset( + self, + candidate: dict[str, str], + eval_batch: EvaluationBatch, + components_to_update: list[str], + ) -> dict[str, list[dict]]: + """ + Generate reflective dataset for GEPA's proposal phase. 
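+
+        A return value might look like (illustrative values; the keys mirror the
+        dicts built below):
+
+            {
+                "system_prompt": [
+                    {
+                        "Inputs": {"Task": "Guess the hidden five-letter word."},
+                        "Generated Outputs": "Assistant: CRANE",
+                        "Feedback": "Reward: 0.600",
+                    }
+                ]
+            }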
+ + Each reflective example contains: + - Inputs: Original prompt/task context + - Generated_Outputs: Model completion + - Feedback: Textual explanation of score + + Args: + candidate: Current candidate being evaluated + eval_batch: Results from evaluate() + components_to_update: Which components to generate feedback for + + Returns: + Dict mapping component_name → list[ReflectiveExample] + """ + if not eval_batch.trajectories: + raise ValueError( + "make_reflective_dataset requires capture_traces=True in evaluate()" + ) + + reflective_data: dict[str, list[dict]] = {} + _warned_no_get_feedback = False + + # For environment-level components (like system_prompt), all examples + # reflect on the same component, so we aggregate feedback across examples + for comp_name in components_to_update: + # Check if component is in optimization list + # Support both exact matches (e.g., "system_prompt") and group patterns + # (e.g., "tool_0_description" matches "tool_descriptions") + # + # Why this complexity? + # When optimizing tool_descriptions, GEPA's propose_new_texts receives + # individual components like "tool_0_description", "tool_1_description" etc. + # But components_to_optimize contains the group name "tool_descriptions". + # We need to match the individual tool components to the group. + is_optimizable = comp_name in self.components_to_optimize + + # Check if this is a tool description (tool_N_description pattern) + if ( + not is_optimizable + and "tool_descriptions" in self.components_to_optimize + ): + # Match pattern: tool_0_description, tool_1_description, etc. + if comp_name.startswith("tool_") and comp_name.endswith("_description"): + is_optimizable = True + + if not is_optimizable: + logger.debug( + f"Skipping component '{comp_name}' - not in components_to_optimize: {self.components_to_optimize}" + ) + continue + + examples = [] + + for traj in eval_batch.trajectories: + completion = traj["completion"] + state = traj["state"] + score = traj["score"] + + # Extract prompt for context + prompt = state.get("prompt", "") + if isinstance(prompt, list): + # Chat format - extract user message + user_msgs = [m for m in prompt if m.get("role") == "user"] + prompt_text = user_msgs[-1].get("content", "") if user_msgs else "" + else: + prompt_text = prompt + + # Extract completion text - format entire conversation + if isinstance(completion, list): + # Chat format - include all messages (assistant + tool responses) + completion_parts = [] + for msg in completion: + role = msg.get("role", "") + content = msg.get("content", "") + + if role == "assistant": + # Include content if present + if content: + completion_parts.append(f"Assistant: {content}") + # Include tool calls + tool_calls = msg.get("tool_calls", []) + if tool_calls: + completion_parts.append( + self._format_tool_calls_text(tool_calls) + ) + elif role == "tool": + # Include tool responses + completion_parts.append(f"Tool Result: {content}") + + completion_text = ( + "\n\n".join(completion_parts) if completion_parts else "" + ) + else: + completion_text = str(completion) + + # Build inputs dict + inputs = { + "Task": prompt_text, + } + + # Build outputs + generated_outputs = completion_text + + # Generate feedback - use rubric's get_feedback if available + if hasattr(self.base_env.rubric, "get_feedback"): + feedback = self.base_env.rubric.get_feedback(state) + else: + # Default fallback for basic rubrics - warn once + if not _warned_no_get_feedback: + logger.warning( + "Rubric lacks get_feedback method - using generic feedback. 
" + "Consider implementing get_feedback for better GEPA reflection." + ) + _warned_no_get_feedback = True + feedback = f"Reward: {score:.3f}" + if score < 0.5: + feedback += " (Low score - needs improvement)" + elif score >= 0.8: + feedback += " (Good performance)" + + examples.append( + { + "Inputs": inputs, + "Generated Outputs": generated_outputs, + "Feedback": feedback, + } + ) + + reflective_data[comp_name] = examples + + if not reflective_data: + raise ValueError( + f"No reflective data generated for components: {components_to_update}" + ) + + logger.info( + f"Generated reflective dataset with {sum(len(v) for v in reflective_data.values())} examples " + f"across {len(reflective_data)} components" + ) + + return reflective_data + + def propose_new_texts( + self, + candidate: dict[str, str], + reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]], + components_to_update: list[str], + ) -> dict[str, str]: + """ + Propose new text for components using tool-aware templates. + + Why different templates for different components? + - Tool descriptions need context about the tool's name, parameters, and purpose + - System prompts are general instructions that don't need tool-specific context + + Template selection logic: + 1. Check if component is in self._tool_metadata (tool_N_description pattern) + -> Use TOOL_DESCRIPTION_PROMPT_TEMPLATE with tool name + parameters + 2. Otherwise (system_prompt, etc.) + -> Use GEPA's default InstructionProposalSignature + + Both templates receive the same reflective feedback data, but format it + differently for the reflection model to generate appropriate improvements. + + Args: + candidate: Current candidate component values + reflective_dataset: Feedback data generated by make_reflective_dataset + components_to_update: List of component names to update + + Returns: + Dict mapping component names to newly proposed text + """ + if self.reflection_lm is None: + raise ValueError( + "reflection_lm must be set on GEPAAdapter before propose_new_texts can be called. " + "This should be set by run_gepa_optimization before calling gepa.optimize()." + ) + + from gepa.strategies.instruction_proposal import InstructionProposalSignature + + new_texts: dict[str, str] = {} + + for comp_name in components_to_update: + # Gracefully handle missing component data + if comp_name not in reflective_dataset or not reflective_dataset.get( + comp_name + ): + logger.warning( + f"Component '{comp_name}' not in reflective dataset. Skipping." + ) + continue + + current_text = candidate[comp_name] + feedback_data = reflective_dataset[comp_name] + + # Check if this is a tool description component + # Tool metadata is populated in __init__ when tool_descriptions is being optimized + if comp_name in self._tool_metadata: + # Use tool-specific template that includes tool name and parameter schema + # This gives the reflection model context about what the tool does + tool_info = self._tool_metadata[comp_name] + new_texts[comp_name] = self._propose_tool_description( + tool_name=tool_info["name"], + tool_parameters=tool_info["parameters"], + current_description=current_text, + feedback_data=feedback_data, + ) + logger.debug( + f"Proposed new tool description for {comp_name} (tool: {tool_info['name']})" + ) + else: + # Use default GEPA instruction proposal template for system_prompt, etc. 
+ # This is GEPA's standard prompt optimization template + new_texts[comp_name] = InstructionProposalSignature.run( + lm=self.reflection_lm, + input_dict={ + "current_instruction_doc": current_text, + "dataset_with_feedback": feedback_data, + "prompt_template": None, # Use default + }, + )["new_instruction"] + logger.debug(f"Proposed new instruction for {comp_name}") + + return new_texts + + def _propose_tool_description( + self, + tool_name: str, + tool_parameters: dict, + current_description: str, + feedback_data: Sequence[Mapping[str, Any]], + ) -> str: + """ + Propose a new tool description using the tool-specific template. + + Args: + tool_name: Name of the tool being optimized + tool_parameters: JSON schema of tool parameters + current_description: Current tool description text + feedback_data: Reflective examples with feedback + + Returns: + Newly proposed tool description + """ + + # Format the feedback data using GEPA's standard markdown formatter + def format_samples(samples): + def render_value(value, level=3): + if isinstance(value, dict): + s = "" + for k, v in value.items(): + s += f"{'#' * level} {k}\n" + s += render_value(v, min(level + 1, 6)) + if not value: + s += "\n" + return s + elif isinstance(value, list | tuple): + s = "" + for i, item in enumerate(value): + s += f"{'#' * level} Item {i + 1}\n" + s += render_value(item, min(level + 1, 6)) + if not value: + s += "\n" + return s + else: + return f"{str(value).strip()}\n\n" + + def convert_sample_to_markdown(sample, examplenum): + s = f"# Example {examplenum}\n" + for key, val in sample.items(): + s += f"## {key}\n" + s += render_value(val, level=3) + return s + + return "\n\n".join( + convert_sample_to_markdown(sample, i + 1) + for i, sample in enumerate(samples) + ) + + # Build the tool-specific prompt + prompt = TOOL_DESCRIPTION_PROMPT_TEMPLATE + prompt = prompt.replace("", tool_name) + prompt = prompt.replace( + "", json.dumps(tool_parameters, indent=2) + ) + prompt = prompt.replace("", current_description) + prompt = prompt.replace( + "", format_samples(feedback_data) + ) + + # Call reflection LM + response = self.reflection_lm(prompt) + + # Extract the new description from code blocks using GEPA's standard extractor + import re + + def extract_instruction_text(lm_out: str) -> str: + start = lm_out.find("```") + 3 + end = lm_out.rfind("```") + + if start >= end: + stripped = lm_out.strip() + if stripped.startswith("```"): + match = re.match(r"^```\S*\n?", lm_out) + if match: + return lm_out[match.end() :].strip() + elif stripped.endswith("```"): + return stripped[:-3].strip() + return stripped + + content = lm_out[start:end] + match = re.match(r"^\S*\n", content) + if match: + content = content[match.end() :] + + return content.strip() + + return extract_instruction_text(response) + + +__all__ = ["GEPAAdapter"] diff --git a/verifiers/gepa/templates.py b/verifiers/gepa/templates.py new file mode 100644 index 000000000..6d09b9eb5 --- /dev/null +++ b/verifiers/gepa/templates.py @@ -0,0 +1,41 @@ +""" +Prompt templates for GEPA optimization in Verifiers. + +This module contains specialized templates for different component types +(tool descriptions, system prompts, etc.) used during GEPA's reflection phase. +""" + +# Tool-specific prompt template for GEPA reflection +TOOL_DESCRIPTION_PROMPT_TEMPLATE = """You are improving the description of a tool (function) that an AI assistant can call. 
+ +TOOL NAME: + +TOOL PARAMETERS: +```json + +``` + +CURRENT DESCRIPTION: +``` + +``` + +The following are examples of how the assistant used this tool, along with feedback on the results: +``` + +``` + +Your task is to write an improved TOOL DESCRIPTION for the "" tool. + +A good tool description should: +- Clearly explain what the tool does and when to use it +- Match the parameter schema shown above +- Mention any important constraints, edge cases, or common mistakes +- Be concise but informative enough for the AI to decide when/how to call this tool + +Based on the feedback, identify patterns in tool misuse and improve the description to prevent them. + +Provide the new tool description within ``` blocks.""" + + +__all__ = ["TOOL_DESCRIPTION_PROMPT_TEMPLATE"] diff --git a/verifiers/gepa/utils.py b/verifiers/gepa/utils.py new file mode 100644 index 000000000..664c0dfb3 --- /dev/null +++ b/verifiers/gepa/utils.py @@ -0,0 +1,598 @@ +"""Utility functions for GEPA optimization.""" + +import importlib.resources +import json +import logging +import math +import os +import sys +import textwrap +from datetime import datetime +from pathlib import Path +from typing import Any, Dict + +try: + import tomllib # type: ignore[unresolved-import] +except ImportError: + import tomli as tomllib # type: ignore[unresolved-import] + +from openai import AsyncOpenAI, OpenAI + +import verifiers as vf +from verifiers.gepa.adapter import GEPAAdapter +from verifiers.types import GEPAConfig +from verifiers.utils.client_utils import setup_client +from verifiers.utils.eval_utils import save_rollout_results +from verifiers.utils.path_utils import get_gepa_results_path + +logger = logging.getLogger(__name__) + +# Auto-budget constants for clarity and tuning +AUTO_BUDGET_CANDIDATES = { + "light": 6, + "medium": 12, + "heavy": 18, +} +TRIAL_LOG_BASE_MULTIPLIER = 2.0 +TRIAL_COMPONENT_MULTIPLIER = 2 +TRIAL_LINEAR_MULTIPLIER = 1.5 +BOOTSTRAP_TRIALS_PER_CANDIDATE = 5 + + +def get_env_gepa_defaults(env_id: str) -> Dict[str, Any]: + """Get GEPA config defaults from environment package's pyproject.toml. + + Returns dict with 'num_examples', 'num_val', and 'rollouts_per_example' keys if found, + otherwise returns empty dict. + """ + defaults: Dict[str, Any] = {} + module_name = env_id.replace("-", "_").split("/")[-1] + + try: + # read pyproject.toml from installed package + package_ref = importlib.resources.files(module_name) + pyproject_file = package_ref / "pyproject.toml" + + if not pyproject_file.is_file(): + logger.debug(f"pyproject.toml not found in installed package {module_name}") + return defaults + + with pyproject_file.open("rb") as f: + pyproject_data = tomllib.load(f) + + # Extract [tool.verifiers.gepa] section + gepa_config = ( + pyproject_data.get("tool", {}).get("verifiers", {}).get("gepa", {}) + ) + + if "num_examples" in gepa_config: + defaults["num_examples"] = gepa_config["num_examples"] + if "num_val" in gepa_config: + defaults["num_val"] = gepa_config["num_val"] + if "rollouts_per_example" in gepa_config: + defaults["rollouts_per_example"] = gepa_config["rollouts_per_example"] + + if defaults: + logger.debug( + f"Loaded GEPA defaults from {module_name} pyproject.toml: {defaults}" + ) + except ModuleNotFoundError: + logger.debug(f"Package {module_name} not installed") + + return defaults + + +def ensure_env_dir_on_path(env_dir_path: str, env_id: str) -> None: + """Add local environment directory to sys.path if present. 
+ + Adds the specific environment folder (e.g., environments/gsm8k/) to sys.path + so that `import gsm8k` finds gsm8k.py directly, avoiding namespace package issues. + """ + env_dir = Path(env_dir_path).resolve() + if not env_dir.exists(): + return + module_name = env_id.replace("-", "_").split("/")[-1] + candidate = env_dir / module_name + if candidate.exists(): + # Add the specific environment folder so Python finds the .py file directly + # e.g., add environments/gsm8k/ so `import gsm8k` finds gsm8k.py + env_folder_str = str(candidate) + if env_folder_str not in sys.path: + sys.path.insert(0, env_folder_str) + logger.debug(f"Added {env_folder_str} to sys.path for environment loading") + + +async def save_candidate_rollouts( + adapter: GEPAAdapter, + candidate: dict[str, str], + label: str, + client: AsyncOpenAI, + model: str, + sampling_args: dict, + num_examples: int, + rollouts_per_example: int, + max_concurrent: int, + save_every: int, + log_dir: Path, +) -> None: + """ + Evaluate a candidate program and save rollout trajectories to disk. + """ + if num_examples <= 0: + raise ValueError(f"num_examples must be positive, got {num_examples}") + + env = adapter.build_program(candidate) + rollouts_dir = log_dir / "rollouts" / label + rollouts_dir.mkdir(parents=True, exist_ok=True) + logger.info( + "Saving %s candidate rollouts to %s (num_examples=%s, rollouts=%s)", + label, + rollouts_dir, + num_examples, + rollouts_per_example, + ) + results = await env.evaluate( + client=client, + model=model, + sampling_args=sampling_args, + num_examples=num_examples, + rollouts_per_example=rollouts_per_example, + max_concurrent=max_concurrent, + results_path=rollouts_dir, + save_results=False, + save_every=save_every, + ) + save_rollout_results(results) + + +def auto_budget_to_metric_calls( + auto: str, + num_components: int, + valset_size: int, + minibatch_size: int = 35, + full_eval_steps: int = 5, +) -> int: + """ + Convert auto budget (light/medium/heavy) to max_metric_calls. + + This replicates DSPy's auto_budget calculation for consistency with GEPA's + expectations. The formula estimates total metric calls (rollout evaluations) by: + + 1. Mapping budget -> target number of candidates to explore: + - light: ~6 candidates + - medium: ~12 candidates + - heavy: ~18 candidates + + 2. Computing number of optimization trials (iterations) using: + - Log growth: 2.0 * (num_components * 2) * log2(num_candidates) + - Linear fallback: 1.5 * num_candidates + - Take the maximum to ensure sufficient exploration + + 3. Summing all evaluation costs: + - Initial validation: V (full eval on seed candidate) + - Bootstrap: num_candidates * 5 (small evals per candidate) + - Reflection minibatches: N * M (N trials on M examples each) + - Periodic full validations: (N // full_eval_steps + 1) * V + + This ensures the optimization has enough budget to explore candidates + while periodically measuring improvement on the full validation set. + + Args: + auto: Budget level ('light', 'medium', or 'heavy') + num_components: Number of components being optimized + valset_size: Size of validation set + minibatch_size: Reflection minibatch size (default: 35, matching DSPy) + full_eval_steps: Steps between full validations + + Returns: + Maximum number of metric calls + """ + # Map budget name to target number of candidates + num_candidates = AUTO_BUDGET_CANDIDATES[auto] + + # Calculate number of trials using log-growth vs. 
linear fallback + # Log-growth scales better with more candidates, linear ensures minimum trials + log_trials = ( + TRIAL_LOG_BASE_MULTIPLIER + * (num_components * TRIAL_COMPONENT_MULTIPLIER) + * math.log2(num_candidates) + ) + linear_trials = TRIAL_LINEAR_MULTIPLIER * num_candidates + num_trials = int(max(log_trials, linear_trials)) + + # Use shorter variable names for clarity in formula + V = valset_size # Validation set size + N = num_trials # Number of optimization trials + M = minibatch_size # Minibatch size for reflection + m = full_eval_steps # Steps between full validations + + # Initial full evaluation on the seed (default) program + total = V + + # Bootstrap evaluations: quick evals to initialize each candidate + total += num_candidates * BOOTSTRAP_TRIALS_PER_CANDIDATE + + # Reflection minibatch evaluations: N trials, each on M examples + total += N * M + + if N == 0: + return total + + # Periodic full validations to measure progress + # We do a full validation every m steps, plus potentially a final one + periodic_fulls = (N + 1) // m + 1 + extra_final = 1 if N < m else 0 + + total += (periodic_fulls + extra_final) * V + + logger.info( + f"Auto budget '{auto}' → ~{num_candidates} candidates, " + f"~{total} metric calls (~{total // (V or 1)} full evals)" + ) + + return total + + +def prepare_gepa_dataset(dataset) -> list[dict]: + """ + Convert HuggingFace Dataset to GEPA format. + + GEPA expects a list of dicts with keys like 'question', 'answer', 'info', 'task'. + """ + if dataset is None: + raise ValueError("dataset cannot be None") + + examples = [] + for item in dataset: + example = { + "question": item.get("question", item.get("prompt", "")), + "answer": item.get("answer", ""), + "task": item.get("task", "default"), + "info": item.get("info", {}), + } + examples.append(example) + + return examples + + +def call_reflection_model( + client: OpenAI, + prompt: str, + model: str, + temperature: float = 1.0, + max_tokens: int | None = None, +) -> str: + """ + Call reflection model to generate proposal. + + This is a wrapper around the API call for GEPA's reflection phase. 
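+
+    Example (illustrative; the prompt and model name are placeholders):
+
+        client = OpenAI()  # reads OPENAI_API_KEY from the environment
+        new_text = call_reflection_model(client, "Rewrite this instruction...", model="gpt-5-mini")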
+ """ + try: + request_args = { + "model": model, + "messages": [{"role": "user", "content": prompt}], + "temperature": temperature, + } + if max_tokens is not None: + request_args["max_tokens"] = max_tokens + response = client.chat.completions.create(**request_args) + return response.choices[0].message.content or "" + except Exception as e: + logger.error(f"Error calling reflection model: {e}") + raise + + +def save_optimized_components( + env_id: str, + best_candidate: dict[str, str], + seed_candidate: dict[str, str], + output_dir: Path, +): + """Save optimized components to disk for future use.""" + output_file = output_dir / f"{env_id}_optimized.json" + output_file.parent.mkdir(parents=True, exist_ok=True) + + with open(output_file, "w") as f: + json.dump(best_candidate, f, indent=2) + + logger.info(f"Saved optimized components to: {output_file}") + + # Also save the original (seed) components for comparison + original_file = output_dir / f"{env_id}_original.json" + with open(original_file, "w") as f: + json.dump(seed_candidate, f, indent=2) + + logger.info(f"Saved original components to: {original_file}") + + +def save_optimization_metrics( + env_id: str, + result, + output_dir: Path, + run_config: dict, +): + """Save optimization metrics and configuration for analysis.""" + metrics_file = output_dir / f"{env_id}_metrics.json" + + metrics = { + # Run configuration + "config": run_config, + # Timestamps + "date": datetime.now().strftime("%Y-%m-%d"), + "timestamp": datetime.now().isoformat(), + # Results + "val_aggregate_scores": result.val_aggregate_scores, + "num_candidates": len(result.candidates), + "best_val_score": ( + float(max(result.val_aggregate_scores)) + if result.val_aggregate_scores + else 0.0 + ), + "initial_val_score": ( + float(result.val_aggregate_scores[0]) + if result.val_aggregate_scores + else 0.0 + ), + "improvement": ( + float(max(result.val_aggregate_scores) - result.val_aggregate_scores[0]) + if len(result.val_aggregate_scores) > 0 + else 0.0 + ), + "candidates_history": [ + { + "iteration": i, + "score": float(score), + } + for i, score in enumerate(result.val_aggregate_scores) + ], + } + + with open(metrics_file, "w") as f: + json.dump(metrics, f, indent=2) + + logger.info(f"Saved optimization metrics to: {metrics_file}") + + +def print_optimization_results(result, log_dir: Path): + """Print GEPA optimization results to console.""" + print("\n" + "=" * 80) + print("GEPA OPTIMIZATION COMPLETE") + print("=" * 80) + print(f"Best validation score: {max(result.val_aggregate_scores):.3f}") + print(f"Initial validation score: {result.val_aggregate_scores[0]:.3f}") + print( + f"Improvement: {max(result.val_aggregate_scores) - result.val_aggregate_scores[0]:.3f}" + ) + print(f"Total candidates fully explored: {len(result.candidates)}") + print("\nOptimized components:") + print("-" * 80) + + for comp, text in result.best_candidate.items(): + print(f"\n{comp}:") + print(textwrap.indent(text, " ")) + + print("\n" + "=" * 80) + print(f"Logs saved to: {log_dir}") + print("=" * 80) + + +async def run_gepa_optimization(config: GEPAConfig): + """ + Run GEPA optimization with provided configuration. 
+ + Handles: + - Adapter creation + - Reflection client setup + - GEPA optimize() call + - Result saving and output + + Args: + config: GEPAConfig with all optimization parameters + + Returns: + GEPA optimization result + """ + try: + from gepa import optimize + except ImportError: + print("Error: GEPA is not installed.") + print("Install with: uv add 'verifiers[gepa]'") + sys.exit(1) + + # Setup log directory + log_dir = get_gepa_results_path(config) + log_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"Log directory: {log_dir}") + + # Setup task client + client = setup_client(config.client_config) + logger.debug("Initialized OpenAI client") + + # Load environment + vf_env = vf.load_environment(env_id=config.env_id, **config.env_args) + + if isinstance(vf_env, vf.EnvGroup): + raise ValueError( + "GEPA optimization is not supported for EnvGroup environments. " + "Optimize each environment individually, then combine them." + ) + + # Validate components + for component in config.components_to_optimize: + if component == "tool_descriptions": + if not getattr(vf_env, "oai_tools", None): + raise ValueError( + "Cannot optimize tool_descriptions: " + f"environment '{config.env_id}' has no tools configured." + ) + elif not hasattr(vf_env, component): + raise ValueError( + f"Environment '{config.env_id}' is missing component '{component}'. " + "Provide a component that exists on the environment." + ) + + # Create adapter + adapter = GEPAAdapter( + env=vf_env, + client=client, + model=config.model, + sampling_args=config.sampling_args, + components_to_optimize=config.components_to_optimize, + num_rollouts_per_example=config.rollouts_per_example, + max_concurrent=config.max_concurrent, + ) + + # Setup reflection client + reflection_client_kwargs = { + "api_key": config.reflection_api_key, + "base_url": config.reflection_base_url, + } + if config.client_config.extra_headers: + reflection_client_kwargs["default_headers"] = config.client_config.extra_headers + reflection_client = OpenAI(**reflection_client_kwargs) + logger.debug( + "Reflection client configured for model %s at %s", + config.reflection_model, + config.reflection_base_url, + ) + + # Log initial component values + logger.info("Initial component values:") + for comp, value in config.seed_candidate.items(): + preview = value[:200] + "..." if len(value) > 200 else value + logger.info(f" {comp}: {preview}") + + # Run GEPA + logger.info("=" * 80) + logger.info("Starting GEPA optimization...") + logger.info("=" * 80) + + # Build wandb_init_kwargs from config + wandb_init_kwargs = ( + config.wandb_init_kwargs.copy() if config.wandb_init_kwargs else {} + ) + if config.use_wandb: + if config.wandb_project: + wandb_init_kwargs["project"] = config.wandb_project + if config.wandb_entity: + wandb_init_kwargs["entity"] = config.wandb_entity + if config.wandb_name: + wandb_init_kwargs["name"] = config.wandb_name + else: + wandb_init_kwargs.setdefault("name", f"gepa-{config.env_id}") + + # Get wandb API key from env var + wandb_api_key = os.getenv(config.wandb_api_key_var) if config.use_wandb else None + + # Set reflection_lm on adapter for propose_new_texts method + # GEPA's optimize() expects a simple reflection_lm(prompt) -> str callable. + # We create a lambda that captures the reflection client and config, + # allowing the adapter's propose_new_texts() to call the reflection model + # without needing to manage the client itself. + # + # Why set this on the adapter? 
+ # The GEPAAdapter.propose_new_texts() method needs to call the reflection model, + # but GEPA's protocol doesn't pass reflection_lm to that method - it only passes + # it to optimize(). By setting it as an attribute, we make it accessible within + # propose_new_texts() while keeping the GEPA protocol interface clean. + adapter.reflection_lm = lambda x: call_reflection_model( + reflection_client, + x, + config.reflection_model, + config.reflection_temperature, + config.reflection_max_tokens, + ) + + try: + result = optimize( + seed_candidate=config.seed_candidate, + trainset=config.trainset, + valset=config.valset, + adapter=adapter, + max_metric_calls=config.max_metric_calls, + reflection_lm=adapter.reflection_lm, + reflection_minibatch_size=config.reflection_minibatch_size, + run_dir=str(log_dir), + track_best_outputs=config.track_stats, + seed=config.seed, + display_progress_bar=True, + # experiment tracking + use_wandb=config.use_wandb, + wandb_api_key=wandb_api_key, + wandb_init_kwargs=wandb_init_kwargs if config.use_wandb else None, + use_mlflow=config.use_mlflow, + mlflow_tracking_uri=config.mlflow_tracking_uri, + mlflow_experiment_name=config.mlflow_experiment_name, + ) + except Exception as e: + logger.error(f"GEPA optimization failed: {e}", exc_info=True) + raise + + # Print results + print_optimization_results(result, log_dir) + + # Prepare run configuration for saving + run_config = { + "env_id": config.env_id, + "model": config.model, + "reflection_model": config.reflection_model, + "reflection_temperature": config.reflection_temperature, + "components": config.components_to_optimize, + "trainset_size": len(config.trainset), + "valset_size": len(config.valset), + "rollouts_per_example": config.rollouts_per_example, + "max_metric_calls": config.max_metric_calls, + "reflection_minibatch_size": config.reflection_minibatch_size, + "seed": config.seed, + "max_concurrent": config.max_concurrent, + } + + # Save results + save_optimized_components( + config.env_id, result.best_candidate, config.seed_candidate, log_dir + ) + save_optimization_metrics(config.env_id, result, log_dir, run_config) + + # Save rollouts if requested + if config.save_results: + save_every = config.save_every if config.save_every > 0 else -1 + val_examples_for_logging = ( + config.num_val if config.num_val > 0 else config.num_examples + ) + + async def save_all_candidates(): + await save_candidate_rollouts( + adapter=adapter, + candidate=config.seed_candidate, + label="seed", + client=client, + model=config.model, + sampling_args=config.sampling_args, + num_examples=val_examples_for_logging, + rollouts_per_example=config.rollouts_per_example, + max_concurrent=config.max_concurrent, + save_every=save_every, + log_dir=log_dir, + ) + await save_candidate_rollouts( + adapter=adapter, + candidate=result.best_candidate, + label="best", + client=client, + model=config.model, + sampling_args=config.sampling_args, + num_examples=val_examples_for_logging, + rollouts_per_example=config.rollouts_per_example, + max_concurrent=config.max_concurrent, + save_every=save_every, + log_dir=log_dir, + ) + + try: + await save_all_candidates() + except RuntimeError as exc: + logger.error(f"Failed to save rollout trajectories: {exc}") + raise + + logger.info("GEPA optimization completed successfully!") + return result diff --git a/verifiers/rubrics/rubric.py b/verifiers/rubrics/rubric.py index 327caf560..07596cd24 100644 --- a/verifiers/rubrics/rubric.py +++ b/verifiers/rubrics/rubric.py @@ -8,6 +8,7 @@ from verifiers.types import ( 
GroupRewardFunc, RewardFunc, + RewardResult, RolloutScore, State, ) @@ -47,6 +48,7 @@ def __init__( ) self.parser = parser or vf.Parser() + self._warned_no_feedback = False # class objects for reward functions self.class_objects = {} @@ -98,15 +100,38 @@ def _get_individual_reward_weights(self) -> list[float]: if not self._is_group_func(func) ] + def _parse_reward_result( + self, func_name: str, result: float | RewardResult + ) -> tuple[float, str | None]: + """ + Normalize reward function outputs to (score, feedback). + + Raises: + ValueError: if a RewardResult dict omits the required "score" key. + """ + if isinstance(result, dict): + if "score" not in result: + raise ValueError( + f"RewardResult dict missing required 'score' key for {func_name}: {result}" + ) + score = float(result["score"]) + feedback = result.get("feedback") + return score, feedback + return float(result), None + async def _call_individual_reward_func( self, func: RewardFunc, state: State, score_sem: AsyncContextManager, - ) -> float: + ) -> float | RewardResult: """ Invoke `func` with only the required arguments. + Reward functions can return either: + - float: backward compatible (no feedback) + - dict: {"score": float, "feedback": str} (for FeedbackRubric) + Example: ``` def func(completion, answer, **kwargs): @@ -128,22 +153,31 @@ async def _call(): merged.update(self.class_objects) if any(p.kind == p.VAR_KEYWORD for p in sig.parameters.values()): try: - ans = float(await maybe_await(func, **merged)) + result = await maybe_await(func, **merged) + # Handle both float and dict returns + if isinstance(result, dict): + return result + else: + return float(result) except Exception as e: self.logger.error( f"Error calling reward function {func.__name__}: {e}" # type: ignore[unresolved-attribute] ) - ans = 0.0 + return 0.0 else: allowed = {k: v for k, v in merged.items() if k in sig.parameters} try: - ans = float(await maybe_await(func, **allowed)) + result = await maybe_await(func, **allowed) + # Handle both float and dict returns + if isinstance(result, dict): + return result + else: + return float(result) except Exception as e: self.logger.error( f"Error calling reward function {func.__name__}: {e}" # type: ignore[unresolved-attribute] ) - ans = 0.0 - return ans + return 0.0 async with score_sem: return await _call() @@ -216,14 +250,20 @@ async def score_rollout(self, state: State, score_sem: AsyncContextManager): ) start_time = time.time() reward_scores = [] + feedbacks = [] # Collect feedback from functions that return dicts + for func in reward_funcs: - reward_scores.append( - await self._call_individual_reward_func( - func=func, - state=state, - score_sem=score_sem, - ) + result = await self._call_individual_reward_func( + func=func, + state=state, + score_sem=score_sem, ) + + score, feedback = self._parse_reward_result(func.__name__, result) + if feedback: + feedbacks.append(f"{func.__name__}: {feedback}") + reward_scores.append(score) + rewards = RolloutScore( metrics={ func.__name__: reward @@ -243,6 +283,38 @@ async def score_rollout(self, state: State, score_sem: AsyncContextManager): state["timing"]["total_ms"] += state["timing"]["scoring_ms"] state["reward"] = rewards["reward"] state["metrics"] = rewards["metrics"] + state["feedbacks"] = feedbacks # Store feedback for get_feedback() + + def get_feedback(self, state: State) -> str: + """ + Combine feedback from all reward functions into a single string. 
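+
+        Example output (illustrative; the per-function lines depend on which
+        reward functions returned feedback):
+
+            Score: 75.00%
+
+            exact_match: answer matched the expected target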
+ + This method should be called after score_rollout() has been executed, + which populates state["feedbacks"]. + + Args: + state: State dict containing execution results + + Returns: + Combined feedback string from all reward functions + """ + feedbacks = state.get("feedbacks", []) + + if not feedbacks: + # Fallback if no functions provided feedback - warn once + if not self._warned_no_feedback: + self.logger.warning( + "No detailed feedback from reward functions. For better GEPA optimization, " + "return RewardResult({'score': float, 'feedback': str}) from reward functions." + ) + self._warned_no_feedback = True + score = state.get("reward", 0.0) + return f"Score: {score:.2%} (no detailed feedback available)" + + # Combine all feedback with score summary + combined = f"Score: {state.get('reward', 0.0):.2%}\n\n" + combined += "\n\n".join(feedbacks) + return combined async def score_group(self, states: list[State], score_sem: AsyncContextManager): """ @@ -271,7 +343,13 @@ async def score_group(self, states: list[State], score_sem: AsyncContextManager) if func_name not in aggregated_metrics: aggregated_metrics[func_name] = [0.0] * num_states for i in range(num_states): - score_value = scores[i] + score_value, feedback = self._parse_reward_result( + func_name, scores[i] + ) + if feedback: + states[i].setdefault("feedbacks", []).append( + f"{func_name}: {feedback}" + ) aggregated_rewards[i] += score_value * weight aggregated_metrics[func_name][i] = score_value else: @@ -288,7 +366,13 @@ async def score_group(self, states: list[State], score_sem: AsyncContextManager) if func_name not in aggregated_metrics: aggregated_metrics[func_name] = [0.0] * num_states for i in range(num_states): - score_value = scores[i] + score_value, feedback = self._parse_reward_result( + func_name, scores[i] + ) + if feedback: + states[i].setdefault("feedbacks", []).append( + f"{func_name}: {feedback}" + ) aggregated_rewards[i] += score_value * weight aggregated_metrics[func_name][i] = score_value diff --git a/verifiers/scripts/gepa.py b/verifiers/scripts/gepa.py new file mode 100644 index 000000000..d2ac7e541 --- /dev/null +++ b/verifiers/scripts/gepa.py @@ -0,0 +1,570 @@ +#!/usr/bin/env python3 +""" +GEPA optimization script for Verifiers environments. 
+ +Usage: + vf-gepa wordle --budget light + vf-gepa wiki-search --budget heavy --components system_prompt tool_descriptions + vf-gepa my-env --max-metric-calls 1000 -n 100 --num-val 30 +""" + +import argparse +import asyncio +import json +import logging +import os +import sys + +try: + from gepa import optimize # noqa: F401 +except ImportError: + print("Error: GEPA is not installed.") + print("Install with: uv add 'verifiers[gepa]'") + sys.exit(1) + +from verifiers import setup_logging +from verifiers.types import ClientConfig, GEPAConfig +from verifiers.utils.eval_utils import load_endpoints +from verifiers.gepa import ( + auto_budget_to_metric_calls, + ensure_env_dir_on_path, + get_env_gepa_defaults, + prepare_gepa_dataset, + run_gepa_optimization, +) + +import verifiers as vf + +logger = logging.getLogger("gepa") + +# Default constants +DEFAULT_NUM_EXAMPLES = 50 +DEFAULT_NUM_VAL = 20 +DEFAULT_ROLLOUTS_PER_EXAMPLE = 1 + + +def main(): + parser = argparse.ArgumentParser( + description="Run GEPA prompt optimization on Verifiers environments", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Light optimization (quick test) + vf-gepa wordle --budget light + + # Heavy optimization with tool descriptions + vf-gepa wiki-search --budget heavy --components system_prompt tool_descriptions + + # Custom configuration + vf-gepa my-env --max-metric-calls 1000 -n 100 --num-val 30 + """, + ) + + # 1. Positional: env_id + parser.add_argument( + "env_id", type=str, help="Environment ID (e.g., wordle, wiki-search)" + ) + + # 2. Environment config + parser.add_argument( + "--env-args", + "-a", + default="{}", + help="JSON dict of keyword args forwarded to vf.load_environment", + ) + parser.add_argument( + "--env-dir-path", + "-p", + type=str, + default="./environments", + help="Path to environments directory", + ) + + # 3. Dataset + parser.add_argument( + "-n", + "--num-examples", + type=int, + default=None, + help="Number of training examples", + ) + parser.add_argument( + "--num-val", + type=int, + default=None, + help="Number of validation examples", + ) + + # 4. Endpoints/Model + parser.add_argument( + "--endpoints-path", + "-e", + type=str, + default="./configs/endpoints.py", + help="Path to API endpoints registry", + ) + parser.add_argument( + "-m", + "--model", + default="gpt-5-mini", + help="Model to optimize (default: gpt-5-mini)", + ) + parser.add_argument( + "--api-key-var", + "-k", + default="OPENAI_API_KEY", + help="Environment variable containing the task model API key", + ) + parser.add_argument( + "--api-base-url", + "-b", + default="https://api.openai.com/v1", + help="Base URL for the task model API (default: https://api.openai.com/v1)", + ) + parser.add_argument( + "--header", + action="append", + dest="headers", + default=None, + help="Additional HTTP header for the task model client. Format: 'Name: Value'. Repeatable.", + ) + + # 5. Sampling + parser.add_argument( + "-T", + "--temperature", + type=float, + default=1.0, + help="Temperature for task model (default: 1.0)", + ) + parser.add_argument( + "-t", + "--max-tokens", + type=int, + default=None, + help="Max tokens for task model (unset to use model default)", + ) + parser.add_argument( + "--sampling-args", + "-S", + type=json.loads, + default=None, + help=( + "Sampling arguments as JSON object. Keys here override --max-tokens/--temperature. " + 'Example: \'{"enable_thinking": false, "max_tokens": 256}\'' + ), + ) + + # 6. 
Rollouts + parser.add_argument( + "--rollouts-per-example", + "-r", + type=int, + default=None, + help="Number of rollouts per example", + ) + + # 7. Concurrency + parser.add_argument( + "--max-concurrent", + "-c", + type=int, + default=32, + help="Maximum number of concurrent requests", + ) + + # 8. GEPA budget (mutually exclusive) + budget_group = parser.add_mutually_exclusive_group(required=True) + budget_group.add_argument( + "--budget", + "-B", + choices=["light", "medium", "heavy"], + help="Budget preset: light (~6 candidates), medium (~12), heavy (~18)", + ) + budget_group.add_argument( + "--max-metric-calls", type=int, help="Maximum total metric calls budget" + ) + + # 9. GEPA configuration + parser.add_argument( + "--components", + nargs="+", + default=["system_prompt"], + help="Components to optimize (default: system_prompt)", + ) + parser.add_argument( + "--reflection-model", + default="gpt-5-mini", + help="Model for reflection/proposal (default: gpt-5-mini)", + ) + parser.add_argument( + "--reflection-temperature", + type=float, + default=1.0, + help="Temperature for reflection model (default: 1.0)", + ) + parser.add_argument( + "--reflection-base-url", + default=None, + help="Base URL for reflection model API (default: task client base URL)", + ) + parser.add_argument( + "--reflection-api-key-var", + default="OPENAI_API_KEY", + help="Env var that stores the reflection API key (default: OPENAI_API_KEY)", + ) + parser.add_argument( + "--reflection-max-tokens", + type=int, + default=8000, + help="Max tokens for reflection completions (default: 8000)", + ) + parser.add_argument( + "--reflection-minibatch-size", + type=int, + default=35, + help="Number of examples per reflection step (default: 35)", + ) + + # 10. Output/Logging + parser.add_argument( + "--save-results", + "-s", + default=False, + action="store_true", + help="Save rollout trajectories to disk", + ) + parser.add_argument( + "--save-every", + "-f", + type=int, + default=-1, + help="Save rollout trajectories every n evaluations during optimization", + ) + parser.add_argument( + "--track-stats", + action="store_true", + help="Track detailed optimization statistics", + ) + parser.add_argument( + "--verbose", "-v", action="store_true", help="Enable verbose logging" + ) + parser.add_argument( + "--seed", + type=int, + default=42, + help="Random seed for reproducibility (default: 42)", + ) + + # 11. Experiment tracking - wandb + parser.add_argument( + "--use-wandb", + action="store_true", + help="Enable wandb logging", + ) + parser.add_argument( + "--wandb-project", + type=str, + default=None, + help="Wandb project name", + ) + parser.add_argument( + "--wandb-entity", + type=str, + default=None, + help="Wandb entity/team name", + ) + parser.add_argument( + "--wandb-name", + type=str, + default=None, + help="Wandb run name (default: auto-generated from env_id)", + ) + parser.add_argument( + "--wandb-api-key-var", + type=str, + default="WANDB_API_KEY", + help="Environment variable containing wandb API key (default: WANDB_API_KEY)", + ) + parser.add_argument( + "--wandb-init-kwargs", + type=json.loads, + default=None, + help='Additional wandb.init() kwargs as JSON (e.g., \'{"tags": ["gepa"], "mode": "offline"}\')', + ) + + # 12. 
    # 12. Experiment tracking - mlflow
    parser.add_argument(
        "--use-mlflow",
        action="store_true",
        help="Enable mlflow logging",
    )
    parser.add_argument(
        "--mlflow-tracking-uri",
        type=str,
        default=None,
        help="MLflow tracking server URI",
    )
    parser.add_argument(
        "--mlflow-experiment-name",
        type=str,
        default=None,
        help="MLflow experiment name",
    )

    args = parser.parse_args()

    # Parse env_args
    try:
        env_args = json.loads(args.env_args)
        if not isinstance(env_args, dict):
            raise TypeError("env args must be a JSON object")
    except (json.JSONDecodeError, TypeError) as exc:
        raise ValueError(
            "--env-args must be valid JSON representing a dictionary"
        ) from exc

    # Parse headers
    task_client_headers: dict[str, str] | None = None
    if args.headers:
        task_client_headers = {}
        for header in args.headers:
            if ":" not in header:
                raise ValueError(
                    "Headers must be provided in the format 'Name: Value'."
                )
            key, value = header.split(":", 1)
            task_client_headers[key.strip()] = value.strip()

    # Setup logging
    setup_logging("DEBUG" if args.verbose else "INFO")

    # Silence noisy third-party loggers
    logging.getLogger("openai").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)

    logger.info(f"Starting GEPA optimization for environment: {args.env_id}")
    logger.info(f"Components to optimize: {args.components}")

    if args.save_every > 0 and not args.save_results:
        logger.warning("--save-every is ignored unless --save-results is set")

    # Apply defaults: CLI > env pyproject.toml > hardcoded
    env_defaults = get_env_gepa_defaults(args.env_id)
    num_examples = (
        args.num_examples
        if args.num_examples is not None
        else env_defaults.get("num_examples", DEFAULT_NUM_EXAMPLES)
    )
    num_val = (
        args.num_val
        if args.num_val is not None
        else env_defaults.get("num_val", DEFAULT_NUM_VAL)
    )
    rollouts_per_example = (
        args.rollouts_per_example
        if args.rollouts_per_example is not None
        else env_defaults.get("rollouts_per_example", DEFAULT_ROLLOUTS_PER_EXAMPLE)
    )

    # Log sources
    if args.num_examples is None:
        source = "pyproject.toml" if "num_examples" in env_defaults else "default"
        logger.debug(f"Using num_examples={num_examples} from {source}")
    if args.num_val is None:
        source = "pyproject.toml" if "num_val" in env_defaults else "default"
        logger.debug(f"Using num_val={num_val} from {source}")
    if args.rollouts_per_example is None:
        source = (
            "pyproject.toml" if "rollouts_per_example" in env_defaults else "default"
        )
        logger.debug(f"Using rollouts_per_example={rollouts_per_example} from {source}")

    # Load endpoints and resolve model config
    endpoints = load_endpoints(args.endpoints_path)
    if args.model in endpoints:
        task_api_key_var = endpoints[args.model]["key"]
        task_api_base_url = endpoints[args.model]["url"]
        args.model = endpoints[args.model]["model"]
        logger.debug(f"Using endpoint configuration for task model '{args.model}'")
    else:
        logger.debug(f"Task model '{args.model}' not in registry, using CLI args")
        task_api_key_var = args.api_key_var
        task_api_base_url = args.api_base_url

    # Also check reflection model
    if args.reflection_model in endpoints:
        reflection_api_key_var = endpoints[args.reflection_model]["key"]
        reflection_base_url = endpoints[args.reflection_model]["url"]
        args.reflection_model = endpoints[args.reflection_model]["model"]
        logger.debug(f"Using endpoint for reflection model '{args.reflection_model}'")
    else:
        reflection_api_key_var = args.reflection_api_key_var
        reflection_base_url = args.reflection_base_url
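    # Assumed shape of the endpoints registry (inferred from the lookups above):
    #   {"<alias>": {"model": "<model id>", "url": "<base url>", "key": "<API key env var>"}}
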
    # Merge sampling args with precedence to JSON payload
    merged_sampling_args: dict = {}
    if args.sampling_args is not None:
        merged_sampling_args.update(args.sampling_args)
    if "max_tokens" not in merged_sampling_args:
        merged_sampling_args["max_tokens"] = args.max_tokens
    if args.temperature is not None and "temperature" not in merged_sampling_args:
        merged_sampling_args["temperature"] = args.temperature

    # Ensure local environments directory is available for imports
    ensure_env_dir_on_path(args.env_dir_path, args.env_id)

    # Setup client config
    client_config_kwargs = {
        "api_key_var": task_api_key_var,
        "api_base_url": task_api_base_url,
    }
    if task_client_headers is not None:
        client_config_kwargs["extra_headers"] = task_client_headers

    client_config = ClientConfig(**client_config_kwargs)

    # Load environment
    vf_env = vf.load_environment(env_id=args.env_id, **env_args)

    # Prepare datasets
    logger.info(f"Loading {num_examples} training examples")
    logger.info(f"Loading {num_val} validation examples")
    if vf_env.eval_dataset is not None:
        train_dataset_raw = vf_env.get_dataset(n=num_examples, seed=args.seed)
        val_dataset_raw = vf_env.get_eval_dataset(n=num_val, seed=args.seed + 1)
    else:
        total_requested = max(num_examples, 0) + max(num_val, 0)
        base_dataset = vf_env.get_dataset(n=total_requested, seed=args.seed)
        base_examples = (
            base_dataset.to_list()
            if hasattr(base_dataset, "to_list")
            else list(base_dataset)
        )
        train_dataset_raw = (
            base_examples[:num_examples] if num_examples > 0 else base_examples
        )
        val_dataset_raw = (
            base_examples[num_examples : num_examples + num_val] if num_val > 0 else []
        )
        logger.debug(
            "Eval dataset missing; derived %s validation examples from train split",
            len(val_dataset_raw),
        )

    trainset = prepare_gepa_dataset(train_dataset_raw)
    valset = prepare_gepa_dataset(val_dataset_raw)

    if num_examples > 0 and not trainset:
        raise ValueError(
            "Training dataset is empty - check environment configuration and filters"
        )
    if num_val > 0 and not valset:
        raise ValueError(
            "Validation dataset is empty - check environment configuration and filters"
        )

    logger.info(f"Training set: {len(trainset)} examples")
    logger.info(f"Validation set: {len(valset)} examples")

    # Get reflection API key
    reflection_api_key = os.getenv(reflection_api_key_var)
    if not reflection_api_key:
        raise ValueError(
            f"{reflection_api_key_var} environment variable not set for reflection client"
        )

    # Use resolved reflection_base_url or fall back to task client base URL
    if not reflection_base_url:
        reflection_base_url = task_api_base_url

    # Extract seed candidate (initial component values)
    seed_candidate = {}
    for comp in args.components:
        if comp == "tool_descriptions":
            # Extract tool descriptions
            if hasattr(vf_env, "oai_tools") and vf_env.oai_tools:
                for i, tool in enumerate(vf_env.oai_tools):
                    seed_candidate[f"tool_{i}_description"] = tool["function"][
                        "description"
                    ]
        elif hasattr(vf_env, comp):
            seed_candidate[comp] = getattr(vf_env, comp)
        else:
            raise ValueError(
                f"Environment '{args.env_id}' does not have component '{comp}'. "
                f"Available components: system_prompt, tool_descriptions"
            )

    if not seed_candidate:
        raise ValueError(
            f"No valid components found to optimize for environment '{args.env_id}'"
        )

    # Convert budget preset to max_metric_calls if needed
    if args.budget:
        max_metric_calls = auto_budget_to_metric_calls(
            auto=args.budget,
            num_components=len(seed_candidate),
            valset_size=len(valset),
            minibatch_size=args.reflection_minibatch_size,
        )
    else:
        max_metric_calls = args.max_metric_calls

    logger.info(f"Budget: {max_metric_calls} metric calls total")

    # Build GEPA config
    gepa_config = GEPAConfig(
        # environment
        env_id=args.env_id,
        env_args=env_args,
        env_dir_path=args.env_dir_path,
        # task model
        model=args.model,
        client_config=client_config,
        sampling_args=merged_sampling_args,
        # reflection model
        reflection_model=args.reflection_model,
        reflection_api_key=reflection_api_key,
        reflection_base_url=reflection_base_url,
        reflection_temperature=args.reflection_temperature,
        reflection_max_tokens=args.reflection_max_tokens,
        reflection_minibatch_size=args.reflection_minibatch_size,
        # datasets
        num_examples=num_examples,
        num_val=num_val,
        rollouts_per_example=rollouts_per_example,
        trainset=trainset,
        valset=valset,
        # optimization
        components_to_optimize=args.components,
        seed_candidate=seed_candidate,
        max_metric_calls=max_metric_calls,
        # execution
        max_concurrent=args.max_concurrent,
        seed=args.seed,
        # output
        save_results=args.save_results,
        save_every=args.save_every,
        track_stats=args.track_stats,
        verbose=args.verbose,
        # experiment tracking
        use_wandb=args.use_wandb,
        wandb_api_key_var=args.wandb_api_key_var,
        wandb_project=args.wandb_project,
        wandb_entity=args.wandb_entity,
        wandb_name=args.wandb_name,
        wandb_init_kwargs=args.wandb_init_kwargs,
        use_mlflow=args.use_mlflow,
        mlflow_tracking_uri=args.mlflow_tracking_uri,
        mlflow_experiment_name=args.mlflow_experiment_name,
    )

    # Run GEPA optimization
    asyncio.run(run_gepa_optimization(gepa_config))


if __name__ == "__main__":
    main()
diff --git a/verifiers/types.py b/verifiers/types.py
index 1a3125075..f9e56c5a7 100644
--- a/verifiers/types.py
+++ b/verifiers/types.py
@@ -104,6 +104,7 @@ class State(dict):
     reward: float | None
     advantage: float | None
     metrics: dict[str, float] | None
+    feedbacks: list[str] | None
     timing: RolloutTiming | None
 
     def __getitem__(self, key: str) -> Any:
@@ -174,6 +175,18 @@ class RolloutScore(TypedDict):
     metrics: dict[str, float]
 
 
+class RewardResult(TypedDict, total=False):
+    """Result from a reward function with optional feedback.
+ + Reward functions can return either: + - float: backward compatible (no feedback) + - RewardResult: {"score": float, "feedback": str} + """ + + score: float # required + feedback: str # optional + + class RolloutScores(TypedDict): """TypedDict for rubric outputs.""" @@ -234,3 +247,51 @@ class EvalConfig(BaseModel): save_every: int = -1 save_to_hf_hub: bool = False hf_hub_dataset_name: str | None = None + + +class GEPAConfig(BaseModel): + """Pydantic model for GEPA optimization configuration.""" + + # environment + env_id: str + env_args: dict + env_dir_path: str + # task model + model: str + client_config: ClientConfig + sampling_args: SamplingArgs + # reflection model + reflection_model: str + reflection_api_key: str + reflection_base_url: str + reflection_temperature: float + reflection_max_tokens: int + reflection_minibatch_size: int + # datasets + num_examples: int + num_val: int + rollouts_per_example: int + trainset: list[dict] + valset: list[dict] + # optimization + components_to_optimize: list[str] + seed_candidate: dict[str, str] + max_metric_calls: int + # execution + max_concurrent: int + seed: int + # output + save_results: bool + save_every: int + track_stats: bool + verbose: bool + # experiment tracking + use_wandb: bool = False + wandb_api_key_var: str = "WANDB_API_KEY" + wandb_project: str | None = None + wandb_entity: str | None = None + wandb_name: str | None = None + wandb_init_kwargs: dict | None = None + use_mlflow: bool = False + mlflow_tracking_uri: str | None = None + mlflow_experiment_name: str | None = None diff --git a/verifiers/utils/path_utils.py b/verifiers/utils/path_utils.py index 6ab89923b..70547e132 100644 --- a/verifiers/utils/path_utils.py +++ b/verifiers/utils/path_utils.py @@ -1,17 +1,18 @@ import uuid from pathlib import Path -from verifiers.types import EvalConfig +from verifiers.types import EvalConfig, GEPAConfig def get_results_path( env_id: str, model: str, base_path: Path = Path("./outputs"), + subdir: str = "evals", ) -> Path: uuid_str = str(uuid.uuid4())[:8] env_model_str = f"{env_id}--{model.replace('/', '--')}" - return base_path / "evals" / env_model_str / uuid_str + return base_path / subdir / env_model_str / uuid_str def get_eval_results_path(config: EvalConfig) -> Path: @@ -20,8 +21,21 @@ def get_eval_results_path(config: EvalConfig) -> Path: if local_env_dir.exists(): base_path = local_env_dir / "outputs" - results_path = get_results_path(config.env_id, config.model, base_path) + results_path = get_results_path(config.env_id, config.model, base_path, "evals") else: base_path = Path("./outputs") - results_path = get_results_path(config.env_id, config.model, base_path) + results_path = get_results_path(config.env_id, config.model, base_path, "evals") + return results_path + + +def get_gepa_results_path(config: GEPAConfig) -> Path: + module_name = config.env_id.replace("-", "_") + local_env_dir = Path(config.env_dir_path) / module_name + + if local_env_dir.exists(): + base_path = local_env_dir / "outputs" + results_path = get_results_path(config.env_id, config.model, base_path, "gepa") + else: + base_path = Path("./outputs") + results_path = get_results_path(config.env_id, config.model, base_path, "gepa") return results_path