Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions packages/agent/src/dkg-agent-cg-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,26 @@ export class ContextGraphRegistryMethods extends DKGAgentBase {
* unavailable, contract not deployed, transient errors) are logged
* and the field is left undefined.
*/
async getContextGraphOnChainPolicy(this: DKGAgent, contextGraphId: string): Promise<{
async getContextGraphOnChainPolicy(this: DKGAgent, contextGraphId: string, options?: {
/**
* Cap the age (ms) of a cached `publishPolicy` entry this call will accept;
* default `ON_CHAIN_PUBLISH_POLICY_CACHE_TTL_MS` (60s). `publishPolicy` is
* mutable on-chain (`PublishPolicyUpdated`) and the agent has no event
* watcher, so the cache can be stale-PERMISSIVE for up to its age after an
* owner downgrades open→curated publish. Most callers (e.g. the
* import-artifact owner guard) tolerate the full 60s. A SECURITY-POSITIVE
* admission decision (host-mode self-signed plaintext ingest, see
* `isConfirmedPublicForHostMode`) passes a SHORT window
* (`HOST_MODE_PUBLISH_POLICY_MAX_CACHE_AGE_MS` = 5s): an open→curated
* downgrade is then honored within seconds, AND — because the resolver
* writes through to this same cache — the chain RPC is rate-capped to ~1
* per window per CG instead of an `eth_call` on every admitted envelope
* (Branimir review #1239 follow-on). An RPC failure/timeout still leaves
* `publishPolicy` undefined → the caller fails closed. (`accessPolicy` is
* immutable on-chain, so its cache read below is left un-TTL'd.)
*/
publishPolicyMaxCacheAgeMs?: number;
}): Promise<{
accessPolicy?: number;
publishPolicy?: number;
}> {
Expand All @@ -492,7 +511,12 @@ export class ContextGraphRegistryMethods extends DKGAgentBase {
/**
 * Reports whether the cached `publishPolicy` entry stored under `key` is
 * recent enough for THIS call to trust.
 *
 * @param key cache key to look up in `onChainPublishPolicyCacheUpdatedAt`.
 * @returns `true` only when a fetch timestamp exists for `key` and its age is
 *   within the accepted window; `false` otherwise (the caller then re-resolves).
 */
const isPublishPolicyCacheFresh = (key: string): boolean => {
  const fetchedAt = this.onChainPublishPolicyCacheUpdatedAt.get(key);
  // No recorded fetch time → the entry's age is unknown, so never trust it.
  if (fetchedAt === undefined) return false;
  // A security-positive caller can shrink the accepted cache age (e.g. the
  // host-mode admission gate passes ~5s) so an open→curated downgrade is
  // re-verified within seconds, while still rate-capping the chain RPC to
  // ~1 per window per CG. Default is the general 60s TTL.
  //
  // The override is a CAP, so it may only tighten: clamp it to the general
  // TTL so an oversized value (or `Infinity`) cannot extend cache trust past
  // the default. `Math.min` propagates NaN and keeps negatives, and both make
  // the comparison below false — an invalid override fails closed (re-verify)
  // rather than open.
  const maxAge = Math.min(
    options?.publishPolicyMaxCacheAgeMs ?? ON_CHAIN_PUBLISH_POLICY_CACHE_TTL_MS,
    ON_CHAIN_PUBLISH_POLICY_CACHE_TTL_MS,
  );
  return Date.now() - fetchedAt <= maxAge;
};
let publishPolicy = isPublishPolicyCacheFresh(contextGraphId)
? this.onChainPublishPolicyCache.get(contextGraphId)
Expand Down
8 changes: 8 additions & 0 deletions packages/agent/src/dkg-agent-crypto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,14 @@ export class WorkspaceCryptoMethods extends DKGAgentBase {
let fallback: (AgentKeyRecord & { privateKey: string }) | null = null;
for (const record of this.localAgents.values()) {
if (!record.privateKey) continue;
// GH #787 — a node-level key record can carry a privateKey but no (or an
// invalid) agentAddress (an operational identity, not an agent). Such a
// record is NOT a usable gossip signer: encodeWorkspaceGossipMessage emits
// `agentAddress` into the envelope and the downstream host-mode authority
// check rejects a missing/invalid one. Skip it entirely — that both avoids
// the original `toLowerCase()`-of-undefined crash (HTTP 500 on SWM write)
// AND prevents it becoming a fallback that emits an unverifiable envelope.
if (!record.agentAddress || !ethers.isAddress(record.agentAddress)) continue;
const signingRecord = { ...record, privateKey: record.privateKey };
if (defaultAddress && record.agentAddress.toLowerCase() === defaultAddress) {
return signingRecord;
Expand Down
182 changes: 158 additions & 24 deletions packages/agent/src/dkg-agent-swm-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,16 @@ import type { DKGAgent } from './dkg-agent.js';

const DEFAULT_HOST_MODE_RECONCILE_BATCH_SIZE = 32;

/**
* Max age (ms) of a cached `publishPolicy` value the host-mode self-signed
* admission gate (`isConfirmedPublicForHostMode`) will trust. Deliberately
* short: it bounds the open→curated downgrade staleness to a few seconds
* (vs the general 60s `ON_CHAIN_PUBLISH_POLICY_CACHE_TTL_MS`) AND rate-caps the
* chain RPC to ~1 per window per CG, so spammed public-plaintext gossip can't
* amplify into a per-message `eth_call` (Branimir review #1239 follow-on).
*/
const HOST_MODE_PUBLISH_POLICY_MAX_CACHE_AGE_MS = 5_000;

function normalizeHostModeReconcileBatchSize(value: number | undefined): number {
if (typeof value !== 'number' || !Number.isFinite(value)) return DEFAULT_HOST_MODE_RECONCILE_BATCH_SIZE;
return Math.max(1, Math.floor(value));
Expand Down Expand Up @@ -678,6 +688,48 @@ export class SwmHostModeMethods extends DKGAgentBase {
}
}

/**
* GH #1124 — DEFINITIVE "fully-open CG" check gating the self-signed public
* host-mode ingest path. "Open" requires BOTH axes, because this codebase
* separates READ visibility from WRITE authority:
* - accessPolicy === 0 → publicly READABLE (SWM is plaintext), AND
* - publishPolicy === 1 → OPEN PUBLISH (anyone may write).
* A public-readable but curated-publish CG (accessPolicy 0, publishPolicy 0 /
* PCA) still restricts WHO may publish, so the self-signed path must NOT apply
* — otherwise any key could store plaintext SWM on host-mode cores and bypass
* the on-chain publisher authorization (otReviewAgent #1239-r3). Curated OR
* unknown on EITHER axis → false: the conservative ciphertext + allowlist gates
* stay in force and a chain-event race heals via member catchup, so a curated
* (or restricted-publish) CG is never misclassified as self-publishable.
*/
async isConfirmedPublicForHostMode(this: DKGAgent, contextGraphId: string): Promise<boolean> {
  // Both policies come from the shared resolver (`getContextGraphOnChainPolicy`)
  // instead of a direct cleartext `subscribedContextGraphs` lookup, so a
  // host-only core keyed by the wire hash with no local `_meta` can still get
  // an answer via the resolver's cache / `_meta` / chain-RPC fallback.
  //
  // `publishPolicy` is mutable on-chain, and this gate ADMITS a self-signed
  // plaintext write, so it only accepts a cached value up to
  // `HOST_MODE_PUBLISH_POLICY_MAX_CACHE_AGE_MS` old (~5s) rather than the
  // general 60s TTL: an open→curated downgrade is honored within seconds while
  // the chain RPC stays rate-capped to ~1 per window per CG.
  try {
    const policy = await this.getContextGraphOnChainPolicy(contextGraphId, {
      publishPolicyMaxCacheAgeMs: HOST_MODE_PUBLISH_POLICY_MAX_CACHE_AGE_MS,
    });
    // Each axis must POSITIVELY resolve to its open value; an undefined
    // (unknown) policy on either axis compares unequal and yields false.
    const publiclyReadable = policy.accessPolicy === 0;
    const openPublish = policy.publishPolicy === 1;
    return publiclyReadable && openPublish;
  } catch {
    // Fail CLOSED: any resolver error means "not confirmed public". The
    // envelope is dropped and the share heals via retry/catchup once the
    // policy re-resolves.
    return false;
  }
}

/**
* Register the host-mode gossip handler for `contextGraphId` and
* track its reference so {@link unwireSwmHostModeHandler} can
Expand Down Expand Up @@ -1009,33 +1061,57 @@ export class SwmHostModeMethods extends DKGAgentBase {
isCiphertext = skm.type === SWM_SENDER_KEY_MESSAGE_TYPE;
} catch { /* fall through */ }
}
if (!isCiphertext) return;

// Authority check: verify the envelope signature against the
// curated CG's agent allowlist. Without this, a topic-reachable
// peer can fill per-CG storage with valid-looking ciphertext
// and evict legitimate history.
// GH #1124 — a curated CG MUST carry ciphertext, so a non-ciphertext
// envelope there is garbage → drop early. A CONFIRMED-public (open) CG
// legitimately gossips PLAINTEXT SWM. Resolve the public flag and reuse it
// for both the plaintext gate and the authority check. UNKNOWN CGs stay on
// the drop path (safe; member catchup heals once the policy resolves).
//
// Use `storageCgId` (cleartext from the envelope) so the
// member-side meta-graph + chain-fallback resolvers in
// `verifyHostModeEnvelopeAuthority` work on the canonical id
// shape. The hash subscription key is internal bookkeeping;
// never crosses an external authorization boundary.
// LAZY by design (Branimir review #1239 follow-on): the self-signed public
// exception only matters for `!isCiphertext` traffic. So short-circuit on
// `!isCiphertext` to skip the (now chain-backed) policy resolution entirely
// on the dominant CIPHERTEXT/curated path — otherwise the bulk of host-mode
// traffic would pay a synchronous eth_call to compute a value it discards.
// Security-preserving: a ciphertext envelope on a public CG just stays in the
// curated authority path / opaque append and heals via catchup.
const confirmedPublic = !isCiphertext && await this.isConfirmedPublicForHostMode(storageCgId);
if (!isCiphertext && !confirmedPublic) return;

// Authority check. Curated traffic verifies the envelope signature against
// the CG's agent allowlist. For a self-publishable (open) CG, inject the
// on-chain policy RESOLVER (not a pre-decided flag): the SHARED verifier
// re-checks accessPolicy===0 && publishPolicy===1 itself, then validates the
// signature + timestamp-freshness AND binds the inner request to THIS CG —
// same envelope validation as curated, only the allowlist decision diverges
// (see SharedMemoryHandler.verifyHostModeEnvelopeAuthority).
//
// Use `storageCgId` (cleartext from the envelope) so the meta-graph +
// chain-fallback resolvers work on the canonical id shape.
const handler = this.getOrCreateSharedMemoryHandler();
const verdict = await handler.verifyHostModeEnvelopeAuthority(data, storageCgId, fromPeerId);
const verdict = await handler.verifyHostModeEnvelopeAuthority(
data, storageCgId, fromPeerId,
// Inject the on-chain policy RESOLVER (not a pre-decided flag) so the
// verifier enforces accessPolicy===0 && publishPolicy===1 itself and can
// take the self-signed path even when a STALE participant allowlist
// survives an open-publish flip. Lazy: pass it only for non-ciphertext,
// so the dominant ciphertext/curated path pays no chain read (the resolver
// shares the same ~5s publishPolicy cache window as the confirmedPublic
// resolution above, so this is normally a warm cache hit rather than a 2nd
// RPC — only an entry that ages past the window between the two reads re-fetches).
isCiphertext
? undefined
: {
resolveOpenPublishPolicy: () => this.getContextGraphOnChainPolicy(
storageCgId, { publishPolicyMaxCacheAgeMs: HOST_MODE_PUBLISH_POLICY_MAX_CACHE_AGE_MS },
),
},
);
if (!verdict.accepted) {
// "no agent allowlist" is the expected outcome during the brief
// chain-event race window (cores see the beacon, auto-engage
// host-mode, then receive ciphertext BEFORE the
// `ContextGraphCreated` event lands AND before the curator
// beacon arrived). The beaconCuratorOracle fallback closes
// most of that window; the remaining race (envelope arrives
// before the beacon is received & verified) is recoverable
// via member catchup and should not spam WARN logs in steady-
// state operation. Other rejection reasons (sig mismatch, peer
// not in allowlist, decode failure) remain WARN — those are
// real authority failures that operators need to see.
const isTransientRace = verdict.reason === 'no agent allowlist on context graph';
// 'no agent allowlist' on a NON-public CG is the expected brief chain-event
// race (curated allowlist not loaded yet) — recoverable via member catchup,
// so log at debug. Every other rejection (decode / unsigned / signature-or-
// freshness / peer-not-allowed / CG-mismatch) is a real authority failure
// operators should see.
const isTransientRace = verdict.reasonCode === 'NO_AGENT_ALLOWLIST';
if (isTransientRace) {
this.log.debug(
ctx,
Expand Down Expand Up @@ -1112,6 +1188,64 @@ export class SwmHostModeMethods extends DKGAgentBase {
}
}
}
// GH #1124 — make a CONFIRMED-PUBLIC host-only (non-member) core ACK-CAPABLE.
// The opaque `append` below retains the raw envelope so this host can serve
// member host-catchup (LU-6 replay), but the StorageACKHandler a publisher
// dials reads `<cg>/_shared_memory` from `this.store` (loadSWMQuads /
// sharedMemoryReadBothFilter) — it has NO path into SwmHostModeStore. So
// without ALSO applying the plaintext into that triple-store graph, a
// non-member host would still DECLINE `NO_DATA_IN_SWM` and a public CG's
// storage-ACK quorum stays unreachable on a host-mode (non-member) topology
// — the exact bug this PR claims to fix. Reuse the member apply path
// (`handle`) on the SAME, already-authority-verified envelope bytes; for a
// public CG it routes the plaintext quads to the per-KA SWM layer the ACK
// handler reads (graph-agnostic merkle, no re-skolemize), so the recompute
// matches and this host signs a quorum-eligible ACK exactly as a member does.
//
// SECURITY — the `if (confirmedPublic)` wrapper is the SOLE authority gate
// for this apply, and it is LOAD-BEARING: on a host-only core `handle()`
// CANNOT distinguish curated from public (a non-member holds no local `_meta`
// allowlist nor accessPolicy, so a curated AND a public CG both resolve to
// `agentGateAddresses === null` && `hasPrivateAccessPolicy === false`, and
// `handle()` would apply plaintext for EITHER). What guarantees this CG is
// genuinely public is `isConfirmedPublicForHostMode` — accessPolicy === 0
// (immutable) AND publishPolicy === 1 taken from a cache entry no older than
// `HOST_MODE_PUBLISH_POLICY_MAX_CACHE_AGE_MS` (~5s) or re-read from chain
// (fail-closed on RPC error). DO NOT hoist this apply out of the `confirmedPublic` branch or
// reuse a `confirmedPublic` resolved further from the apply — either silently
// re-opens curated-plaintext injection into a non-member's SWM store.
// `verifyHostModeEnvelopeAuthority` already bound sig + 5-min freshness + CG +
// `publisherPeerId === fromPeerId` on these exact `data` bytes one block up,
// so `handle({ trustedReplay: true })` skips only the transport re-checks it
// already performed — for a public CG (agentGateAddresses === null) it skips
// no cryptography. Mirrors the catchup-replay call (~line 3575).
if (confirmedPublic) {
try {
const apply = await handler.handle(data, fromPeerId, undefined, { trustedReplay: true });
Comment thread
Bojan131 marked this conversation as resolved.
if (apply.applied) {
this.log.info(
ctx,
`Host-mode applied confirmed-public SWM plaintext cg=${storageCgId} triples=${apply.insertedTriples ?? 0} (now ACK-capable)`,
);
} else {
// Apply declined (validation / CAS / dedup). Keep going to the opaque
// append so member catchup still works; this host just won't ACK this
// share (it falls back to the pre-fix NO_DATA_IN_SWM decline). Logged
// at WARN so a SYSTEMATIC public-CG apply failure is observable here
// rather than only downstream as quorum-unmet.
const reason = 'reason' in apply ? apply.reason : 'unknown';
this.log.warn(
ctx,
`Host-mode confirmed-public SWM apply NOT applied cg=${storageCgId}: ${reason} (host keeps opaque copy for catchup but will DECLINE NO_DATA_IN_SWM on ACK)`,
);
}
} catch (err) {
// Never let an apply error drop the opaque retention path below.
this.log.warn(
ctx,
`Host-mode confirmed-public SWM apply threw cg=${storageCgId}: ${err instanceof Error ? err.message : String(err)} (opaque retention below unaffected)`,
);
}
}

const seqno = await this.swmHostModeStore.append(storageCgId, data);
Comment thread
Bojan131 marked this conversation as resolved.
this.log.debug(
Expand Down
53 changes: 53 additions & 0 deletions packages/agent/test/dkg-agent-on-chain-policy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,59 @@ describe('DKGAgent.getContextGraphOnChainPolicy', () => {
expect(Date.now() - newTs).toBeLessThan(1_000);
});

// Branimir review on #1239 — the host-mode self-signed admission gate
// (`isConfirmedPublicForHostMode`) is a SECURITY-POSITIVE consumer of the
// cached `publishPolicy`, so it passes a SHORT `publishPolicyMaxCacheAgeMs`
// (~5s) instead of the general 60s TTL. The follow-on goal (per the re-review)
// is BOTH directions at once: within the short window the cache is trusted so
// the chain RPC is rate-capped to ~1 per window per CG (no eth_call per
// envelope), and PAST it the chain re-verifies so an open→curated downgrade is
// caught within seconds (not up to 60s).
it('publishPolicyMaxCacheAgeMs: within-window cache is trusted (no RPC); a beyond-window entry re-verifies on-chain (Branimir #1239 follow-on)', async () => {
  const cgId = 'cg-w';
  const shortWindowMs = 5_000;

  // Invokes the resolver with an explicit per-caller cache-age cap.
  const getPolicy = (stub: any, cg: string, maxAgeMs: number) =>
    (DKGAgent.prototype as any).getContextGraphOnChainPolicy.call(stub, cg, { publishPolicyMaxCacheAgeMs: maxAgeMs });

  // Builds one scenario: the cache holds publishPolicy "1" fetched
  // `cacheAgeMs` ago, while the chain (if asked) answers "0". The returned
  // `rpc` recorder shows whether the chain was consulted.
  const scenario = (cacheAgeMs: number) => {
    const rpc = recorder(async () => ({ publishPolicy: 0, publishAuthority: '0x0000000000000000000000000000000000000000' }));
    const stub = makeStub({
      subscribedContextGraphs: new Map([[cgId, { onChainId: '88' }]]),
      onChainPublishPolicyCache: new Map([[cgId, 1]]),
      onChainPublishPolicyCacheUpdatedAt: new Map([[cgId, Date.now() - cacheAgeMs]]),
      onChainAccessPolicyCache: new Map([[cgId, 0]]),
      isContextGraphRegistered: recorder(async () => true),
      chain: { getContextGraphPublishPolicy: rpc },
    });
    return { rpc, stub };
  };

  // (a) Entry of age 0 inside the 5s window → cached "1" is served, no RPC.
  const fresh = scenario(0);
  expect(await getPolicy(fresh.stub, cgId, shortWindowMs)).toEqual({ accessPolicy: 0, publishPolicy: 1 });
  expect(fresh.rpc.calls).toEqual([]); // rate-capped: no eth_call within the window

  // (b) Entry aged 6s, past the 5s window → cache bypassed, chain answers "0".
  const stale = scenario(6_000);
  expect(await getPolicy(stale.stub, cgId, shortWindowMs)).toEqual({ accessPolicy: 0, publishPolicy: 0 });
  expect(stale.rpc.calls.at(-1)).toEqual([88n]);

  // (c) The same 6s-old entry is still fresh under the default 60s TTL: the
  // short window tightens one caller only, it is not a global change.
  const byDefault = scenario(6_000);
  expect(await callPolicy(byDefault.stub, cgId)).toEqual({ accessPolicy: 0, publishPolicy: 1 });
  expect(byDefault.rpc.calls).toEqual([]); // 6s < 60s default TTL → cache hit, no RPC
});

// Round 3 — degenerate state: registered locally but the on-chain
// id resolution fails (e.g. corrupted ontology graph). We can't
// RPC without a numeric id, so we return whatever local triples
Expand Down
Loading
Loading