diff --git a/pyk/src/pyk/concolic/DESIGN.md b/pyk/src/pyk/concolic/DESIGN.md new file mode 100644 index 0000000000..6a5e55589a --- /dev/null +++ b/pyk/src/pyk/concolic/DESIGN.md @@ -0,0 +1,225 @@ +# Concolic execution — design + +This document records the design of the `pyk.concolic` engine and the backend work it depends on. +It is the agreed target (**v1**); the code currently in the package is an intermediate **v0** (see +*Current status*). + +## Goals + +1. **Accelerate symbolic execution.** Use a fast, deterministic *concrete* run to pin down a single + path, so the symbolic side does not search for applicable rules, fork at every branch, or spend + an SMT call per branch deciding feasibility. +2. **Bug / assertion reproduction.** Systematically generate concrete inputs that drive execution + down every feasible path until a target is hit (a stuck state, a failing assertion, a target cell + pattern); the input that reaches it is the witness. + +## Background: the two engines today + +K already provides both halves concolic needs, against one kompiled definition: + +- **Concrete:** the LLVM backend (directly, or inside Booster) rewrites ground terms fast. +- **Symbolic:** the Haskell backend / Booster, exposed over Kore JSON-RPC (`execute`, `simplify`, + `implies`, `get-model`), keeps inputs symbolic and branches when a guard is undecided. + +Crucially, rule identities (`rule_id`, a per-rule hash) are shared across these backends because they +come from the same kompiled definition. This is what lets a trace recorded on one side drive the +other. + +## Core idea: reuse the concrete trace + +A concrete run collapses input-derived values to concretes, so it cannot, by itself, recover a +*symbolic* path condition (`if (10 <= 5)` does not record that the `10` came from input `N`). The +symbolic input must be propagated somewhere. Instead of re-running a full *search-based* symbolic +execution (which forks and prunes), we **replay the exact rule sequence the concrete run took**, +applying those rules to a symbolic configuration. The path is already known, so the replay is +deterministic — no search, no forking, no per-branch feasibility SMT. + +``` + CONCRETE PASS (ground input, deterministic — no fork) + execute(config[N := 10]) -> ordered rule-id trace: [.. , var, <=, if-false, assign, ..] + │ reuse the rule sequence + ▼ + SYMBOLIC REPLAY (N stays symbolic; apply exactly those rules) + at each branch: the fired rule's guard, instantiated over N, is the path constraint + NO search, NO fork, NO per-branch SMT +``` + +## The path-exploration algorithm (DART, remainder-driven) + +A program's executions form a tree: every input-dependent branch splits it. Each root→leaf path is +one *path condition*; the goal is one concrete input per feasible leaf. + +``` + n <= 5 ? + / \ + true (N<=5) false (¬N<=5) + | | + n <= 10 ? n <= 10 ? + / \ / \ + true false true false + N<=5∧N<=10 N<=5∧¬N<=10 ¬N<=5∧N<=10 ¬N<=5∧¬N<=10 + N=3 ✓ UNSAT ✗ N=7 ✓ N=11 ✓ +``` + +(example: `if (n<=5) {a=1;} else {a=0;} if (n<=10) {b=1;} else {b=0;}`) + +### `ck` and `rk` + +At the k-th branch node, the backend applies the rule `R` the concrete trace selected and returns two +**predicates over the symbolic input** (not concrete values): + +| symbol | name | meaning | example (if-false taken, N=10) | +|---|---|---|---| +| `ck` | applied | the condition under which `R` fires (its instantiated `requires` / match) | `ck = ¬(N <=Int 5)` | +| `rk` | remainder | the subspace where `R` does **not** fire — the sibling branch(es) | `rk = (N <=Int 5)` | + +- For a binary boolean branch, `rk = ¬ck`. For multi-way / `owise`, `rk` is the union of the other + siblings — which is why the backend should compute it rather than the engine negating `ck`. +- `ck`/`rk` are *formulas*; a concrete value comes only from solving them. + +### The loop + +``` +worklist = [seed_input]; seen = {} # seen keyed by path-condition signature +while worklist and budget remains: + input = worklist.pop() + rule_trace = concrete_run(init, input) # ordered rule_ids (deterministic) + prefix = [] # accumulated applied conditions + for each step k in guided_replay(init, rule_trace): # symbolic, no fork + ck, rk = step.applied, step.remainder + prefix.append(ck) + if rk is satisfiable: # this is a branch node + N' = solve(prefix_{ + rule-ids: [id0, id1, ...] # the concrete trace's rule sequence + +per applied rule, return: + rewritten-term: + applied-predicate: ck # R's instantiated guard (accumulate into the path condition) + remainder-predicate: rk # subspace where R does not apply (sibling branches) + +final: rewritten-term after the last rule +``` + +Semantics: for each id, match `R`'s LHS against the current term, rewrite, and return `ck`/`rk`. **Do +not** search other rules, **do not** branch, **do not** run a feasibility SMT check — feasibility is +witnessed by the concrete run. Booster already computes remainders internally (for `owise` / hand-off +decisions), so returning `rk` is close to free. + +## Engine data flow + +``` +ConcolicEngine + concrete_trace(init, input) -> rule-id sequence (KoreClient.execute, ground, logs) + trace(init, input) -> ConcolicTrace (guided-step replay; collect ck, rk) + for each branch node with SAT rk: queue solve(prefix ∧ rk) (get-model) + explore(init, seed) -> [ConcolicTrace] (worklist + signature dedup) +``` + +Branch discovery and the diverging input both come from the sibling (remainder) predicates `rk` +recorded at each branch node, instead of the engine post-hoc negating the chosen condition. The +sibling predicates are already returned by the existing `execute` (each successor carries its +`rule_predicate`), so **the remainder-driven loop needs no backend change** — see *Current status* and +*Milestones*. The guided-step backend feature is a performance optimization on top of this. + +## Backend changes required + +**Haskell backend / Booster (v1, optimization — not required for correctness):** + +The remainder-driven loop already works on the existing `execute`: at a branch, Booster returns each +successor with its `rule_predicate` (the siblings = `rk`), and it already computes a *remainder +predicate* internally (`Rewrite.hs`: `RewriteBranch` is returned only when a rule group's remainder is +unsatisfiable; a satisfiable remainder currently *aborts*). The optimization: + +- A guided/forced-rule mode on `execute` (extra request field) that, given the concrete trace's rule + sequence, applies the named rule per step **without** the per-branch feasibility SMT (feasibility is + witnessed by the concrete run), and surfaces the `remainder-predicate` Booster already computes + (including the fall-through/`owise` region it currently aborts on) so the engine can also generate + inputs for it. Surface: `kore-rpc-types/Types.hs`, `booster/JsonRpc.hs`, `booster/JsonRpc/Utils.hs`, + `kore/JsonRpc.hs`, plus pyk's `KoreClient`. + +**LLVM backend (v2 / v3, optional optimizations):** + +- *v2:* expose an LLVM rule-ordinal → `rule_id` (unique-id) mapping (or emit unique-ids in proof + hints) so the concrete trace can be sourced from fast LLVM proof hints instead of a Booster ground + run. Hints already carry rule events (ordinal + substitution) and side-condition results. +- *v3:* a full concolic mode (symbolic shadow / taint) in the LLVM interpreter that emits the symbolic + path condition during a single concrete run — removes Haskell from the forward path entirely. + +## Milestones + +- **v1 (done, no backend change):** concrete pass via ground `execute`; symbolic replay via existing + `execute`, selecting the branch by matching `rule_id` against the concrete trace; **remainder-driven** + diverging-input generation (`diverging_inputs`) using the sibling `rule_predicate`s returned at each + branch. Works on legacy + Booster; the backend still forks/checks feasibility internally. +- **v1.1 (optimization):** guided/forced-rule mode on `execute` so the forward replay skips per-branch + feasibility SMT and surfaces Booster's remainder (incl. the `owise` fall-through it currently aborts on). +- **v2:** LLVM proof hints (+ ordinal→id) as the concrete-trace source for a faster concrete pass. +- **v3:** LLVM concolic mode (Option B). + +Suggested next implementation step (v1.1): a spike confirming the forced-rule + remainder contract on +Booster on the IMP example, then wiring the engine's replay onto it. + +## Current status (v1, remainder-driven) + +`_engine.py` implements: `concrete_trace` (ground run → rule-id sequence), `trace` (symbolic replay +selecting branches via `rule_id` matching, recording each chosen condition `ck` and the sibling +remainder conditions `rk`), `diverging_inputs` (solve `prefix ∧ rk` per sibling), and `explore` +(worklist + signature dedup). It is +validated by `src/tests/unit/test_concolic.py` (trace-matching logic) and +`src/tests/integration/concolic/test_imp_concolic.py` (end-to-end on IMP, single- and two-branch +programs, legacy + Booster — the two-branch test confirms exactly the three feasible leaves are found +and the UNSAT one is pruned). The next (v1.1) step replaces the internal `execute`-and-select with the +forced-rule mode so the forward replay drops per-branch feasibility SMT. + +## Scope & limitations + +- A path's feasibility is established by a real concrete run, so branch selection needs no SMT; only + spawning a diverging input does. +- Loops/recursion make the path tree unbounded; exploration is bounded by `max_step`, `max_branches`, + and `max_iterations`. +- v1 still relies on the symbolic backend for the forward replay (one guided step per rule). Removing + that dependency is v3 (Option B). diff --git a/pyk/src/pyk/concolic/README.md b/pyk/src/pyk/concolic/README.md new file mode 100644 index 0000000000..7ad7f6f290 --- /dev/null +++ b/pyk/src/pyk/concolic/README.md @@ -0,0 +1,202 @@ +# Concolic execution (proof-of-concept) + +This package is a **proof-of-concept** concolic (concrete + symbolic) execution engine +built on top of pyk's existing symbolic-execution interface, `CTermSymbolic`. + +> **Design & roadmap:** see [`DESIGN.md`](DESIGN.md) for the target architecture (the +> remainder-driven DART loop and the Haskell guided-step backend feature). This README documents +> what is implemented today. + +It does **not** add a new backend. The defining idea is that a fast, deterministic *concrete* +run produces an execution trace, which is then **reused** to drive a *symbolic* run down the +very same path. Because the concrete trace already records which rule fired at every branch, the +symbolic pass selects each branch by **matching rule ids** — it never asks the SMT solver "which +branch does this input take?". The solver is consulted only to synthesize a new input that +diverges from the current path, by solving a path prefix against a sibling (remainder) condition +`rk` recorded at a branch node (the classic DART/SAGE loop). + +## How it works + +Both passes run against the **same** Kore backend, so rule ids share one namespace. Given an +initial symbolic configuration with free *input variables* and a concrete assignment to them: + +1. **`concrete_trace(init, concrete_input)`** — Substitute the concrete input into the + configuration and execute. With the branch-relevant data ground, the run is **deterministic + (no forking)**, and the ordered rule ids applied are harvested from the rewrite logs. This is + the *concrete trace*. + +2. **`trace(init, concrete_input)`** — Execute with the input left **symbolic**. At each branch, + follow the successor whose `rule_id` appears next in the concrete trace, recording its guard + as a path constraint. Branch selection is an O(1) rule-id lookup — **no per-branch SMT / + simplify call**. (This pass uses the lower-level `KoreClient` directly, because the + `CTermSymbolic` wrapper drops `rule_id` from successors.) + +3. **`diverging_inputs(init, trace)`** — For each branch `i` along the recorded path, and for each + sibling (remainder) condition `rk` recorded at that node, build the constraint set + `c_0 & … & c_(i-1) & rk` and ask `CTermSymbolic.get_model` for a satisfying assignment over the + input variables. Each feasible model is a new concrete input that follows the same path up to + branch `i`, then diverges to the sibling branch guarded by `rk`. Infeasible siblings are pruned + automatically (`get_model` returns `None`). Using the backend-provided sibling predicates (rather + than negating `c_i`) handles multi-way branches correctly. + +4. **`explore(init, seed_input)`** — A bounded worklist loop: trace an input, enqueue the inputs + produced by flipping its branches, skip inputs whose path condition was already seen, and stop + after `max_iterations`. The result is one trace per distinct path discovered. + +### Why reuse the trace? + +Without the concrete trace, the symbolic engine would have to *decide* at each branch which side +a given input takes — i.e. substitute the value and call the solver/simplifier per candidate. +Reusing the concrete trace replaces that work with a cheap rule-id lookup, and keeps the +expensive solver on the critical path only once per new input (the flip). + +> **Honest scope:** this reuses the concrete trace for *branch selection*. The symbolic backend +> still computes the branch split itself (it returns both successors at a branch over the RPC); +> we do not yet stop it from forking. Eliminating that forking entirely would require either +> *concretizing* the branch-deciding term or backend-level support to apply a chosen rule — see +> *Scope and limitations*. + +## Architecture + +The engine adds no new backend; it orchestrates pyk's existing `CTermSymbolic` interface, which +talks to a running Kore RPC server (the legacy Haskell backend or Booster) and its SMT solver. + +```mermaid +flowchart TB + subgraph User["Caller / tests"] + T["test_imp_concolic.py / unit tests
build initial symbolic config + seed input"] + end + + subgraph Concolic["pyk.concolic (this package · PoC)"] + E["ConcolicEngine"] + subgraph API["Public methods"] + M0["concrete_trace(init, input)
ground run -> ordered rule-id trace"] + M1["trace(init, input)
symbolic replay, select branch by rule-id"] + M2["diverging_inputs(init, trace)
solve prefix & sibling remainder -> new inputs"] + M3["explore(init, seed_input)
worklist loop + path dedup"] + end + subgraph Helpers["Internal helpers"] + H1["_match_branch
next successor whose rule-id is next in trace"] + H2["_rewrite_rule_ids
harvest rule-ids from rewrite logs"] + H3["_restrict_to_inputs
trim model to input variables"] + end + subgraph Data["Data types"] + D1["ConcolicTrace
(input, path, final, status, rule_trace)"] + D2["PathConstraint
(condition, depth, rule_id)"] + end + E --> API + API --> Helpers + API --> Data + end + + subgraph Pyk["pyk symbolic-execution interface (existing)"] + CS["CTermSymbolic"] + CS3["get_model()
SMT solve -> Subst model (only for flips)"] + CONV["kast_to_kore / kore_to_kast"] + CS --> CS3 & CONV + end + + subgraph Backend["K symbolic backend (Kore RPC)"] + KC["KoreClient (JSON-RPC)
execute() returns next_states w/ rule_id + logs"] + SRV["kore-rpc (legacy Haskell backend)
/ kore-rpc-booster (Booster)"] + Z3["Z3 SMT solver"] + KC --> SRV --> Z3 + end + + T -->|"trace / explore"| E + M0 -->|"execute ground (no fork) + logs"| KC + M1 -->|"execute symbolic, read rule_id"| KC + M2 --> CS3 + E -.->|convert KAST <-> KORE| CONV + CS --> KC + + classDef new fill:#d4f7d4,stroke:#2a8a2a,color:#000; + class Concolic,E,API,Helpers,Data,M0,M1,M2,M3,H1,H2,H3,D1,D2 new; +``` + +## Exploration flow + +```mermaid +flowchart TD + Start(["explore(init, seed_input)"]) --> Init["worklist = [seed_input]
seen_signatures = empty
traces = []"] + Init --> Cond{"worklist non-empty
and len(traces) < max_iterations?"} + Cond -->|no| Done(["return traces
(one per distinct path)"]) + Cond -->|yes| Pop["input = worklist.pop(0)"] + + Pop --> Trace["trace(init, input)"] + + subgraph TraceLoop["trace: concrete pass feeds symbolic replay"] + direction TB + CP["CONCRETE PASS — concrete_trace(init, input)
execute(config[input := value]) ground, deterministic
harvest ordered rule-ids from rewrite logs"] + CP --> RT[/"rule_trace = (r0, r1, ..., r_if-false, ...)"/] + RT --> SP["SYMBOLIC PASS — input stays symbolic
cterm = init, path = []"] + SP --> Ex["KoreClient.execute(cterm) — symbolic"] + Ex --> Br{"next_states?"} + Br -->|"none (terminal/stuck)"| TEnd["status = terminal
return ConcolicTrace"] + Br -->|"one (cut-point)"| Follow["follow that state"] --> Ex + Br -->|"many (branching)"| Sel["_match_branch:
pick successor whose rule_id is the
next matching entry in rule_trace
(no SMT/simplify)"] + Sel --> Pick["path.append(condition, rule_id)
cterm = that successor + its guard"] --> Ex + end + + Trace --> TraceLoop + TraceLoop --> Sig{"trace.signature
already in seen?"} + Sig -->|yes| Cond + Sig -->|no| Record["seen.add(signature)
traces.append(trace)"] + + Record --> Flip["diverging_inputs(init, trace)"] + + subgraph FlipLoop["diverging_inputs: synthesize diverging inputs"] + direction TB + FS["for each branch i, each sibling rk"] --> FC["build constraints:
c0 & ... & c(i-1) & rk"] + FC --> GM["CTermSymbolic.get_model(constraints)
(Z3 solve)"] + GM --> Feas{"satisfiable?"} + Feas -->|yes| Add["trim to input variables
-> new concrete input"] + Feas -->|no| Skip["skip (branch infeasible)"] + end + + Flip --> FlipLoop + FlipLoop --> Enq["enqueue new inputs onto worklist"] + Enq --> Cond + + classDef accent fill:#fff3cd,stroke:#b8860b,color:#000; + class Sel,Pick,FC,GM accent; +``` + +## Example + +For the one-branch IMP program + +``` +if (n <= 5) { found = 1 ; } else { found = 0 ; } +``` + +with `n` symbolic, seeding the engine with `n = 10` (the `else` branch) yields a flipped input +`n ≤ 5` (the `then` branch), and `explore` discovers both feasible paths. See +`src/tests/integration/concolic/test_imp_concolic.py` for the end-to-end version and +`src/tests/unit/test_concolic.py` for a toolchain-free unit test of the orchestration logic. + +## Scope and limitations + +This is a deliberately small PoC: + +- **Single concrete path per trace.** Branch selection assumes branches are mutually exclusive + and that the concrete input determines exactly one (true for ground integer guards like the + IMP example). +- **No coverage metric or smart search heuristic.** `explore` is a plain breadth-first + worklist bounded by `max_iterations`; it is not coverage-guided. +- **No CLI yet.** The engine is a library only. A `kpyk concolic` entry point and a + coverage-guided search strategy are natural follow-ups. +- **Loops/recursion** are bounded only by `max_step` / `max_branches`; unbounded loops are not + handled specially. + +## Running the tests + +``` +# Toolchain-free unit test of the engine orchestration (run from pyk/) +uv run -- pytest src/tests/unit/test_concolic.py + +# End-to-end test (run from pyk/; needs a `kompile` + Kore RPC server matching this source +# tree -- e.g. `kup install k --version v7.1.322`). Exercises both the legacy Haskell backend +# and the Booster server. +uv run -- pytest src/tests/integration/concolic/test_imp_concolic.py +``` diff --git a/pyk/src/pyk/concolic/__init__.py b/pyk/src/pyk/concolic/__init__.py new file mode 100644 index 0000000000..a05089d715 --- /dev/null +++ b/pyk/src/pyk/concolic/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +from ._engine import ConcolicEngine, ConcolicTrace, PathConstraint + +__all__ = ['ConcolicEngine', 'ConcolicTrace', 'PathConstraint'] diff --git a/pyk/src/pyk/concolic/_engine.py b/pyk/src/pyk/concolic/_engine.py new file mode 100644 index 0000000000..c64f2adba2 --- /dev/null +++ b/pyk/src/pyk/concolic/_engine.py @@ -0,0 +1,357 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from ..cterm import CTerm +from ..kast.inner import Subst +from ..kore.rpc import LogRewrite, RewriteSuccess, StopReason + +if TYPE_CHECKING: + from collections.abc import Iterable + + from ..cterm.symbolic import CTermSymbolic + from ..kast.inner import KInner + from ..kore.rpc import ExecuteResult, LogEntry, State + + +_LOGGER = logging.getLogger(__name__) + +# Safety bound on the number of `execute` round-trips per pass, so a runaway loop always terminates. +_MAX_EXECUTE_CALLS = 10_000 + + +@dataclass(frozen=True) +class PathConstraint: + """A single branch taken along an execution path. + + Attributes: + condition: The ML predicate guarding the branch that was taken (e.g. ``{ true #Equals X tuple[str, ...]: + """A hashable fingerprint of the path condition, used to deduplicate explored paths.""" + return tuple(str(pc.condition) for pc in self.path) + + +class ConcolicEngine: + """A proof-of-concept concolic (concrete + symbolic) execution engine on top of `CTermSymbolic`. + + The engine realizes the defining idea of concolic execution: a fast, deterministic *concrete* + run produces an execution trace, which is then *reused* to drive a *symbolic* run down the very + same path. Because the concrete trace already records which rule fired at every branch, the + symbolic pass picks each branch by matching rule ids -- it never asks the SMT solver "which + branch does this input take?". The solver is consulted only when synthesizing a new diverging + input, by solving ``prefix ∧ rk`` where ``rk`` is the sibling (remainder) condition recorded + at a branch node (remainder-driven DART). + + Concretely, for each input the engine runs two passes against the same Kore backend (so rule + ids share one namespace): + + 1. **Concrete pass** (``concrete_trace``): substitute the concrete input into the configuration and + execute. With branch-relevant data ground, execution is deterministic -- no forking -- and the + ordered list of applied rule ids is harvested from the rewrite logs. + 2. **Symbolic pass** (``trace``): execute with the input left symbolic. At each branch, follow the + successor whose rule id appears next in the concrete trace, recording its guard as a path + constraint (``ck``) and the sibling successors' guards as remainder conditions (``rk``). No + per-branch solver call is made. + 3. **Diverging inputs** (``diverging_inputs``): for each branch node ``i``, solve + ``prefix_{ None: + self.cterm_symbolic = cterm_symbolic + self.input_vars = tuple(input_vars) + self.max_step = max_step + self.max_branches = max_branches + self.module_name = module_name + self.assume_remainder_unsat = assume_remainder_unsat + + def concrete_trace(self, init: CTerm, concrete_input: Subst) -> tuple[str, ...]: + """Run `init` concretely under `concrete_input` and return the ordered rule ids applied. + + The concrete input is substituted into the configuration before execution. As long as it + determines every branch, the run is deterministic and the returned trace is the rule-id + sequence used to guide the symbolic pass. + + Args: + init: The initial symbolic configuration, with the `input_vars` left free. + concrete_input: A concrete assignment to the `input_vars`. + + Returns: + The ordered tuple of rule ids applied during the concrete run. + """ + cterm = CTerm(concrete_input(init.config), [concrete_input(c) for c in init.constraints]) + pattern = self.cterm_symbolic.kast_to_kore(cterm.kast) + + rule_ids: list[str] = [] + for _ in range(_MAX_EXECUTE_CALLS): + result = self._execute(pattern) + rule_ids.extend(self._rewrite_rule_ids(result.logs)) + + if result.next_states: + # Branch-relevant data should be ground, so this is unexpected; follow the first + # successor deterministically to stay robust. + _LOGGER.warning('Concrete run branched unexpectedly; following the first successor') + pattern = result.next_states[0].kore + continue + if result.reason is StopReason.DEPTH_BOUND: + pattern = result.state.kore + continue + break + return tuple(rule_ids) + + def trace(self, init: CTerm, concrete_input: Subst) -> ConcolicTrace: + """Symbolically replay `init` along the path that `concrete_input` takes concretely. + + First computes the concrete rule trace, then executes with the input left symbolic, selecting + each branch by matching rule ids against that trace (no per-branch solver call). At each + branch, the sibling (non-chosen) successors' predicates are recorded as remainder conditions + (``rk``) on the ``PathConstraint``, enabling remainder-driven input generation. + + Args: + init: The initial symbolic configuration, with the `input_vars` left free. + concrete_input: A concrete assignment to the `input_vars`. + + Returns: + A `ConcolicTrace` recording the branch conditions taken and the final symbolic state. + """ + rule_trace = self.concrete_trace(init, concrete_input) + trace_idx = 0 + + cterm = init + path: list[PathConstraint] = [] + total_depth = 0 + status = 'depth_bound' + + for _ in range(_MAX_EXECUTE_CALLS): + result = self._execute(self.cterm_symbolic.kast_to_kore(cterm.kast)) + total_depth += result.depth + next_states = result.next_states + + if not next_states: + cterm = self._to_cterm(result.state) + if result.reason is StopReason.DEPTH_BOUND: + continue + status = 'terminal' + break + + if len(next_states) == 1: + # A single successor (e.g. a cut-point rule); follow it without recording a branch. + cterm = self._to_cterm(next_states[0]) + continue + + chosen, siblings, trace_idx = self._match_branch(next_states, rule_trace, trace_idx) + if chosen is None: + _LOGGER.warning('No branch matched the concrete rule trace; stopping trace') + cterm = self._to_cterm(result.state) + status = 'undetermined' + break + + # Collect remainder (rk) conditions from sibling successors. + sibling_conditions: list[KInner] = [] + for ns in siblings: + if ns.rule_predicate is not None: + sibling_conditions.append(self.cterm_symbolic.kore_to_kast(ns.rule_predicate)) + + cterm = self._to_cterm(chosen) + if chosen.rule_predicate is not None: + condition = self.cterm_symbolic.kore_to_kast(chosen.rule_predicate) + cterm = cterm.add_constraint(condition) + path.append( + PathConstraint( + condition=condition, + depth=total_depth, + rule_id=chosen.rule_id, + siblings=tuple(sibling_conditions), + ) + ) + + return ConcolicTrace( + input=concrete_input, + path=tuple(path), + final=cterm, + status=status, + rule_trace=rule_trace, + ) + + def diverging_inputs(self, init: CTerm, trace: ConcolicTrace) -> list[Subst]: + """Synthesize new concrete inputs by solving prefix conditions against sibling (remainder) conditions. + + For each branch node ``i`` along the path, and for each sibling (remainder) condition ``rk`` + recorded at that node, builds the constraint set ``c_0 & ... & c_(i-1) & rk`` and asks the + SMT solver for a satisfying model over the ``input_vars``. Each feasible model is a new + concrete input that follows the same path up to branch ``i``, then diverges to the sibling + branch guarded by ``rk``. + + This remainder-driven approach handles multi-way branches without manual negation and + naturally prunes infeasible siblings (those for which ``get_model`` returns ``None``). + + Args: + init: The same initial symbolic configuration used to produce `trace`. + trace: A previously recorded trace whose sibling branches should be explored. + + Returns: + The list of feasible new inputs; infeasible sibling branches are dropped. + """ + results: list[Subst] = [] + for i, pc in enumerate(trace.path): + prefix = [trace.path[j].condition for j in range(i)] + for sibling in pc.siblings: + constraints = prefix + [sibling] + constrained = init + for constraint in constraints: + constrained = constrained.add_constraint(constraint) + model = self.cterm_symbolic.get_model(constrained, module_name=self.module_name) + if model is None: + _LOGGER.debug(f'Sibling branch at node {i} is infeasible; skipping') + continue + results.append(self._restrict_to_inputs(model)) + return results + + def explore(self, init: CTerm, seed_input: Subst, *, max_iterations: int = 50) -> list[ConcolicTrace]: + """Run the concolic exploration loop, discovering distinct execution paths from a seed input. + + Maintains a worklist of concrete inputs, traces each one, and enqueues the inputs synthesized + by solving prefix conditions against sibling (remainder) conditions -- skipping inputs whose + path condition was already seen. Bounded by `max_iterations` so the loop always terminates. + + Args: + init: The initial symbolic configuration with `input_vars` free. + seed_input: The first concrete input to explore. + max_iterations: Maximum number of traces to run. + + Returns: + One `ConcolicTrace` per distinct path discovered, in the order they were found. + """ + worklist: list[Subst] = [seed_input] + seen_signatures: set[tuple[str, ...]] = set() + traces: list[ConcolicTrace] = [] + + while worklist and len(traces) < max_iterations: + current = worklist.pop(0) + trace = self.trace(init, current) + if trace.signature in seen_signatures: + continue + seen_signatures.add(trace.signature) + traces.append(trace) + + for new_input in self.diverging_inputs(init, trace): + worklist.append(new_input) + + return traces + + def _execute(self, pattern: object) -> ExecuteResult: + # The CTerm-level API drops rule ids from successors, so the trace-guided passes go through + # the lower-level Kore client to access `State.rule_id` and the rewrite logs. + return self.cterm_symbolic._kore_client.execute( + pattern, # type: ignore[arg-type] + max_depth=self.max_step, + module_name=self.module_name, + log_successful_rewrites=True, + assume_remainder_unsat=self.assume_remainder_unsat, + ) + + def _to_cterm(self, state: State) -> CTerm: + return CTerm.from_kast(self.cterm_symbolic.kore_to_kast(state.kore)) + + @staticmethod + def _rewrite_rule_ids(logs: Iterable[LogEntry]) -> list[str]: + return [ + entry.result.rule_id + for entry in logs + if isinstance(entry, LogRewrite) and isinstance(entry.result, RewriteSuccess) + ] + + @staticmethod + def _match_branch( + next_states: Iterable[State], + rule_trace: tuple[str, ...], + trace_idx: int, + ) -> tuple[State | None, list[State], int]: + """Select the branch successor whose rule id appears next in the concrete rule trace. + + Args: + next_states: The successor states returned by a symbolic ``execute`` call at a branch. + rule_trace: The full ordered rule id sequence from the concrete pass. + trace_idx: The current position in ``rule_trace`` (entries before this index are already + consumed by earlier branch decisions). + + Returns: + A triple ``(chosen, siblings, new_idx)`` where ``chosen`` is the matched successor (or + ``None`` if no match was found), ``siblings`` is the list of non-chosen successors, and + ``new_idx`` is the updated trace index (advanced past the matched entry). + """ + states = list(next_states) + candidates = {s.rule_id: s for s in states if s.rule_id is not None} + for k in range(trace_idx, len(rule_trace)): + match = candidates.get(rule_trace[k]) + if match is not None: + siblings = [s for s in states if s is not match] + return match, siblings, k + 1 + return None, [], trace_idx + + def _restrict_to_inputs(self, model: Subst) -> Subst: + return Subst({var: term for var, term in model.items() if var in self.input_vars}) diff --git a/pyk/src/pyk/kore/rpc.py b/pyk/src/pyk/kore/rpc.py index 77ef641a8f..b46311d617 100644 --- a/pyk/src/pyk/kore/rpc.py +++ b/pyk/src/pyk/kore/rpc.py @@ -958,6 +958,7 @@ def execute( *, max_depth: int | None = None, assume_state_defined: bool | None = None, + assume_remainder_unsat: bool | None = None, cut_point_rules: Iterable[str] | None = None, terminal_rules: Iterable[str] | None = None, moving_average_step_timeout: bool | None = None, @@ -970,6 +971,7 @@ def execute( { 'max-depth': max_depth, 'assume-state-defined': assume_state_defined, + 'assume-remainder-unsat': assume_remainder_unsat, 'cut-point-rules': list(cut_point_rules) if cut_point_rules is not None else None, 'terminal-rules': list(terminal_rules) if terminal_rules is not None else None, 'moving-average-step-timeout': moving_average_step_timeout, diff --git a/pyk/src/tests/integration/concolic/__init__.py b/pyk/src/tests/integration/concolic/__init__.py new file mode 100644 index 0000000000..9d48db4f9f --- /dev/null +++ b/pyk/src/tests/integration/concolic/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/pyk/src/tests/integration/concolic/test_concolic_bench.py b/pyk/src/tests/integration/concolic/test_concolic_bench.py new file mode 100644 index 0000000000..7417be42f1 --- /dev/null +++ b/pyk/src/tests/integration/concolic/test_concolic_bench.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import os +import time +from typing import TYPE_CHECKING, Any + +import pytest + +from pyk.concolic import ConcolicEngine +from pyk.cterm import CTerm +from pyk.kast.inner import KApply, KSequence, KToken, KVariable, Subst +from pyk.kast.prelude.kint import intToken +from pyk.kore.rpc import BranchingResult +from pyk.testing import CTermSymbolicTest, KPrintTest + +from ..utils import K_FILES + +if TYPE_CHECKING: + from pyk.cterm import CTermSymbolic + from pyk.ktool.kprint import KPrint + + +class TestConcolicBenchmark(CTermSymbolicTest, KPrintTest): + """Benchmark scaling the concolic engine over k sequential conditionals.""" + + KOMPILE_MAIN_FILE = K_FILES / 'imp.k' + DISABLE_LEGACY = True + + @staticmethod + def _build_program(k: int) -> str: + """Build a program of k sequential conditionals with monotone thresholds 1..k. + + Args: + k: Number of sequential conditionals. + + Returns: + IMP program string with k if-else statements over variable n. + """ + stmts = [] + for i in range(1, k + 1): + stmts.append(f'if (n <= {i}) {{ x{i} = 1 ; }} else {{ x{i} = 0 ; }}') + return ' '.join(stmts) + + @staticmethod + def _build_state_map(k: int) -> str: + """Build the state map token string for k variables plus n. + + Args: + k: Number of x-variables (x1..xk). + + Returns: + Map token string for parse_token. + """ + parts = ['#token("n","Id") |-> N:Int'] + for i in range(1, k + 1): + parts.append(f'#token("x{i}","Id") |-> 0') + return ' '.join(parts) + + @staticmethod + def config(kprint: KPrint, k: int) -> CTerm: + """Build the initial CTerm configuration for k conditionals. + + Args: + kprint: KPrint instance for parsing tokens. + k: Number of sequential conditionals. + + Returns: + Initial CTerm with symbolic input N. + """ + pgm = TestConcolicBenchmark._build_program(k) + state_str = TestConcolicBenchmark._build_state_map(k) + k_parsed = kprint.parse_token(KToken(pgm, 'Stmt'), as_rule=False) + state_parsed = kprint.parse_token(KToken(state_str, 'Map'), as_rule=True) + return CTerm( + KApply( + '', + KApply( + '', + ( + KApply('', KSequence(k_parsed)), + KApply('', state_parsed), + ), + ), + KVariable('GENERATED_COUNTER_CELL'), + ), + ) + + @pytest.mark.skipif(not os.environ.get('CONCOLIC_BENCH'), reason='benchmark; set CONCOLIC_BENCH=1 to run') + def test_benchmark(self, cterm_symbolic: CTermSymbolic, kprint: KPrint) -> None: + """Measure execute vs get_model cost split across scaling family of branchy IMP programs.""" + # Counters shared across closures via mutable dict + counters: dict[str, Any] = {} + + def reset_counters() -> None: + counters['n_execute'] = 0 + counters['t_execute'] = 0.0 + counters['n_branching'] = 0 + counters['n_get_model'] = 0 + counters['t_get_model'] = 0.0 + + # Save originals before patching + orig_execute = cterm_symbolic._kore_client.execute + orig_get_model = cterm_symbolic.get_model + + def wrapped_execute(*args: Any, **kwargs: Any) -> Any: + counters['n_execute'] += 1 + t0 = time.perf_counter() + result = orig_execute(*args, **kwargs) + counters['t_execute'] += time.perf_counter() - t0 + if isinstance(result, BranchingResult): + counters['n_branching'] += 1 + return result + + def wrapped_get_model(*args: Any, **kwargs: Any) -> Any: + counters['n_get_model'] += 1 + t0 = time.perf_counter() + result = orig_get_model(*args, **kwargs) + counters['t_get_model'] += time.perf_counter() - t0 + return result + + cterm_symbolic._kore_client.execute = wrapped_execute # type: ignore[method-assign] + cterm_symbolic.get_model = wrapped_get_model # type: ignore[method-assign] + + rows: list[tuple[int, int, int, int, float, int, float, float]] = [] + + try: + for k in [1, 2, 3, 4, 5, 6]: + reset_counters() + init = self.config(kprint, k) + engine = ConcolicEngine(cterm_symbolic, input_vars=['N']) + + t0_total = time.perf_counter() + traces = engine.explore(init, Subst({'N': intToken(0)})) + t_total = time.perf_counter() - t0_total + + n_paths = len({t.signature for t in traces}) + assert n_paths == k + 1, ( + f'k={k}: expected {k + 1} paths, got {n_paths}. ' f'traces={[t.signature for t in traces]}' + ) + + rows.append( + ( + k, + n_paths, + counters['n_execute'], + counters['n_branching'], + counters['t_execute'], + counters['n_get_model'], + counters['t_get_model'], + t_total, + ) + ) + finally: + cterm_symbolic._kore_client.execute = orig_execute # type: ignore[method-assign] + cterm_symbolic.get_model = orig_get_model # type: ignore[method-assign] + + # Print markdown table + header = '| k | paths | n_execute | n_branching | t_execute(s) | n_get_model | t_get_model(s) | t_total(s) |' + sep = '|---|-------|-----------|-------------|--------------|-------------|----------------|------------|' + print() + print(header) + print(sep) + for k, n_paths, n_exec, n_branch, t_exec, n_gm, t_gm, t_tot in rows: + print(f'| {k} | {n_paths} | {n_exec} | {n_branch} | {t_exec:.3f} | {n_gm} | {t_gm:.3f} | {t_tot:.3f} |') + + print() + print('Per-k breakdown (% of t_total):') + for k, _n_paths, _n_exec, _n_branch, t_exec, _n_gm, t_gm, t_tot in rows: + exec_pct = 100.0 * t_exec / t_tot if t_tot > 0 else 0.0 + gm_pct = 100.0 * t_gm / t_tot if t_tot > 0 else 0.0 + print(f' k={k}: t_execute={exec_pct:.1f}% t_get_model={gm_pct:.1f}%') diff --git a/pyk/src/tests/integration/concolic/test_imp_concolic.py b/pyk/src/tests/integration/concolic/test_imp_concolic.py new file mode 100644 index 0000000000..1643a282f3 --- /dev/null +++ b/pyk/src/tests/integration/concolic/test_imp_concolic.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from pyk.concolic import ConcolicEngine +from pyk.cterm import CTerm +from pyk.kast.inner import KApply, KSequence, KToken, KVariable, Subst +from pyk.kast.prelude.kint import intToken +from pyk.testing import CTermSymbolicTest, KPrintTest + +from ..utils import K_FILES + +if TYPE_CHECKING: + from pyk.cterm import CTermSymbolic + from pyk.ktool.kprint import KPrint + + +# A one-branch IMP program over a single symbolic input ``n``: +# if (n <= 5) { found = 1 ; } else { found = 0 ; } +PGM: str = 'if (n <= 5) { found = 1 ; } else { found = 0 ; }' + + +class TestImpConcolic(CTermSymbolicTest, KPrintTest): + KOMPILE_MAIN_FILE = K_FILES / 'imp.k' + + @staticmethod + def config(kprint: KPrint) -> CTerm: + # ``n`` is the symbolic input variable ``N:Int``; ``found`` is initialized concretely. + # The program has no variables, so it parses in program mode; the state map needs rule + # mode to admit the free variable ``N:Int``. + k_parsed = kprint.parse_token(KToken(PGM, 'Stmt'), as_rule=False) + state_parsed = kprint.parse_token( + KToken('#token("n","Id") |-> N:Int #token("found","Id") |-> 0', 'Map'), as_rule=True + ) + return CTerm( + KApply( + '', + KApply( + '', + ( + KApply('', KSequence(k_parsed)), + KApply('', state_parsed), + ), + ), + KVariable('GENERATED_COUNTER_CELL'), + ), + ) + + @staticmethod + def found_value(kprint: KPrint, cterm: CTerm) -> str: + return kprint.pretty_print(cterm.cell('STATE_CELL')) + + def test_trace_follows_concrete_branch(self, cterm_symbolic: CTermSymbolic, kprint: KPrint) -> None: + # Given + init = self.config(kprint) + engine = ConcolicEngine(cterm_symbolic, input_vars=['N']) + + # When: n = 10 should take the else branch (found = 0) + trace = engine.trace(init, Subst({'N': intToken(10)})) + + # Then + assert trace.status == 'terminal' + assert len(trace.path) == 1 + assert 'found |-> 0' in self.found_value(kprint, trace.final) + + def test_diverging_input_produces_other_branch(self, cterm_symbolic: CTermSymbolic, kprint: KPrint) -> None: + # Given + init = self.config(kprint) + engine = ConcolicEngine(cterm_symbolic, input_vars=['N']) + trace = engine.trace(init, Subst({'N': intToken(10)})) + + # When: use remainder-driven diverging_inputs to synthesize an input for the sibling branch + new_inputs = engine.diverging_inputs(init, trace) + + # Then + assert len(new_inputs) == 1 + new_input = new_inputs[0] + assert 'N' in new_input + new_value = new_input['N'] + assert isinstance(new_value, KToken) + assert int(new_value.token) <= 5 + + # And: re-tracing the diverging input takes the other branch (found = 1) + new_trace = engine.trace(init, new_input) + assert new_trace.signature != trace.signature + assert 'found |-> 1' in self.found_value(kprint, new_trace.final) + + def test_explore_discovers_both_paths(self, cterm_symbolic: CTermSymbolic, kprint: KPrint) -> None: + # Given + init = self.config(kprint) + engine = ConcolicEngine(cterm_symbolic, input_vars=['N']) + + # When + traces = engine.explore(init, Subst({'N': intToken(10)})) + + # Then: the two feasible branches of the program are both discovered + assert len(traces) == 2 + signatures = {trace.signature for trace in traces} + assert len(signatures) == 2 + + +# A two-branch IMP program: two sequential conditionals over a single symbolic input ``n``: +# if (n <= 5) { a = 1 ; } else { a = 0 ; } +# if (n <= 10) { b = 1 ; } else { b = 0 ; } +# +# The feasible path leaves are: +# n<=5 ∧ n<=10 (e.g. N=3) +# ¬n<=5 ∧ n<=10 (e.g. N=7) +# ¬n<=5 ∧ ¬n<=10 (e.g. N=11) +# +# The combination n<=5 ∧ ¬n<=10 is UNSAT and must NOT be generated. +TWO_BRANCH_PGM: str = 'if (n <= 5) { a = 1 ; } else { a = 0 ; } if (n <= 10) { b = 1 ; } else { b = 0 ; }' + + +class TestImpConcolicTwoBranch(CTermSymbolicTest, KPrintTest): + KOMPILE_MAIN_FILE = K_FILES / 'imp.k' + + @staticmethod + def config(kprint: KPrint) -> CTerm: + k_parsed = kprint.parse_token(KToken(TWO_BRANCH_PGM, 'Stmt'), as_rule=False) + state_parsed = kprint.parse_token( + KToken( + '#token("n","Id") |-> N:Int #token("a","Id") |-> 0 #token("b","Id") |-> 0', + 'Map', + ), + as_rule=True, + ) + return CTerm( + KApply( + '', + KApply( + '', + ( + KApply('', KSequence(k_parsed)), + KApply('', state_parsed), + ), + ), + KVariable('GENERATED_COUNTER_CELL'), + ), + ) + + def test_explore_discovers_exactly_three_feasible_paths( + self, cterm_symbolic: CTermSymbolic, kprint: KPrint + ) -> None: + """Remainder-driven exploration finds the 3 feasible leaves and skips the UNSAT one. + + Seed N=3 (takes both true branches): + - B1 sibling rk=¬(n<=5): solve prefix=[] ∧ ¬(n<=5) -> N=6 queued + - B2 sibling rk=¬(n<=10): solve prefix=[n<=5] ∧ ¬(n<=10) = UNSAT -> pruned + + N=6 (false, true): + - B1 sibling rk=(n<=5): solve prefix=[] ∧ (n<=5) -> rediscovers N=3 path, skipped + - B2 sibling rk=¬(n<=10): solve prefix=[¬(n<=5)] ∧ ¬(n<=10) -> N=11 queued + + N=11 (false, false): + - B1 sibling produces N<=5 path (already seen) + - B2 sibling produces ¬(n<=5)∧(n<=10) path (already seen as N=6) + + Result: exactly 3 distinct path signatures. + """ + # Given + init = self.config(kprint) + engine = ConcolicEngine(cterm_symbolic, input_vars=['N']) + + # When: seed N=3 (both branches take the true side) + traces = engine.explore(init, Subst({'N': intToken(3)})) + + # Then: exactly 3 distinct feasible paths are discovered + signatures = {trace.signature for trace in traces} + assert len(signatures) == 3, f'Expected 3 distinct paths, got {len(signatures)}: {signatures}' + + # All discovered traces must have terminated normally + for trace in traces: + assert trace.status == 'terminal', f'Trace did not terminate: {trace.status}' diff --git a/pyk/src/tests/unit/test_concolic.py b/pyk/src/tests/unit/test_concolic.py new file mode 100644 index 0000000000..355e4fe7f4 --- /dev/null +++ b/pyk/src/tests/unit/test_concolic.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, cast + +from pyk.concolic import ConcolicEngine +from pyk.kast.inner import Subst +from pyk.kast.prelude.kint import intToken +from pyk.kore.rpc import LogRewrite, RewriteFailure, RewriteSuccess + +if TYPE_CHECKING: + from pyk.cterm.symbolic import CTermSymbolic + from pyk.kore.rpc import LogEntry, State + + +# --------------------------------------------------------------------------- +# These unit tests target the trace-guided "brain" of the engine -- the pure +# logic that reuses a concrete rule trace to drive branch selection -- without +# a kompiled definition or a Kore server. The full two-pass flow (concrete +# pass -> symbolic replay -> flip) is covered by the IMP integration test. +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class _FakeState: + """Minimal stand-in for `kore.rpc.State`; `_match_branch` only reads `rule_id`.""" + + rule_id: str | None + + +def _states(*rule_ids: str | None) -> list[State]: + return [cast('State', _FakeState(rid)) for rid in rule_ids] + + +def _engine() -> ConcolicEngine: + return ConcolicEngine(cast('CTermSymbolic', object()), input_vars=['N']) + + +# --- branch selection by matching the concrete rule trace -------------------- + + +def test_match_branch_picks_rule_from_trace() -> None: + # Candidates are the two branch successors; the concrete trace took 'else'. + states = _states('then-id', 'else-id') + chosen, siblings, idx = ConcolicEngine._match_branch(states, ('lookup', 'le', 'else-id', 'assign'), 0) + assert chosen is not None + assert chosen.rule_id == 'else-id' + assert idx == 3 # pointer advanced past the matched entry + assert len(siblings) == 1 + assert siblings[0].rule_id == 'then-id' + + +def test_match_branch_respects_start_index() -> None: + # An earlier 'else-id' in the trace must be ignored once the pointer has moved past it. + states = _states('then-id', 'else-id') + chosen, siblings, idx = ConcolicEngine._match_branch(states, ('else-id', 'then-id'), 1) + assert chosen is not None + assert chosen.rule_id == 'then-id' + assert idx == 2 + assert len(siblings) == 1 + assert siblings[0].rule_id == 'else-id' + + +def test_match_branch_handles_loops() -> None: + # The same branch rule taken twice (a loop) is consumed one occurrence per call. + states = _states('body-id', 'exit-id') + trace = ('body-id', 'body-id', 'exit-id') + + first, siblings_1, idx = ConcolicEngine._match_branch(states, trace, 0) + assert first is not None and first.rule_id == 'body-id' and idx == 1 + assert len(siblings_1) == 1 and siblings_1[0].rule_id == 'exit-id' + + second, siblings_2, idx = ConcolicEngine._match_branch(states, trace, idx) + assert second is not None and second.rule_id == 'body-id' and idx == 2 + assert len(siblings_2) == 1 and siblings_2[0].rule_id == 'exit-id' + + third, siblings_3, idx = ConcolicEngine._match_branch(states, trace, idx) + assert third is not None and third.rule_id == 'exit-id' and idx == 3 + assert len(siblings_3) == 1 and siblings_3[0].rule_id == 'body-id' + + +def test_match_branch_no_match_returns_none() -> None: + states = _states('then-id', 'else-id') + chosen, siblings, idx = ConcolicEngine._match_branch(states, ('lookup', 'le', 'assign'), 0) + assert chosen is None + assert siblings == [] + assert idx == 0 # pointer unchanged + + +# --- harvesting the concrete trace from rewrite logs ------------------------- + + +def test_rewrite_rule_ids_extracts_successes_in_order() -> None: + logs: list[LogEntry] = [ + LogRewrite(origin=cast('object', None), result=RewriteSuccess(rule_id='r1')), # type: ignore[arg-type] + LogRewrite(origin=cast('object', None), result=RewriteFailure(rule_id='r2', reason='nope')), # type: ignore[arg-type] + LogRewrite(origin=cast('object', None), result=RewriteSuccess(rule_id='r3')), # type: ignore[arg-type] + ] + assert ConcolicEngine._rewrite_rule_ids(logs) == ['r1', 'r3'] + + +def test_rewrite_rule_ids_ignores_non_rewrite_entries() -> None: + @dataclass + class _Other: + pass + + logs: list[LogEntry] = [ + cast('LogEntry', _Other()), + LogRewrite(origin=cast('object', None), result=RewriteSuccess(rule_id='r1')), # type: ignore[arg-type] + ] + assert ConcolicEngine._rewrite_rule_ids(logs) == ['r1'] + + +# --- model restriction ------------------------------------------------------- + + +def test_restrict_to_inputs_keeps_only_input_vars() -> None: + engine = _engine() + model = Subst({'N': intToken(3), 'GENERATED_COUNTER_CELL': intToken(9)}) + restricted = engine._restrict_to_inputs(model) + assert dict(restricted) == {'N': intToken(3)}