feat: hook versioning, per-row provenance, and M2M deploy (#145) #146
WIP checkpoint. Foundational + US1 (bundled deploy) + US2 (per-row provenance) production layer for hook versioning. Production type-checks clean; new unit tests pass. Existing test fixtures not yet migrated (suite red); Alembic migration and US3/US4/US5 endpoints pending.
- Split HookDefinition -> HookIdentity (name+feature); runtime+source_ref move to the immutable, integer-versioned HookRelease entity.
- New validation-domain registry: Hook/HookRelease/HookRun; HookRegistry port + Postgres adapter (row-locked gap-free versions, idempotent-on-digest releases, live-pointer advance/rollback, resolve_live snapshot, append-only hook_runs).
- Tables hooks/hook_releases/hook_runs; features.* gain run_id; conventions PK srn->id; records convention_srn->convention_id.
- ConventionSRN -> ConventionId across deposition/record/ingest/curation/validation; conventions reference hooks by name; transactional bundled POST /conventions deploy fan-out.
- Provenance: resolve-at-run-start snapshot; one hook_run per hook per batch; run_id threaded through ingest and deposition feature-insert paths; runners take (identity, release).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Drop hook_run_ids pass-through from ValidationCompleted, DepositionApproved,
RecordDraft, RecordPublished, HookBatchCompleted, IngestBatchPublished — it was
tramp data riding through curation/record handlers that don't use it.
Instead reconstruct {hook_name: run_id} at feature-insert time from the keys the
consumer already holds, via a feature-domain HookRunReader port (Postgres
adapter: one indexed hook_runs ⋈ hook_releases join per batch / per deposition,
not per row). hook_runs are still written at execution time; provenance is
unchanged. Keeps provenance ownership in the validation tables and avoids a new
feature→validation domain-code edge.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The HookRunReader port I'd put in the feature domain was wrong: it inverted ownership (feature dictating a validation-aggregate query) and laundered a real feature->validation dependency through an infra adapter that read validation's tables. hook_runs/hook_releases are validation-domain data, so the read capability belongs there: run_ids_for_batch / run_ids_for_deposition now live on the HookRegistry port + Postgres adapter (SQL) + HookRegistryService. The feature insert handlers call the validation service directly: an honest, declared service-to-service cross-domain read. Deleted the feature-domain reader port, its adapter, and the DI binding.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… types (#145)
Reconcile the committed #145 spine to the post-spec design-revisions doc, and tighten loose string types into nominal value objects.
Identity & conventions
- ConventionId (slug@version) -> ConventionSlug: a frozen RootModel[str], bare unversioned slug. Conventions are now mutable; deploy is a declarative upsert by slug (ON CONFLICT DO UPDATE) with no caller version and no 409 path.
- DeployConvention command DTOs renamed to DeployConvention{Schema,Hook,Release}; the shared HookDeploySpec is removed (release input is the command DTO; the internal deploy unit is deposition-domain HookDeploy).
Typed names (no bare str across boundaries)
- HookName promoted to a frozen RootModel[str]; .root only at PG/k8s string boundaries. HookResult.hook_name is now HookName.
- New FeatureName value object for the read/feature surface: expected_features is list[FeatureName]; /data/{schema}/{feature} and the feature store/service speak features, not hooks. HookName -> FeatureName is converted once at the hook->feature boundary (validate_deposition, publish_batch).
Provenance (design-revisions §6)
- HookRun slimmed to a pure execution record (drop ingest/deposition/batch context + XOR check; finished_at/duration_s/oom_retries required). Add HookRunStatus.from_hook_status; delete the duplicated _run_status helpers.
- run_id is carried via a run.json written into each hook output dir and read by the feature-insert handlers (read_run_ref/write_run_ref on the storage ports; RunRef value object). Delete run_ids_for_batch/run_ids_for_deposition and drop the HookRegistryService dependency from the insert handlers. hook_runs table slimmed accordingly; records keep a slug convention ref.
All 1126 unit tests pass; ruff + ty(osa) clean. Alembic migration and integration-test reconciliation follow.
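The run.json hand-off described in the provenance bullets can be sketched roughly as follows. This is a minimal illustration, not the PR's storage port: `write_run_ref`, `read_run_ref`, and `RunRef` are names from the commit message, but the JSON keys (`run_id`, `release_id`) and the plain-filesystem layout are assumptions.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path

RUN_REF_FILENAME = "run.json"  # file name taken from the commit message


@dataclass(frozen=True)
class RunRef:
    """Pointer from a hook output dir back to its provenance rows (field names assumed)."""

    run_id: str
    release_id: str


def write_run_ref(output_dir: Path, run_id: str, release_id: str) -> Path:
    """Write run.json next to the hook's outputs so no DB lookup is needed later."""
    output_dir.mkdir(parents=True, exist_ok=True)
    path = output_dir / RUN_REF_FILENAME
    path.write_text(json.dumps({"run_id": run_id, "release_id": release_id}))
    return path


def read_run_ref(output_dir: Path) -> RunRef:
    """Read run.json back; raises FileNotFoundError if the hook never wrote one."""
    data = json.loads((output_dir / RUN_REF_FILENAME).read_text())
    return RunRef(run_id=data["run_id"], release_id=data["release_id"])


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        hook_dir = Path(tmp) / "hooks" / "checksum"  # hypothetical hook name
        write_run_ref(hook_dir, "run-123", "rel-456")
        print(read_run_ref(hook_dir))
```

The feature-insert side only needs the output directory it already holds, which is what lets the insert handlers drop their dependency on the registry service.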
…ion slug (#145)
Create hooks / hook_releases / hook_runs (slim, no execution-context columns); swap conventions PK from the opaque srn to the slug `id`; rename convention_srn -> convention_id on depositions / ingest_runs / records with their indexes. Pre-launch: forward-only, no data backfill. Dynamic feature tables gain `run_id` via the build_feature_table helper, not a static migration. Excludes a pre-existing, unrelated depositions.owner_id NOT NULL drift.
…ation tests (#145)
Two production bugs surfaced by the integration suite once feature tables gained a NOT NULL run_id FK and conventions stored hook names as bare strings:
- schema_feature_reader._hook_names: conventions.hooks is now a JSON list of plain hook-name strings, not {"name": ...} dicts; read the element directly.
- DataCatalogService.resolve_table: compare the manifest's str resource name against feature_name.root (a FeatureName), not the FeatureName object, so /data/{schema}/{feature} resolves instead of 404ing.
Integration tests reconciled to the new API: ConventionSlug, mutable upsert-by-slug convention repo, bundled deploy(), and a seed_hook_run conftest helper that builds the hooks->releases->runs provenance chain so feature rows satisfy the run_id FK.
1126 unit + 120 integration tests pass; ruff + ty(osa) clean.
…#145)
Pre-launch with no deployed DB, so collapse all 16 incremental migrations into one `initial_schema` migration generated from tables.py. The schema is now born in its final shape: no convention_srn->convention_id rename churn, no PK swaps, and the stale depositions.owner_id NOT NULL drift is gone.
Also fixes a latent index/runtime mismatch the squash surfaced: the uq_records_source unique index used .as_string() (a redundant CAST) which the bulk-publish ON CONFLICT ((source->>'type'),(source->>'id')) could not match. Switched to .astext so the index expression matches the runtime exactly.
Verified: `alembic upgrade head` applies clean, `alembic check` reports zero drift (migration == tables.py), and all 120 integration + 1126 unit tests pass.
Completes US3–US5 of the hook-versioning feature on top of the existing
registry spine (deploy + provenance).
US3 — incremental release: CreateRelease mints vN+1 (201) or returns the
existing release on a duplicate digest (200, idempotent), advancing the live
pointer without touching the referencing convention; ListHooks/ListReleases/
GetRelease serve the catalog, history, and detail reads. New hooks router
(POST releases, GET catalog/history/detail) wired in ValidationProvider.
US4 — rollback: SetLive + PUT /hooks/{name}/live repoints the live pointer to
a prior release; history and already-produced rows' provenance are unchanged.
US5 — scoped machine credentials: optional second JWT issuer (extra_issuer
config) with iss-routed verification (RS256/EdDSA); Principal gains scopes +
has_scope; new RequiresScope gate (scope OR ADMIN) wired into the command/query
auth metaclasses, startup validation, and identity resolution. Deploy now
requires conventions:write; release/live require hooks:write (ADMIN still
accepted). The primary HS256 path is byte-identical when no second issuer is
configured.
Tests: handler/gate/token unit tests + DB-free hook contract tests + error-map
unit tests. 1155 unit + 7 contract + 120 integration pass; ruff + ty(osa) clean.
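The "scope OR ADMIN" gate from US5 could look roughly like the sketch below. `Principal`, `has_scope`, and `RequiresScope` are names from the commit message; the `Role` enum, the field layout, and the `allows` method are assumptions made for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class Role(Enum):
    """Hypothetical role model; the PR only tells us that ADMIN exists."""

    USER = "user"
    ADMIN = "admin"


@dataclass(frozen=True)
class Principal:
    role: Role
    scopes: frozenset[str] = frozenset()

    def has_scope(self, scope: str) -> bool:
        return scope in self.scopes


@dataclass(frozen=True)
class RequiresScope:
    """Gate that passes when the caller holds the scope or is an admin."""

    scope: str

    def allows(self, principal: Principal) -> bool:
        return principal.role is Role.ADMIN or principal.has_scope(self.scope)


if __name__ == "__main__":
    deploy_gate = RequiresScope("conventions:write")
    machine = Principal(Role.USER, frozenset({"conventions:write"}))
    print(deploy_gate.allows(machine))  # machine token carrying the scope
    print(RequiresScope("hooks:write").allows(machine))  # scope not granted
```

Keeping ADMIN as an alternative to the scope is what lets existing human-admin callers keep working when no second issuer is configured.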
@greptile
Greptile Summary
This PR implements the hook versioning registry, per-row provenance, and scoped M2M deploy credentials described in issue #145. It consolidates 14 incremental migrations into a single
Confidence Score: 4/5
The core implementation is thorough and several previously-flagged bugs in provenance recording, TOCTOU race on the created flag, and ingest-path log confinement are addressed. The remaining open items from prior review threads should be confirmed resolved before merge.
The previously open thread items represent real bugs that the PR description claims to have fixed; the diff shows fixes for several (created flag race, log confinement, OOM retry count) but the migration FK and upsert_identity race were the most structurally concerning; verifying those are fully addressed is the main merge gate. New findings in this review are all non-blocking quality concerns.
- server/migrations/versions/c6d9f4c0c3ab_initial_schema.py (FK between hooks and hook_releases)
- server/osa/infrastructure/persistence/repository/hook_registry.py (upsert_identity concurrent deploy race)
- server/osa/domain/deposition/service/convention.py (cross-domain service coupling)
| Filename | Overview |
|---|---|
| server/migrations/versions/c6d9f4c0c3ab_initial_schema.py | Replaces 14 incremental migrations with a single consolidated initial schema; adds hook_registry tables (hooks, hook_releases, hook_runs) and renames convention_srn→id columns throughout. |
| server/osa/infrastructure/persistence/repository/hook_registry.py | New Postgres adapter for hook registry: row-locked monotonic version assignment, digest idempotency, and live-pointer advance. upsert_identity now uses ON CONFLICT DO NOTHING to handle concurrent deploys safely. |
| server/osa/domain/auth/service/token.py | Adds iss-routed EdDSA verification for M2M tokens; unverified pre-read decode is missing the algorithms=[] kwarg required by PyJWT ≥2.4 (flagged in prior thread). |
| server/osa/domain/ingest/handler/run_hooks.py | Replaced exception-based batch control flow with value-based HookExecution outcomes; adds per-hook provenance recording and deterministic run_id generation via uuid5. Previously flagged timing and provenance issues are addressed. |
| server/osa/domain/validation/service/validation.py | Deposition-path hook execution now resolves live releases at run start and records append-only hook_run rows with correct OOM retry count; deposition path uses non-deterministic uuid4 run_ids (no retry idempotency concern since UoW rolls back). |
| server/osa/domain/deposition/service/convention.py | deploy() performs atomic schema+hook+convention upsert; directly injects HookRegistryService from the validation domain without a port abstraction, adding to the existing TODO-port technical debt. |
| server/osa/domain/shared/command.py | Adds RequiresScope gate handling in the command auth metaclass; logic is copy-pasted verbatim into query.py, extending the existing AtLeast duplication pattern. |
| server/osa/domain/validation/model/hook_run.py | New append-only HookRun entity and HookRunStatus enum; WARNINGS variant is defined but never produced by from_hook_status or any execution path. |
| server/osa/application/api/v1/routes/hooks.py | New hook registry REST routes (releases, live pointer, catalog, run detail/logs); thin HTTP↔DTO coercion with correct 201/200 status code routing for idempotent creates. |
| server/osa/domain/validation/model/hook_result.py | Adds HookExecution value object with per-hook wall-clock window, FailureKind classification, and factory methods; resolves previous provenance timing bugs. |
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
participant Client
participant API as POST /conventions
participant CS as ConventionService
participant HR as HookRegistryService
participant DB as PostgresHookRegistry
participant Outbox
Client->>API: DeployConvention (schema + hooks + releases)
API->>CS: "deploy(slug, schema, hooks=[HookDeploy])"
CS->>CS: schema_service.create_schema / get_schema
loop per hook
CS->>HR: upsert_identity(name, feature)
HR->>DB: INSERT hooks ON CONFLICT DO NOTHING
CS->>HR: create_release(name, runtime, source_ref)
HR->>DB: SELECT hooks FOR UPDATE → INSERT hook_releases → UPDATE live_release_id
end
CS->>DB: convention_repo.save(convention)
CS->>Outbox: "ConventionRegistered(hooks=[HookIdentity])"
API-->>Client: 201 ConventionCreated
participant Worker as RunHooks (ingest)
Worker->>HR: resolve_live(hook_names) snapshot
loop per hook (sequential)
Worker->>Worker: hook_service.run_hook(identity, release, inputs)
Worker->>DB: hook_registry.record_run(HookRun)
Worker->>Worker: ingest_storage.write_run_ref(run.json)
end
Worker->>Outbox: HookBatchCompleted
Reviews (14): Last reviewed commit: "fix: record real OOM retry count on depo..."
        "status", sa.String(length=32), server_default=sa.text("'pending'"), nullable=False
    ),
    sa.Column(
        "ingestion_finished", sa.Boolean(), server_default=sa.text("false"), nullable=False
    ),
    sa.Column("batches_ingested", sa.Integer(), server_default=sa.text("0"), nullable=False),
    sa.Column("batches_completed", sa.Integer(), server_default=sa.text("0"), nullable=False),
    sa.Column("published_count", sa.Integer(), server_default=sa.text("0"), nullable=False),
Missing FK: hooks.live_release_id → hook_releases.id
tables.py has an explicit comment: # FK added in migration (deferrable), but this migration never adds it. The hooks table is created with live_release_id UUID nullable and the hook_releases table is created right afterwards with a back-FK to hooks.name, but no op.create_foreign_key(...) call from hooks.live_release_id to hook_releases.id appears anywhere in the migration. The database therefore has no referential-integrity guard on the live pointer — a buggy or direct-SQL write could set live_release_id to a non-existent UUID and the adapter's _to_hook / resolve_live would silently return a Hook with an orphaned pointer. The fix is an op.create_foreign_key after both tables exist, using use_alter=True or a deferred constraint to handle the circular dependency.
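To make the circular-dependency point concrete, here is a small runnable sketch of a deferred foreign key on the live pointer. It uses SQLite from the standard library as a stand-in for Postgres (both accept `DEFERRABLE INITIALLY DEFERRED`), and the column set is trimmed to the minimum; the helper names are hypothetical and this is not the PR's migration.

```python
import sqlite3

SCHEMA = """
CREATE TABLE hooks (
    name TEXT PRIMARY KEY,
    live_release_id TEXT
        REFERENCES hook_releases (id) DEFERRABLE INITIALLY DEFERRED
);
CREATE TABLE hook_releases (
    id TEXT PRIMARY KEY,
    hook_name TEXT NOT NULL REFERENCES hooks (name),
    version INTEGER NOT NULL
);
"""


def open_registry_db() -> sqlite3.Connection:
    # isolation_level=None: the sketch issues BEGIN/COMMIT itself.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
    conn.executescript(SCHEMA)
    return conn


def deploy_with_live_pointer(conn: sqlite3.Connection, name: str, release_id: str) -> None:
    """Insert a hook that already points at a release created later in the same transaction."""
    conn.execute("BEGIN")
    conn.execute(
        "INSERT INTO hooks (name, live_release_id) VALUES (?, ?)", (name, release_id)
    )
    conn.execute(
        "INSERT INTO hook_releases (id, hook_name, version) VALUES (?, ?, 1)",
        (release_id, name),
    )
    conn.execute("COMMIT")  # the deferred FK is checked here and passes


def set_orphan_pointer(conn: sqlite3.Connection, name: str) -> bool:
    """Point the hook at a non-existent release; returns True if the DB rejected it."""
    conn.execute("BEGIN")
    conn.execute("UPDATE hooks SET live_release_id = 'missing' WHERE name = ?", (name,))
    try:
        conn.execute("COMMIT")
    except sqlite3.IntegrityError:
        conn.execute("ROLLBACK")  # a failed COMMIT leaves the transaction open
        return True
    return False
```

With the constraint in place, the orphaned pointer the review worries about is rejected at commit time instead of surfacing later in `resolve_live`.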
async def run(self, cmd: CreateRelease) -> ReleaseCreated:
    built_by = str(self.principal.user_id) if self.principal.user_id else None

    # Whether this is a brand-new version (201) or an idempotent no-op (200)
    # for a digest already present. Read before minting; the adapter's row
    # lock still guarantees gap-free versions regardless of this hint.
    seen_digests = {r.runtime.digest for r in await self.service.list_releases(cmd.name)}
    release = await self.service.create_release(
        cmd.name, cmd.to_runtime(), cmd.source_ref, built_by
    )
    hook = await self.service.get_hook(cmd.name)
    is_live = hook is not None and hook.live_release_id == release.id

    return ReleaseCreated(
        hook_name=release.hook_name,
created flag is TOCTOU-racy under concurrent identical submissions
seen_digests is collected via list_releases before the adapter's SELECT … FOR UPDATE row lock is acquired inside create_release. Two concurrent requests carrying the same digest can both read an empty seen_digests, call create_release in series (the lock serializes them correctly for data integrity), and both compute created=True — returning 201 for what should be the second caller's idempotent 200. The simplest fix is to derive created from a flag the adapter already knows: return a (HookRelease, bool) tuple from create_release (or compare the release's built_at to now()), so the determination happens inside the lock.
        started_at=started_at,
        finished_at=finished_at,
        duration_s=duration_by_hook.get(hook.name, 0.0),
        oom_retries=0,
oom_retries always recorded as 0 in provenance
HookRun.oom_retries is hardcoded to 0 for every run, even when the hook actually retried after an OOM eviction. The OOM retry loop (which calls with_doubled_memory) lives inside the hook runner infrastructure and the retry count isn't surfaced to HookResult, so this information is currently lost at the provenance layer. At minimum the field should reflect actual retries; if the runner doesn't expose the count yet, a follow-up ticket should be opened to propagate it so the provenance record is accurate.
- oom_retries=0,
+ oom_retries=0,  # TODO: surface actual retry count from hook runner (FR-provenance)
Addresses two PR review findings on the hook registry.
1. Missing referential integrity on the live pointer: the initial_schema
migration created hooks.live_release_id without the FK to hook_releases.id
(the tables.py comment promised one). Add it as a DEFERRABLE, use_alter FK in
both the SQLAlchemy metadata and the migration (created after both tables
exist, dropped first on downgrade) so the circular dependency is handled and
`alembic check` stays zero-drift. Verified against Postgres: FK present,
DEFERRABLE INITIALLY DEFERRED, no drift.
2. TOCTOU on the release 201-vs-200 hint: the handler read existing digests
before the adapter's row lock, so concurrent identical submissions could both
report "created". Decide `created` inside the lock instead — create_release
now returns a ReleaseOutcome{release, created} (named result, not a tuple),
and the handler maps it straight to the status code with no pre-read.
Tests + integration conftest updated for the new return type. 1155 unit + 120
integration pass; ruff + ty(osa) clean.
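The "decide `created` inside the lock" fix can be illustrated with a small in-memory stand-in. `ReleaseOutcome` and `create_release` are names from the commit message; `InMemoryRegistry`, the trimmed `HookRelease` fields, and the use of an `asyncio.Lock` in place of the Postgres row lock are assumptions for the sketch.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass(frozen=True)
class HookRelease:
    hook_name: str
    version: int
    digest: str


@dataclass(frozen=True)
class ReleaseOutcome:
    """Named result: the release plus whether this call minted it."""

    release: HookRelease
    created: bool


@dataclass
class InMemoryRegistry:
    """Hypothetical adapter; the asyncio.Lock plays the role of SELECT ... FOR UPDATE."""

    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _releases: list[HookRelease] = field(default_factory=list)

    async def create_release(self, hook_name: str, digest: str) -> ReleaseOutcome:
        async with self._lock:
            # The digest check happens under the lock, so it cannot go stale.
            for existing in self._releases:
                if existing.digest == digest:
                    return ReleaseOutcome(existing, created=False)
            await asyncio.sleep(0)  # yield so a racing caller gets a chance to interleave
            release = HookRelease(hook_name, len(self._releases) + 1, digest)
            self._releases.append(release)
            return ReleaseOutcome(release, created=True)


def status_code(outcome: ReleaseOutcome) -> int:
    return 201 if outcome.created else 200


async def submit_twice() -> list[int]:
    registry = InMemoryRegistry()
    outcomes = await asyncio.gather(
        registry.create_release("checksum", "sha256:abc"),
        registry.create_release("checksum", "sha256:abc"),
    )
    return sorted(status_code(o) for o in outcomes)


if __name__ == "__main__":
    print(asyncio.run(submit_twice()))
```

Because the handler no longer pre-reads digests, two identical concurrent submissions yield exactly one 201 and one 200.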
Addresses the third PR review finding: HookRun.oom_retries was hardcoded to 0 at every record site, so provenance always claimed zero retries even when a hook OOM-retried with doubled memory.
- HookResult gains oom_retries; HookService.run_hook counts each doubled-memory retry inside its loop and sets it on the returned result (success + rejected paths).
- Layer-2 cleanup: the ingest handler's parallel status_by_hook/duration_by_hook dicts collapse into one result_by_hook, from which status, duration, and oom_retries are read.
- Both record sites (ingest run_hooks, deposition ValidationService) stamp the real count. The two hard-failure paths (OOM-exhausted / other exceptions) keep 0 with a comment: the count isn't surfaced through the exception and those runs produce no feature rows (provenance is best-effort by design).
Tests: run_hook counting (0 / 1 / 2 retries with memory doubling) and end-to-end that the recorded HookRun carries the count. 1156 unit + 120 integration pass; ruff + ty(osa) clean.
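A rough sketch of counting doubled-memory retries inside the run loop follows. It assumes a retry cap of two and a hypothetical `OutOfMemory` exception; the real `HookService.run_hook` signature and its `HookResult` fields are not shown in this thread, so everything beyond `oom_retries` is illustrative.

```python
from dataclasses import dataclass
from typing import Callable


class OutOfMemory(Exception):
    """Hypothetical signal that the hook container was OOM-killed."""


@dataclass(frozen=True)
class HookResult:
    status: str
    oom_retries: int
    memory_mb: int


def run_hook(run: Callable[[int], str], memory_mb: int, max_retries: int = 2) -> HookResult:
    """Run a hook, doubling memory after each OOM and counting the retries."""
    oom_retries = 0
    while True:
        try:
            status = run(memory_mb)
        except OutOfMemory:
            if oom_retries >= max_retries:
                raise  # exhausted: the caller records a failed run
            oom_retries += 1
            memory_mb *= 2
            continue
        return HookResult(status=status, oom_retries=oom_retries, memory_mb=memory_mb)


def needs_at_least(threshold_mb: int) -> Callable[[int], str]:
    """Fake hook that OOMs below a memory threshold (test helper)."""

    def run(memory_mb: int) -> str:
        if memory_mb < threshold_mb:
            raise OutOfMemory()
        return "passed"

    return run
```

The count travels on the returned result, so both record sites can stamp it without reaching into the runner.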
@greptile
The optional second (machine) JWT issuer now accepts EdDSA/Ed25519 exclusively — the algorithm is no longer configurable. ExtraIssuerConfig drops its `algorithm` field and the verify path pins `algorithms=["EdDSA"]`, so the token header's `alg` is never trusted and can't downgrade to `none` or to HS256 treating the configured PEM as a shared secret. Operators only supply the issuer, Ed25519 public key, audience, and scope claim. Tests cover the EdDSA happy path, wrong-key/wrong-aud rejection, and that an RS256-signed token for the trusted issuer is rejected.
Conventions must now carry a description — every published convention should describe itself. `description` becomes a required `str` end to end: the deploy command DTO (min_length=1 at the API edge), the Convention aggregate, the service signature, the read DTOs, and the `conventions.description` column (NOT NULL in both the metadata and the initial_schema migration; zero-drift `alembic check`). Fixtures/seeds updated to supply one.
The deploy API no longer accepts a slug. ConventionSlug gains a from_title() factory that slugifies the human title (lowercase, non-alphanumerics → single hyphen, trimmed, capped at 64) and the DeployConventionHandler derives the identity from cmd.title. Titles that can't yield a valid slug (no letters, < 3 chars, or leading digit) raise a domain ValidationError → 422. Note: because deploy upserts on the slug, the title is now identity-bearing — a different title produces a different slug (a new convention). The server returns the derived slug in the deploy response so callers learn their identity.
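The slug rules listed above (lowercase, runs of non-alphanumerics collapsed to one hyphen, trimmed, capped at 64, with short, letterless, or digit-leading results rejected) can be sketched as a plain function. The order of trimming versus capping and the treatment of non-ASCII letters as separators are assumptions, and `ValidationError` here is a local stand-in for the domain error.

```python
import re

MAX_SLUG_LENGTH = 64
MIN_SLUG_LENGTH = 3


class ValidationError(ValueError):
    """Stand-in for the domain ValidationError (mapped to 422 in the PR)."""


def slug_from_title(title: str) -> str:
    """Derive a convention slug from a human title."""
    # Lowercase, then collapse every run of non-alphanumerics into one hyphen.
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower()).strip("-")
    # Cap the length, then drop a hyphen left dangling by the cut.
    slug = slug[:MAX_SLUG_LENGTH].rstrip("-")
    if len(slug) < MIN_SLUG_LENGTH:
        raise ValidationError(f"title {title!r} yields a slug shorter than {MIN_SLUG_LENGTH}")
    if not re.search(r"[a-z]", slug):
        raise ValidationError(f"title {title!r} contains no letters")
    if slug[0].isdigit():
        raise ValidationError(f"title {title!r} yields a slug with a leading digit")
    return slug


if __name__ == "__main__":
    print(slug_from_title("Protein Structures (v2)!"))
```

Since the slug is the upsert key, two titles that normalize to the same slug address the same convention, while any change that survives normalization creates a new one.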
@greptile
The OAuth state signature is base64 of a random HMAC, so its first character is "x" ~1/64 of the time — making the old `"x" + sig[1:]` tamper a no-op that left a valid signature and failed the "should reject" assertion. Flip the first char to a guaranteed-different one so the tamper always mutates the signature.
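The flaky tamper and its fix can be reproduced in a few lines. This is a sketch: `sign_state`, `tamper_naive`, and `tamper` are hypothetical helper names, and HMAC-SHA256 with URL-safe base64 is an assumption about how the state signature is built.

```python
import base64
import hashlib
import hmac
import os


def sign_state(state: bytes, key: bytes) -> str:
    """Base64 of an HMAC over the state (assumed shape of the OAuth state signature)."""
    return base64.urlsafe_b64encode(hmac.new(key, state, hashlib.sha256).digest()).decode()


def tamper_naive(sig: str) -> str:
    """Old test helper: a no-op whenever the signature already starts with 'x'."""
    return "x" + sig[1:]


def tamper(sig: str) -> str:
    """Replace the first character with one guaranteed to differ from the original."""
    replacement = "A" if sig[0] != "A" else "B"
    return replacement + sig[1:]


if __name__ == "__main__":
    sig = sign_state(b"state-123", os.urandom(32))
    print(tamper(sig) != sig)
```

With a 64-character alphabet, the naive version leaves roughly one signature in 64 unchanged, which matches the intermittent failure described above.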
            raise ConflictError(
                f"Hook {name!r} already exists with a different feature contract; "
                "the output contract is fixed across releases"
            )
        return self._to_hook(existing)

    await self.session.execute(
        insert(hooks_table).values(
            name=name.root,
            feature_spec=feature.model_dump(),
            live_release_id=None,
            created_at=datetime.now(UTC),
        )
    )
    await self.session.flush()
    row = await self._get_hook_row(name)
    assert row is not None
    return self._to_hook(row)

async def create_release(
    self,
    name: HookName,
    runtime: OciConfig,
    source_ref: str,
    built_by: str | None,
) -> ReleaseOutcome:
    # Row-lock the hook so concurrent releases serialize (R7). Also asserts
    # the hook exists.
    locked = await self.session.execute(
Unguarded race in upsert_identity can produce an unhandled PK violation
_get_hook_row runs under READ COMMITTED with no row lock. Two concurrent POST /conventions requests that arrive with the same new hook name will both read an empty result, both attempt INSERT INTO hooks …, and the second will hit the primary-key constraint and propagate an unhandled IntegrityError (→ 500) instead of the expected idempotent response.
create_release handles this correctly by doing a SELECT … FOR UPDATE first. upsert_identity needs the same guard — the cleanest fix is to lock the row with SELECT … FOR UPDATE SKIP LOCKED on the first read, or switch to an INSERT … ON CONFLICT (name) DO NOTHING followed by a SELECT to fetch the winner. Without it, simultaneous deploys of the same convention will non-deterministically fail.
# performs the real signature + audience verification. The algorithm
# is pinned to EdDSA (Ed25519): the token header's `alg` is never
# trusted, so a token can't downgrade to `none`/HS256.
unverified = jwt.decode(token, options={"verify_signature": False})
Missing algorithms keyword on the unverified decode. In PyJWT ≥ 2.4 the algorithms parameter is required even when verify_signature=False; omitting it raises DecodeError: It is required that you pass in a value for the "algorithms" argument when calling decode() on strict builds. Pass algorithms=[] (empty list is explicitly allowed when verification is disabled) to make this forward-safe.
- unverified = jwt.decode(token, options={"verify_signature": False})
+ unverified = jwt.decode(token, options={"verify_signature": False}, algorithms=[])
Pulls the latest PyJWT (lock only; the >=2.11.0 floor is unchanged). Confirms
the iss-routing unverified decode (`options={"verify_signature": False}` with no
`algorithms`) still works on 2.13 — so the M2M auth path is current and the
"algorithms required on unverified decode" concern is moot on every supported
version.
Two concurrent `POST /conventions` deploying the same brand-new hook name both read an empty result and both INSERT into `hooks`, the loser hitting the primary-key constraint → unhandled IntegrityError (500) instead of the idempotent result the declarative-upsert deploy promises. Replace the check-then-insert with `INSERT ... ON CONFLICT (name) DO NOTHING` followed by a read-back: PG serializes on its internal lock (blocking on a concurrent uncommitted insert until it resolves), the loser no-ops, and the fixed feature-contract check still fires on a genuine mismatch (409). The old `assert row is not None` is replaced with an explicit error. (FOR UPDATE — Greptile's first suggestion — can't apply here: the row doesn't exist yet, so there's nothing to lock; ON CONFLICT is the correct primitive.) Adds an integration regression test: two concurrent upserts of the same new name both succeed with exactly one row; differing-contract still raises ConflictError.
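The insert-then-read-back shape described in this commit can be sketched with SQLite from the standard library, which accepts the same `INSERT ... ON CONFLICT (name) DO NOTHING` syntax. This only shows the idempotent single-connection behaviour; the blocking on a concurrent uncommitted insert is Postgres behaviour as stated in the commit message and is not reproduced here. Table columns and function names are trimmed or hypothetical.

```python
import json
import sqlite3


class ConflictError(Exception):
    """Stand-in for the domain ConflictError (409 in the PR)."""


def open_hooks_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE hooks (name TEXT PRIMARY KEY, feature_spec TEXT NOT NULL)")
    return conn


def upsert_identity(conn: sqlite3.Connection, name: str, feature: dict) -> dict:
    """Insert the hook if absent, then read back whichever row won."""
    spec = json.dumps(feature, sort_keys=True)
    conn.execute(
        "INSERT INTO hooks (name, feature_spec) VALUES (?, ?) "
        "ON CONFLICT (name) DO NOTHING",
        (name, spec),
    )
    row = conn.execute(
        "SELECT feature_spec FROM hooks WHERE name = ?", (name,)
    ).fetchone()
    if row is None:
        # Explicit error instead of a bare assert.
        raise RuntimeError(f"hook {name!r} missing after upsert")
    if row[0] != spec:
        raise ConflictError(
            f"Hook {name!r} already exists with a different feature contract"
        )
    return json.loads(row[0])
```

The loser of a race simply no-ops on the insert and reads the winner's row, so both callers get the same identity unless their contracts genuinely differ.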
@greptile
@@ -127,7 +149,40 @@ async def handle(self, event: IngesterBatchReady) -> None:
            ingest_run_id=event.ingest_run_id,
        )

        # Emit HookBatchCompleted
        # Record one append-only hook_run per hook (provenance anchor, #145) and
        # write run.json into each hook's output dir so the feature-insert handler
        # can stamp feature.run_id without a DB lookup (design-revisions §6).
        finished_at = datetime.now(UTC)
        result_by_hook = {r.hook_name: r for r in results}
        for hook, release in pairs:
            run_id = run_id_by_hook[hook.name]
            # Absent result → the hook produced nothing this batch (e.g. an
            # OOM-exhausted batch that raised); record an ERROR run. The per-hook
            # retry count isn't surfaced through that failure path, so it stays 0.
            result = result_by_hook.get(hook.name)
            run_status = (
                HookRunStatus.from_hook_status(result.status)
                if result is not None
                else HookRunStatus.ERROR
            )
            await self.ingest_storage.write_run_ref(
                work_dirs[hook.name], str(run_id), str(release.id)
            )
            await self.hook_registry.record_run(
                HookRun(
                    id=run_id,
                    release_id=release.id,
                    status=run_status,
                    started_at=started_at,
                    finished_at=finished_at,
                    duration_s=result.duration_seconds if result is not None else 0.0,
                    oom_retries=result.oom_retries if result is not None else 0,
                )
            )
Successful hook runs misrecorded as ERROR when a later hook OOM-exhausts
results is initialized to [] before the try block and is only assigned when run_hooks_for_batch completes without raising. If hook N passes and hook N+1 exhausts its OOM retries, the OOMError propagates through run_hooks_for_batch before the function returns, so the local results list (which already holds hook N's HookResult) is never surfaced to the caller — results in handle() stays [].
The provenance loop then builds result_by_hook = {} and falls through the else HookRunStatus.ERROR branch for every hook, including those that genuinely passed. Their hook_runs rows are inserted with status=ERROR and duration_s=0.0. Since feature rows produced by those hooks are still published (OOM falls through to HookBatchCompleted), the feature table ends up with a FK to a hook_run record that falsely says the producing run errored — permanently corrupting append-only provenance for a correct result.
Make a client/server runtime-payload mismatch fail loudly at deploy instead of being silently swallowed into an empty config that only blows up at container runtime. The deploy edge DTOs (DeployConvention + nested Schema/Hook/Release) and the standalone release/live bodies gain extra="forbid", and the hook `config` becomes required and typed dict[str, Any] (an opaque JSON object forwarded verbatim to the container — OSA never reads its keys), matching the domain OciConfig.config. `limits` keeps its defaults; a misnamed limits field is still caught by extra="forbid". Contract tests: unknown field -> 422, missing config -> 422.
…r logs (#145) On a non-zero ingester exit, write the container's stdout/stderr to the run's own output dir (ingester.log) and log only a reference — keeping tenant output (which can carry upstream credentials / PII) out of the OSA operator's logs. A tenant-scoped retrieval surface (log_ref + endpoint, mirroring hook runs) is tracked in #147.
@greptile
Batch-level wall-clock timestamps make started_at/finished_at wrong for every hook after the first
started_at (line 107) is sampled once before run_hooks_for_batch and finished_at (line 155) is sampled once after it. All HookRun inserts in the batch then use these two shared values regardless of which hook is being recorded.
For a batch with three sequential hooks taking 60 s each, hook 2's record will have started_at = T0 (batch start) and finished_at = T180 (batch end), while duration_s = 60 (correctly sourced from the runner). The invariant finished_at − started_at ≈ duration_s is violated for every hook except possibly the last, making the timestamps in the append-only provenance table permanently misleading.
The validation-service path (ValidationService.run_hooks) handles this correctly by capturing started_at and finished_at inside the per-hook loop. The batch handler should do the same: resolve the per-hook start time before calling run_hook on each pair and capture the per-hook end time immediately after.
The ingest batch handler sampled started_at once before run_hooks_for_batch and
finished_at once after, then stamped every HookRun in the batch with that shared
window — while duration_s came (correctly) from the runner. For a batch of N
sequential hooks, hooks 2..N got finished_at − started_at ≈ the whole batch span
instead of their own run, permanently misdating the append-only hook_runs table.
run_hooks_for_batch now captures each hook's own wall-clock window and returns a
HookExecution{result, started_at, finished_at}; the handler stamps each HookRun
from its hook's window (the batch window remains only as a fallback for a hook
that produced no result, e.g. an OOM-exhausted batch). The deposition path
(ValidationService.run_hooks) was already per-hook and is unchanged.
Regression test: two sequential hooks yield distinct, non-overlapping windows.
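The per-hook window described above can be sketched as follows. This is a minimal illustration, not the PR's code: only the `HookExecution` field names (`result`, `started_at`, `finished_at`) come from the commit message, while `hook_name`, the `run_hook` callable, and `_fake_hook` are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class HookExecution:
    """One hook's result plus its own wall-clock window."""

    hook_name: str  # hypothetical field, added for readability
    result: str
    started_at: datetime
    finished_at: datetime


async def run_hooks_for_batch(hook_names, run_hook):
    """Run hooks sequentially, sampling the clock around each call, not around the loop."""
    executions = []
    for name in hook_names:
        started_at = datetime.now(timezone.utc)
        result = await run_hook(name)
        finished_at = datetime.now(timezone.utc)
        executions.append(HookExecution(name, result, started_at, finished_at))
    return executions


async def _fake_hook(name):
    """Hypothetical stand-in for a container run."""
    await asyncio.sleep(0.01)
    return f"{name}: passed"


executions = asyncio.run(run_hooks_for_batch(["checksum", "schema"], _fake_hook))
first, second = executions
```

Because the clock is read inside the loop, the second hook's window starts no earlier than the first hook's window ends, which is the property the regression test checks.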
…145) Replaces the ingest batch handler's exception-as-control-flow model, which let one hook's failure unwind the loop and erase its siblings' outcomes (recorded with a coarse shared batch window) and re-minted hook_run ids on every retry (duplicate provenance rows).
- HookExecution is now a TOTAL outcome (passed | rejected | errored + FailureKind) with its own wall-clock window; run_hooks_for_batch catches each hook's exception and returns a failed execution instead of propagating, so a failing hook never discards a sibling.
- The handler records each hook from its OWN execution (no coarse fallback), continues on error, and derives batch fate from the set: any TRANSIENT failure re-drives the batch (the UOW rolls back, but filesystem checkpoints make the re-run cheap); otherwise it records all + emits HookBatchCompleted, committing atomically. A PERMANENT/OOM hook failure is a terminal ERROR run, not a batch failure: hooks are independent and PublishBatch already publishes per-record.
- Idempotent retry/redelivery: deterministic hook_run id (uuid5 over ingest_run_id+batch_index+hook_name) + record_run ON CONFLICT (id) DO NOTHING. OOMError carries its real retry count.
Tests: continue-on-error surfaces passed + failed hooks with their own windows; transient → raise + no emit; permanent/OOM → terminal ERROR + batch completes; deterministic id stable across redeliveries; record_run idempotent (integration). 1176 unit + 123 integration pass; ruff + ty clean.
Follow-ups tracked separately: idempotent complete_batch, partial-feature publishing, log_ref on ERROR runs.
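A rough sketch of the idempotent-redelivery idea from the commit message: a uuid5 derived from the batch coordinates, combined with an insert that ignores conflicts on the id. The namespace UUID, the key format, and the two-column table are assumptions made for illustration, and SQLite stands in for Postgres here (both accept `ON CONFLICT (id) DO NOTHING`).

```python
import sqlite3
import uuid

# Hypothetical namespace; the PR does not say which namespace UUID it uses.
HOOK_RUN_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "example:hook-runs")


def hook_run_id(ingest_run_id: str, batch_index: int, hook_name: str) -> uuid.UUID:
    """Derive the same id for the same (ingest run, batch, hook) on every redelivery."""
    return uuid.uuid5(HOOK_RUN_NAMESPACE, f"{ingest_run_id}/{batch_index}/{hook_name}")


def record_run(conn: sqlite3.Connection, run_id: uuid.UUID, status: str) -> None:
    """Insert once; a second insert with the same id is silently skipped."""
    conn.execute(
        "INSERT INTO hook_runs (id, status) VALUES (?, ?) ON CONFLICT (id) DO NOTHING",
        (str(run_id), status),
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hook_runs (id TEXT PRIMARY KEY, status TEXT NOT NULL)")

first = hook_run_id("ingest-1", 0, "checksum")
retry = hook_run_id("ingest-1", 0, "checksum")  # same inputs on redelivery
record_run(conn, first, "PASSED")
record_run(conn, retry, "ERROR")  # ignored: the first row wins
rows = conn.execute("SELECT id, status FROM hook_runs").fetchall()
```

Note that with `DO NOTHING` the first committed row is the one that survives, so whichever attempt records first determines the stored status and timing.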
@greptile take a look at the new way to surface hook execution errors with fine granularity
When a hook container errors (e.g. an SDK-version mismatch), its hook_run provenance recorded status=ERROR but log_ref=None, leaving the failure undiagnosable. Capture the failed container's stdout/stderr to a tenant-scoped artifact (the hook's own output dir, never operator logs) and stamp the locator on HookRun.log_ref, on both the ingestion (RunHooks) and deposition (ValidationService.run_hooks) paths.
- Promote container_logs to a typed InfrastructureError.__init__ keyword field so every read site narrows the exception type instead of using getattr.
- Normalize the OCI runner onto container_logs: non-zero exit and OOM now raise with container_logs=… and a clean message (no logs embedded in the message, no stderr tail print; both leaked tenant output to operator logs).
- Preserve the runner's container_logs through HookService's OOM-exhaustion re-raise; surface them on HookExecution.log_text for the batch path.
- Add write_hook_log to both storage ports (FS + S3 adapters), writing output/hook.log and returning the log_ref locator.
The authenticated retrieval endpoint that serves log_ref is #147.
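As a sketch of the typed `container_logs` field and the narrowing it enables: the class below is a simplified stand-in for the project's `InfrastructureError`, and `log_text_for` is a hypothetical helper name, not something the PR defines.

```python
from typing import Optional


class InfrastructureError(Exception):
    """Simplified stand-in: logs travel as a typed keyword field, not inside the message."""

    def __init__(self, message: str, *, container_logs: Optional[str] = None) -> None:
        super().__init__(message)
        self.container_logs = container_logs


def log_text_for(exc: Exception) -> str:
    """Pick the text to persist as the hook log: captured logs if present, else the message."""
    if isinstance(exc, InfrastructureError) and exc.container_logs:
        return exc.container_logs
    return str(exc)


oci_failure = InfrastructureError("container exited with code 1", container_logs="Traceback ...")
plain_failure = ValueError("bad input")
```

Keeping the logs out of the message means `str(exc)` stays safe to write to operator logs, while the tenant output goes only to the tenant-scoped artifact.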
# (nothing recorded this attempt), but each hook's filesystem checkpoint
# makes the re-run cheap: completed hooks return instantly without a
# container, and the deterministic id keeps the eventual insert dup-free.
pending = [e for e in executions if not e.is_terminal]
if pending:
    names = ", ".join(e.hook_name.root for e in pending)
    raise TransientError(
        f"batch {event.batch_index}: {len(pending)} hook(s) pending retry: {names}"
    )
Terminal hooks' provenance timing is permanently wrong when a sibling raises TransientError
The pending guard raises before the provenance-recording loop. If hook A completes (terminal) and hook B raises TransientError (non-terminal), the raise exits before record_run and write_run_ref are called for hook A. On retry, hook A returns from checkpoint and the deterministic run_id produces a correct final DB row — but started_at/finished_at/duration_s in that append-only HookRun row will reflect the fast checkpoint-return of the retry, not the original container execution window. Recording provenance for terminal hooks before the transient check, relying on the existing ON CONFLICT DO NOTHING idempotency, would fix this.
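The ordering suggested in this comment might look like the sketch below. All names are hypothetical simplifications of the handler. One open assumption: the commit message says the unit of work rolls back when the batch is re-driven, so recording first only preserves the original timing if those rows are committed independently of that rollback.

```python
from dataclasses import dataclass


class TransientError(Exception):
    """Hypothetical stand-in for the project's retryable error."""


@dataclass(frozen=True)
class Execution:
    hook_name: str
    is_terminal: bool


def settle_batch(executions, record_run):
    """Record every terminal hook first, then re-drive the batch if any hook is pending."""
    for execution in executions:
        if execution.is_terminal:
            record_run(execution)  # idempotent via the deterministic id
    pending = [e for e in executions if not e.is_terminal]
    if pending:
        names = ", ".join(e.hook_name for e in pending)
        raise TransientError(f"{len(pending)} hook(s) pending retry: {names}")


recorded = []
message = ""
try:
    settle_batch([Execution("a", True), Execution("b", False)], recorded.append)
except TransientError as exc:
    message = str(exc)
```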
…#147) Adds the read surface for the log_ref captured in 881b136. A hook run is addressed by its globally-unique HookRunId, so one endpoint covers both the deposition and ingestion hook paths (both record hook_run rows).
- GET /hooks/runs/{run_id} → HookRunDetail (status/timing/release/has_logs)
- GET /hooks/runs/{run_id}/logs → text/plain stream of the captured hook.log
Both ADMIN-only (no per-run ownership model; logs are tenant stdout/stderr). The raw log_ref locator is never exposed; clients see has_logs and fetch bytes via the /logs sub-resource.
- get_run(run_id) down the registry stack (port + Postgres repo _to_run + service).
- read_hook_log(log_ref) on HookStoragePort: FS adapter confines the read under the data root (rejects a tampered locator); S3 reads by key with a '..' guard.
- Reserve 'runs' so no hook/feature/schema name shadows the /hooks/runs slot.
Ingester container logs (a separate container in the ingest domain, no capture yet) are tracked separately.
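One way to keep the locator private while still telling clients that logs exist is to project the stored run into a response type with no `log_ref` field. The dataclasses below are hypothetical simplifications; only `has_logs`, `log_ref`, and the status/timing/release grouping come from the commit message.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class HookRun:
    """Stored run (simplified); log_ref is an internal storage locator."""

    id: str
    release_id: str
    status: str
    started_at: datetime
    finished_at: datetime
    log_ref: Optional[str] = None


@dataclass(frozen=True)
class HookRunDetail:
    """Response shape: reports whether logs exist, never where they live."""

    id: str
    release_id: str
    status: str
    started_at: datetime
    finished_at: datetime
    has_logs: bool


def to_detail(run: HookRun) -> HookRunDetail:
    return HookRunDetail(
        id=run.id,
        release_id=run.release_id,
        status=run.status,
        started_at=run.started_at,
        finished_at=run.finished_at,
        has_logs=run.log_ref is not None,
    )


now = datetime.now(timezone.utc)
run = HookRun("run-1", "release-7", "ERROR", now, now, log_ref="/data/ingests/x/hook.log")
detail = to_detail(run)
```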
@greptile look at the new log write/read stuff
async def read_hook_log(self, log_ref: str) -> AsyncIterator[bytes]:
    """Stream a captured hook log by its absolute-path locator (#147).

    Confines the read to the data root: a ``log_ref`` that resolves outside
    ``base_path`` is rejected, so a tampered locator can't read arbitrary files.
    """
    from osa.domain.shared.error import NotFoundError

    target = Path(log_ref).resolve()
    if not target.is_relative_to(self.base_path.resolve()):
        raise ValueError(f"log_ref escapes the data root: {log_ref}")
    if not target.is_file():
        raise NotFoundError(f"Hook log not found: {log_ref}")

    async def _stream() -> AsyncIterator[bytes]:
        with open(target, "rb") as f:
            while chunk := f.read(8192):
                yield chunk

    return _stream()
Ingest-path hook logs are unreadable — wrong base path for confinement check
FilesystemStorageAdapter is initialised with base_path = paths.data_dir / "files" (see persistence/di.py line 142). Logs written by the ingest pipeline go through FilesystemIngestStorage, which uses StorageLayout(paths.data_dir) and writes to {data_dir}/ingests/{run_id}/batches/{batch_index}/hooks/{hook_name}/output/hook.log. When GET /hooks/runs/{run_id}/logs is called for any ingest-run hook, the is_relative_to(self.base_path.resolve()) check fails (the key is under data_dir/ingests/, not data_dir/files/), raising ValueError which falls out of the handler as a 500. Only deposition-path hook logs (written by this same adapter under data_dir/files/) are actually reachable. The S3 path is unaffected since its guard is a simple ".." substring check with no prefix scoping.
read_hook_log confined the FS read to base_path (= data_dir/files), but ingest pipeline hook logs are written under data_dir/ingests/.../hook.log. The is_relative_to(base_path) check rejected every ingest-run log with ValueError → 500; only deposition-path logs (under files/) were reachable. Confine against the node data root instead, which spans both files/ (deposition) and ingests/ (ingestion). FilesystemStorageAdapter takes an explicit data_root (defaults to base_path for deposition-only callers); persistence DI passes paths.data_dir. Regression test reads an ingest-tree log path. S3 is unaffected (its guard is a '..' check with no prefix scoping).
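The difference between confining against `files/` and confining against the node data root can be reproduced with a small sketch. `confine` and `is_allowed` are hypothetical helper names, and the directory layout only mirrors the paths quoted above inside a temporary directory.

```python
import tempfile
from pathlib import Path


def confine(log_ref: str, data_root: Path) -> Path:
    """Resolve log_ref and reject it unless it stays under data_root."""
    target = Path(log_ref).resolve()
    if not target.is_relative_to(data_root.resolve()):
        raise ValueError(f"log_ref escapes the data root: {log_ref}")
    return target


def is_allowed(log_ref: str, data_root: Path) -> bool:
    try:
        confine(log_ref, data_root)
    except ValueError:
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    data_dir = Path(tmp)
    ingest_log = (
        data_dir / "ingests" / "run-1" / "batches" / "0"
        / "hooks" / "checksum" / "output" / "hook.log"
    )
    ingest_log.parent.mkdir(parents=True)
    ingest_log.write_text("container stderr\n")

    # Old behaviour: root is data_dir/files, so the ingest-tree log is rejected.
    under_files = is_allowed(str(ingest_log), data_dir / "files")
    # New behaviour: root is data_dir, which spans files/ and ingests/.
    under_data_root = is_allowed(str(ingest_log), data_dir)
    # A locator that climbs out of the root is still rejected.
    traversal = is_allowed(str(data_dir / "files" / ".." / ".." / "outside.log"), data_dir)
```

Widening the root keeps the traversal guard intact because the check runs on the resolved path, not on the raw string.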
  work_dir = self.hook_storage.get_hook_output_dir(deposition_srn, hook.name.root)
  started_at = datetime.now(timezone.utc)
  run_id = HookRunId(uuid4())
  try:
-     result = await hook_service.run_hook(hook, inputs, work_dir)
- except Exception:
+     result = await hook_service.run_hook(hook, release, inputs, work_dir)
+ except Exception as exc:
      finished_at = datetime.now(timezone.utc)
      # Record ANY failure as a terminal ERROR run. Capture the failed
      # container's logs (typed on InfrastructureError, else the message)
      # to a tenant-scoped artifact so the ERROR is diagnosable (#145/#147).
      text = (
          exc.container_logs
          if isinstance(exc, InfrastructureError) and exc.container_logs
          else str(exc)
OOM retry count silently zeroed in deposition-path provenance
When hook_service.run_hook raises OOMError, the exception already carries the actual retry count (set by HookService.run_hook which re-raises with oom_retries=oom_retries). The except clause here hardcodes oom_retries=0 and even has a comment claiming the count "isn't available on this failure path" — but it is: exc.oom_retries is populated whenever exc is an OOMError. The batch ingest path was corrected in this PR via HookExecution.failed, but this deposition-validation path was left with the old behaviour, permanently writing a wrong oom_retries=0 into the append-only hook_runs table for every OOM-exhausted deposition run.
ValidationService.run_hooks hardcoded oom_retries=0 on the failure path, with a comment claiming the count wasn't available. It is: HookService.run_hook re-raises OOMError(oom_retries=...) after exhausting memory retries, so exc.oom_retries is populated. Every OOM-exhausted deposition run was writing a wrong oom_retries=0 into the append-only hook_runs table. Read the count off the exception when it's an OOMError (mirrors the batch path's HookExecution.failed); non-OOM failures stay 0. Regression asserts the recorded run carries MAX_OOM_RETRIES.
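A minimal sketch of reading the retry count off the exception rather than hardcoding 0. `OOMError` here is a simplified stand-in (the real class may sit elsewhere in the error hierarchy), and both `oom_retries_for` and the value of `MAX_OOM_RETRIES` are assumptions for illustration.

```python
MAX_OOM_RETRIES = 3  # hypothetical value; the PR names the constant but not its value


class OOMError(Exception):
    """Simplified stand-in that carries how many memory retries were attempted."""

    def __init__(self, message: str, *, oom_retries: int = 0) -> None:
        super().__init__(message)
        self.oom_retries = oom_retries


def oom_retries_for(exc: Exception) -> int:
    """Return the real retry count for OOM failures; other failures stay at 0."""
    return exc.oom_retries if isinstance(exc, OOMError) else 0


exhausted = OOMError("memory retries exhausted", oom_retries=MAX_OOM_RETRIES)
other = RuntimeError("image pull failed")
```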
@greptile
Implements GitHub issue #145 end-to-end: a versioned hook registry, exact per-row provenance, and scoped machine (M2M) deploy credentials. Built on the existing DDD/CQRS layering (API → Command/Query → Service → Repo), following the feature's architecture.md and design-revisions.md.

What's in it

Registry spine (foundational)
- Hook identity (name + fixed output contract) · immutable, integer-versioned HookRelease · live pointer · append-only HookRun.
- HookRegistry adapter: row-locked, gap-free monotonic versions; live-pointer advance/repoint; digest idempotency; live-release snapshot.
- initial_schema migration (zero-drift alembic check); records.convention_id typed ConventionSlug; conventions are unversioned, slug-keyed, mutable.

US1 - single-call deploy (POST /conventions): one atomic call creates/versions the schema, upserts hooks + first releases (+ feature tables), and upserts the convention by slug (declarative upsert, no 409).

US2 - provenance: every feature row carries run_id → hook_runs → hook_releases, yielding the exact version/digest/config/source_ref that produced it. run_id is carried via a run.json in each hook's output dir (no DB-lookup reconstruction). The live release is resolved once at run start and snapshotted, so in-flight runs are unaffected by a mid-run pointer move.

US3 - incremental release (POST /hooks/{name}/releases): mints vN+1 (201) or returns the existing release on a duplicate digest (200, idempotent), advancing live without touching the convention. Catalog/history/detail reads: GET /hooks, GET /hooks/{name}/releases[/{version}].

US4 - rollback (PUT /hooks/{name}/live): repoints live to a prior release; history and already-produced rows' provenance are preserved.

US5 - scoped M2M credentials: optional second JWT issuer (auth.extra_issuer) with iss-routed verification (RS256/EdDSA). Principal gains scopes + has_scope; new RequiresScope gate (scope OR ADMIN) wired into the command/query auth metaclasses, startup validation, and identity resolution. Deploy → conventions:write; release/live → hooks:write (ADMIN still accepted). The primary HS256 path is byte-identical when no second issuer is configured.

Testing

TDD throughout. 1155 unit + 7 contract + 120 integration tests pass; ruff and ty check osa clean. New this PR: handler/gate/token unit tests, DB-free hook contract tests, and centralized error-mapping unit tests. The integration suite exercises the registry adapter, migration, and provenance chain against real Postgres.

Notes