|
| 1 | +from __future__ import annotations |
| 2 | +import asyncio, json, os, signal |
| 3 | +from dataclasses import dataclass |
| 4 | +from typing import Any, Dict, List, Optional |
| 5 | +from .providers import OllamaProvider, OpenAIStyleProvider |
| 6 | +from .metrics import exact_match, token_f1, bleu, rouge_l, mc_accuracy |
| 7 | +from .monitor import ResourceMonitor |
| 8 | +from .util import stopwatch, write_jsonl |
| 9 | + |
| 10 | +# optional heavy deps are imported lazily inside run_bench |
| 11 | + |
| 12 | + |
| 13 | +@dataclass |
| 14 | +class BenchConfig: |
| 15 | + run_name: str |
| 16 | + provider: Dict[str, Any] |
| 17 | + io: Dict[str, Any] |
| 18 | + prompt: Dict[str, Any] |
| 19 | + limits: Dict[str, Any] |
| 20 | + load: Dict[str, Any] |
| 21 | + metrics: Dict[str, Any] |
| 22 | + |
| 23 | + |
| 24 | +async def _load_provider(cfg: Dict[str, Any]): |
| 25 | + kind = cfg.get('kind') |
| 26 | + if kind == 'ollama': |
| 27 | + return OllamaProvider(cfg['base_url'], cfg['model']) |
| 28 | + if kind == 'openai': |
| 29 | + return OpenAIStyleProvider(cfg['api_base'], cfg['model'], cfg.get('api_key')) |
| 30 | + if kind == 'mock': |
| 31 | + from .providers import MockProvider |
| 32 | + return MockProvider() |
| 33 | + raise ValueError(f"Unknown provider kind: {kind}") |
| 34 | + |
| 35 | + |
| 36 | +async def _worker(name: str, queue: asyncio.Queue, provider, sysmsg, tmpl, options, results, norm): |
| 37 | + while True: |
| 38 | + item = await queue.get() |
| 39 | + if item is None: |
| 40 | + queue.task_done(); break |
| 41 | + rec = item |
| 42 | + prompt = tmpl.format(**rec) |
| 43 | + with stopwatch() as sw: |
| 44 | + out = await provider.generate(prompt, system=sysmsg, options=options) |
| 45 | + latency_s = out.get('latency_s') or sw() |
| 46 | + text = out['output'].strip() |
| 47 | + row = { |
| 48 | + 'id': rec.get('id'), |
| 49 | + 'latency_s': latency_s, |
| 50 | + 'output': text, |
| 51 | + 'target': rec.get('target'), |
| 52 | + 'prompt_eval_count': out.get('prompt_eval_count'), |
| 53 | + 'eval_count': out.get('eval_count'), |
| 54 | + } |
| 55 | + # accuracy |
| 56 | + if 'choices' in rec and rec.get('answer') is not None: |
| 57 | + row['acc'] = mc_accuracy(text, rec['answer'], rec['choices'], norm) |
| 58 | + if rec.get('target') is not None: |
| 59 | + row['em'] = exact_match(text, rec['target'], norm) |
| 60 | + row['f1'] = token_f1(text, rec['target'], norm) |
| 61 | + b = bleu(text, rec['target']) |
| 62 | + if b is not None: row['bleu'] = b |
| 63 | + r = rouge_l(text, rec['target']) |
| 64 | + if r is not None: row['rougeL'] = r |
| 65 | + results.append(row) |
| 66 | + queue.task_done() |
| 67 | + |
| 68 | + |
| 69 | +async def run_bench(config_path: str): |
| 70 | + import yaml |
| 71 | + cfg = BenchConfig(**yaml.safe_load(open(config_path))) |
| 72 | + provider = await _load_provider(cfg.provider) |
| 73 | + |
| 74 | + try: |
| 75 | + import orjson |
| 76 | + except Exception: |
| 77 | + orjson = None |
| 78 | + |
| 79 | + # load jsonl as text, skip empty lines |
| 80 | + rows = [] |
| 81 | + if orjson: |
| 82 | + with open(cfg.io['dataset_path'], 'r', encoding='utf8') as _f: |
| 83 | + for line in _f: |
| 84 | + line = line.strip() |
| 85 | + if not line: |
| 86 | + continue |
| 87 | + rows.append(orjson.loads(line)) |
| 88 | + else: |
| 89 | + import json as _json |
| 90 | + with open(cfg.io['dataset_path'], 'r', encoding='utf8') as _f: |
| 91 | + for line in _f: |
| 92 | + line = line.strip() |
| 93 | + if not line: |
| 94 | + continue |
| 95 | + rows.append(_json.loads(line)) |
| 96 | + if cfg.limits.get('max_samples'): |
| 97 | + rows = rows[: int(cfg.limits['max_samples'])] |
| 98 | + |
| 99 | + q = asyncio.Queue() |
| 100 | + for r in rows: |
| 101 | + await q.put(r) |
| 102 | + for _ in range(cfg.load.get('concurrency', 1)): |
| 103 | + await q.put(None) |
| 104 | + |
| 105 | + results: List[Dict[str, Any]] = [] |
| 106 | + |
| 107 | + mon = ResourceMonitor(os.getpid(), interval_s=0.2) |
| 108 | + mon.start() |
| 109 | + |
| 110 | + workers = [ |
| 111 | + asyncio.create_task( |
| 112 | + _worker(f"w{i}", q, provider, cfg.prompt.get('system'), cfg.prompt.get('template', '{input}'), cfg.provider.get('options'), results, cfg.metrics.get('normalization')) |
| 113 | + ) |
| 114 | + for i in range(cfg.load.get('concurrency', 1)) |
| 115 | + ] |
| 116 | + await asyncio.gather(*workers) |
| 117 | + mon.stop() |
| 118 | + |
| 119 | + # write per-sample |
| 120 | + out_prefix = cfg.io['output_prefix'] |
| 121 | + write_jsonl(f"{out_prefix}.jsonl", results) |
| 122 | + |
| 123 | + try: |
| 124 | + import pandas as pd |
| 125 | + df = pd.DataFrame(results) |
| 126 | + except Exception: |
| 127 | + pd = None |
| 128 | + df = None |
| 129 | + agg = { |
| 130 | + 'latency_s': ['mean', 'p50', 'p95', 'min', 'max'], |
| 131 | + 'em': 'mean', |
| 132 | + 'f1': 'mean', |
| 133 | + 'acc': 'mean', |
| 134 | + 'bleu': 'mean', |
| 135 | + 'rougeL': 'mean', |
| 136 | + 'eval_count': 'mean', |
| 137 | + } |
| 138 | + # Custom percentiles |
| 139 | + # safe summary computation if pandas isn't available |
| 140 | + if df is not None: |
| 141 | + qtiles = df['latency_s'].quantile([0.5, 0.95]).to_dict() if 'latency_s' in df else {} |
| 142 | + n_samples = len(df) |
| 143 | + latency_mean = float(df['latency_s'].mean()) if 'latency_s' in df else None |
| 144 | + latency_p50 = float(qtiles.get(0.5)) if qtiles else None |
| 145 | + latency_p95 = float(qtiles.get(0.95)) if qtiles else None |
| 146 | + throughput = len(df) / df['latency_s'].sum() if 'latency_s' in df else None |
| 147 | + em_mean = float(df['em'].mean()) if 'em' in df else None |
| 148 | + f1_mean = float(df['f1'].mean()) if 'f1' in df else None |
| 149 | + acc_mean = float(df['acc'].mean()) if 'acc' in df else None |
| 150 | + bleu_mean = float(df['bleu'].mean()) if 'bleu' in df else None |
| 151 | + rouge_mean = float(df['rougeL'].mean()) if 'rougeL' in df else None |
| 152 | + tokens_mean = float(df['eval_count'].mean()) if 'eval_count' in df else None |
| 153 | + else: |
| 154 | + n_samples = len(results) |
| 155 | + latency_mean = latency_p50 = latency_p95 = throughput = None |
| 156 | + em_mean = f1_mean = acc_mean = bleu_mean = rouge_mean = tokens_mean = None |
| 157 | + |
| 158 | + summ = { |
| 159 | + 'n_samples': n_samples, |
| 160 | + 'latency_mean_s': latency_mean, |
| 161 | + 'latency_p50_s': latency_p50, |
| 162 | + 'latency_p95_s': latency_p95, |
| 163 | + 'throughput_req_per_s': throughput, |
| 164 | + 'em_mean': em_mean, |
| 165 | + 'f1_mean': f1_mean, |
| 166 | + 'acc_mean': acc_mean, |
| 167 | + 'bleu_mean': bleu_mean, |
| 168 | + 'rougeL_mean': rouge_mean, |
| 169 | + 'tokens_out_mean': tokens_mean, |
| 170 | + } |
| 171 | + try: |
| 172 | + if pd is not None: |
| 173 | + pd.DataFrame([summ]).to_csv(f"{out_prefix}_summary.csv", index=False) |
| 174 | + else: |
| 175 | + # fallback: write minimal csv using stdlib |
| 176 | + import csv |
| 177 | + with open(f"{out_prefix}_summary.csv", 'w', newline='', encoding='utf8') as _f: |
| 178 | + w = csv.DictWriter(_f, fieldnames=list(summ.keys())) |
| 179 | + w.writeheader() |
| 180 | + w.writerow(summ) |
| 181 | + except Exception: |
| 182 | + pass |
| 183 | + |
| 184 | + # memory timeline: try pandas, else write CSV via stdlib |
| 185 | + try: |
| 186 | + import pandas as pd |
| 187 | + mem_df = pd.DataFrame([s.__dict__ for s in mon.samples]) |
| 188 | + mem_df.to_csv(f"{out_prefix}_resources.csv", index=False) |
| 189 | + except Exception: |
| 190 | + import csv |
| 191 | + fields = ['t_s', 'cpu_pct', 'rss_mb', 'gpu_util_pct', 'vram_mb'] |
| 192 | + with open(f"{out_prefix}_resources.csv", 'w', newline='', encoding='utf8') as _f: |
| 193 | + w = csv.DictWriter(_f, fieldnames=fields) |
| 194 | + w.writeheader() |
| 195 | + for s in mon.samples: |
| 196 | + w.writerow(s.__dict__) |
| 197 | + |
| 198 | + # markdown report |
| 199 | + with open(f"{out_prefix}_report.md", 'w') as f: |
| 200 | + f.write("# LLM Bench Report\n\n") |
| 201 | + for k, v in summ.items(): |
| 202 | + f.write(f"- **{k}**: {v}\n") |
| 203 | + f.write("\n## Notes\n- Latency is end‑to‑end per request.\n- Throughput is requests/sec at measured concurrency.\n- See resources.csv for CPU/GPU timeline.\n") |
0 commit comments