feat(dag/walker): opt-in BloomTracker to avoid duplicated walks#1124
feat(dag/walker): opt-in BloomTracker to avoid duplicated walks#1124
Conversation
c16a5b7 to
4dad6c1
Compare
VisitedTracker interface for memory-efficient DAG traversal dedup. BloomTracker uses a scalable bloom filter chain (~4 bytes/CID vs ~75 for a map), enabling dedup on repos with tens of millions of CIDs. - BloomTracker: auto-scaling chain, configurable FP rate via BloomParams, unique random SipHash keys per instance (uncorrelated FPs across nodes) - MapTracker: exact dedup for tests and small datasets - *cid.Set satisfies the interface for drop-in compatibility - go.mod: update ipfs/bbloom to master (for NewWithKeys)
4dad6c1 to
c8962fc
Compare
iterative DFS walker that integrates VisitedTracker dedup directly into the traversal loop, skipping entire subtrees in O(1). - LinksFetcherFromBlockstore: extracts links from any codec registered in the global multicodec registry (dag-pb, dag-cbor, raw, etc.) - ~2x faster than legacy go-ipld-prime selector traversal (no selector machinery, simpler decoding, fewer allocations) - WithLocality option for MFS providers to skip non-local blocks - best-effort error handling: fetch failures log and skip, do not mark the CID as visited (allows retry via another pin or next cycle) - benchmarks comparing BlockAll vs WalkDAG across dag-pb, dag-cbor, and mixed-codec DAGs
19bf557 to
224c2ae
Compare
Codecov Report❌ Patch coverage is @@ Coverage Diff @@
## main #1124 +/- ##
==========================================
+ Coverage 62.56% 62.77% +0.21%
==========================================
Files 261 265 +4
Lines 26216 26539 +323
==========================================
+ Hits 16402 16660 +258
- Misses 8125 8170 +45
- Partials 1689 1709 +20
... and 7 files with indirect coverage changes 🚀 New features to boost your workflow:
|
emits entity roots (files, directories, HAMT shards) skipping internal file chunks. core of the +entities provide strategy. - NodeFetcherFromBlockstore: detects UnixFS entity type from the ipld-prime decoded node's Data field - directories and HAMT shards: emit and recurse into children - non-UnixFS codecs (dag-cbor, dag-json): emit and follow links - same options as WalkDAG: WithVisitedTracker, WithLocality - tests: dag-pb, raw, dag-cbor, mixed codecs, HAMT, dedup, error handling, stop conditions
catch unexpected regressions in ipfs/bbloom behavior or BloomParams derivation that would silently degrade the false positive rate. - measurable rate (1/1000): 100K probes produce observable FPs, asserts rate is within 5x of target - default rate (1/4.75M): 100K probes must produce exactly 0 FPs
- NewPrioritizedProvider: stream init error no longer stops remaining streams (e.g. MFS flush error does not prevent pinned content from being provided) - NewConcatProvider: concatenates pre-deduplicated streams without its own visited set, for use with shared VisitedTracker
…vider NewUniquePinnedProvider: emits all pinned blocks with cross-pin dedup via shared VisitedTracker (bloom or map). walks recursive pin DAGs first, then direct pins. NewPinnedEntityRootsProvider: same structure but uses WalkEntityRoots, emitting only entity roots and skipping internal file chunks. existing NewPinnedProvider is unchanged.
- remove unused daggen variable in uniquepinprovider_test.go
…tency match the defensive read-side ctx.Done select pattern already used by NewPrioritizedProvider in the same file
- deduplicate LinkSystem construction used by both LinksFetcherFromBlockstore and NodeFetcherFromBlockstore - wrap blockstore with NewIdStore so identity CIDs (multihash 0x00, data inline in the CID) are decoded without a datastore lookup
identity CIDs (multihash 0x00) embed data inline, so providing them to the DHT is wasteful. the walker now traverses through identity CIDs (following their links) but never emits them. - add isIdentityCID check to WalkDAG and WalkEntityRoots - simplify WalkEntityRoots emit/descend logic - tests for identity raw leaf, identity dag-pb directory with normal children, normal directory with identity child
- inline identity CID check (c.Prefix().MhType == mh.IDENTITY) in all emit paths: WalkDAG, WalkEntityRoots, and direct pin loops in both NewUniquePinnedProvider and NewPinnedEntityRootsProvider - move all identity CID tests to dag/walker/identity_test.go - add provider-level identity tests for direct pins and recursive DAGs
the stack-based DFS was pushing children in link order, causing the last child to be popped first (right-to-left). reverse children before pushing so the first link is on top and gets visited first. this matches the legacy fetcherhelpers.BlockAll selector traversal (ipld-prime iterates list/map entries in insertion order) and the conventional DFS order described in IPIP-0412. - walker.go, entity.go: slices.Reverse(children) before stack push - walker.go: document traversal order in WalkDAG godoc - entity.go: document order parity in WalkEntityRoots godoc - walker_test.go, entity_test.go: add sibling order regression tests
a corrupted pin entry was stopping the entire provide cycle because the goroutine returned on RecursiveKeys/DirectKeys error. change to continue so remaining pins are still provided (best-effort). the error from the pinner iterator already contains context (bad CID bytes, datastore key, etc.) -- sc.Pin.Key is zero-value on error so including it in the log would be noise. matches the best-effort pattern used in WalkDAG/WalkEntityRoots where fetch errors are logged and skipped.
- collectLinks: note that map keys are not recursed (no known codec uses link-typed map keys) - detectEntityType: extract c.Prefix() once for readability - grow: document MinBloomCapacity invariant that prevents small-bitset FP rate issues in grown blooms
gammazero
left a comment
There was a problem hiding this comment.
Made a few suggestions but nothing blocking.
uniquepinprovider: use skip-early style for tracker.Visit in direct pin loops (clearer control flow) visited.go: document that VisitedTracker implementations may be probabilistic, and must keep FP rate negligible or allow callers to adjust it
log capacity, FP rate, and hash parameters on creation. log previous/new capacity and chain length on autoscale. helps operators understand bloom sizing and detect unexpected growth during reprovide cycles.
counts Visit() calls that returned false (CID already seen). callers can log this after a reprovide cycle to show how much dedup the bloom filter achieved.
guillaumemichel
left a comment
There was a problem hiding this comment.
Neat implementation! A few comments inline, but nothing major
|
|
||
| func (bt *BloomTracker) Has(c cid.Cid) bool { | ||
| key := []byte(c.Hash()) | ||
| for _, b := range bt.chain { |
There was a problem hiding this comment.
Maybe document the tradeoff of the order in which filters are checked.
Old -> new (current):
Works some CIDs are very often repeated. These CIDs are likely to land in first filter hence this is the one we want to check first. Nodes landing in newer filters are expected to be less frequent.
New -> old:
Duplicates are close to each other in the DAG traversal (e.g after a CID is visited, its duplicates are expected to be in the next 10K visited nodes). Hence it is better to check newest filters first since it has the highest probability to contain the duplicate.
| // Check earlier blooms for the CID. If any reports it as | ||
| // present (true positive from a prior growth epoch, or rare | ||
| // cross-bloom false positive), skip it. | ||
| earlier := bt.chain[:len(bt.chain)-1] |
There was a problem hiding this comment.
Same remark as https://github.com/ipfs/boxo/pull/1124/changes#r3020485430 concerning bloom filter iteration order
| // 1. [VisitedTracker].Visit -- if already visited, skip entire subtree. | ||
| // The CID is marked visited immediately (before fetch). If fetch | ||
| // later fails, the CID stays in the tracker and won't be retried | ||
| // this cycle -- caught in the next cycle (22h). This avoids a |
There was a problem hiding this comment.
22h is a Kubo DHT specific parameter, not sure we want to hardcode it in the comment here
With reprovide sweep it corresponds to the keystore GC interval: calling KeyChanFunc to replace the set of keys being periodically reprovided.
With legacy reprovide it corresponds to the reprovide interval (also calling KeyChanFunc)
| slices.Reverse(children) | ||
| stack = append(stack, children...) | ||
|
|
||
| // skip identity CIDs: content is inline, no need to provide |
There was a problem hiding this comment.
If the dag walker has other purpose than providing, I would suggest leaving this filtering to the provide system.
There was a problem hiding this comment.
Or instead of blocking IDENTITY allow caller to pass a blocklist as option, so that provide systems can block IDENTITY?
| func WalkEntityRoots( | ||
| ctx context.Context, | ||
| root cid.Cid, | ||
| fetch NodeFetcher, | ||
| emit func(cid.Cid) bool, | ||
| opts ...Option, | ||
| ) error { | ||
| cfg := &walkConfig{} | ||
| for _, o := range opts { | ||
| o(cfg) | ||
| } | ||
|
|
||
| stack := []cid.Cid{root} | ||
|
|
||
| for len(stack) > 0 { | ||
| if ctx.Err() != nil { | ||
| return ctx.Err() | ||
| } | ||
|
|
||
| // pop | ||
| c := stack[len(stack)-1] | ||
| stack = stack[:len(stack)-1] | ||
|
|
||
| // dedup via tracker | ||
| if cfg.tracker != nil && !cfg.tracker.Visit(c) { | ||
| continue | ||
| } | ||
|
|
||
| // locality check | ||
| if cfg.locality != nil { | ||
| local, err := cfg.locality(ctx, c) | ||
| if err != nil { | ||
| log.Errorf("entity walk: locality check %s: %s", c, err) | ||
| continue | ||
| } | ||
| if !local { | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| // fetch block and detect entity type | ||
| children, entityType, err := fetch(ctx, c) | ||
| if err != nil { | ||
| log.Errorf("entity walk: fetch %s: %s", c, err) | ||
| continue | ||
| } | ||
|
|
||
| // decide whether to descend into children | ||
| descend := entityType != EntityFile && entityType != EntitySymlink | ||
| if descend { | ||
| // reverse so first link is popped next (left-to-right | ||
| // sibling order, matching WalkDAG and legacy BlockAll) | ||
| slices.Reverse(children) | ||
| stack = append(stack, children...) | ||
| } | ||
|
|
||
| // skip identity CIDs: content is inline, no need to provide. | ||
| // we still descend (above) so an inlined dag-pb directory's | ||
| // normal children get provided. | ||
| if c.Prefix().MhType == mh.IDENTITY { | ||
| continue | ||
| } | ||
|
|
||
| if !emit(c) { | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
A lot of common code with WalkDAG(). Would it make sense to consolidate them as possible?
| stack := []cid.Cid{root} | ||
|
|
||
| for len(stack) > 0 { | ||
| if ctx.Err() != nil { |
There was a problem hiding this comment.
nit: since checking ctx.Err() implies acquiring a mutex, we could check it only every 1k operations or similar?
| log.Errorf("entity provide recursive pins: %s", sc.Err) | ||
| continue | ||
| } | ||
| if err := walker.WalkEntityRoots(ctx, sc.Pin.Key, fetch, emit, walker.WithVisitedTracker(tracker)); err != nil { |
There was a problem hiding this comment.
This seems to be the only line where NewPinnedEntityRootsProvider() differs from NewUniquePinnedProvider(). Consolidating the functions body would increase clarity.
|
|
||
| ### Fixed | ||
|
|
||
| - `pinner`: `NewUniquePinnedProvider` and `NewPinnedEntityRootsProvider` now log and skip corrupted pin entries instead of aborting the entire provide cycle, allowing remaining pins to still be provided. [#1124](https://github.com/ipfs/boxo/pull/1124) |
There was a problem hiding this comment.
Should this be in the Added section?
Warning
not ready for review, this is a sandbox for running CI
Summary
New
dag/walkerpackage for memory-efficient DAG traversal with bloom filter deduplication, plus new pinned-provider strategies that use it to avoid re-announcing duplicate blocks across pins.dag/walker(new package)VisitedTrackerinterface with two implementations:BloomTracker-- auto-scaling bloom filter chain (~4 bytes/CID vs ~75 for a map), with uncorrelated false positives across nodes (unique random SipHash keys per instance)MapTracker-- exact dedup for tests and small datasetsWalkDAG-- iterative DFS traversal with integrated dedup, codec-agnostic link extraction (dag-pb, dag-cbor, raw, and any registered codec). ~2x faster than the legacygo-ipld-primeselector-based walker.WalkEntityRoots-- entity-aware traversal that emits only file/directory/HAMT shard roots instead of every block, skipping internal file chunks.pinnerNewUniquePinnedProvider-- emits all blocks reachable from pins with cross-pin bloom dedup (recursive DAGs first, then direct pins).NewPinnedEntityRootsProvider-- same but emits only entity roots viaWalkEntityRoots.providerNewPrioritizedProvidernow continues to the next stream when one fails instead of stopping all streams.NewConcatProvideradded for pre-deduplicated streams that don't need thecidutil.StreamingSetoverhead.Other