Skip to content

Commit e1eecef

Browse files
committed
feat: uniqueness depth and pull syncing
1 parent 15a8d02 commit e1eecef

File tree

1 file changed

+50
-25
lines changed

1 file changed

+50
-25
lines changed

pkg/puller/puller.go

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,14 @@ func (p *Puller) manage(ctx context.Context) {
157157
// peersDisconnected is used to mark and prune peers that are no longer connected.
158158
peersDisconnected := maps.Clone(p.syncPeers)
159159

160+
var neighbors []swarm.Address
160161
_ = p.topology.EachConnectedPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) {
161162
if _, ok := p.syncPeers[addr.ByteString()]; !ok {
162163
p.syncPeers[addr.ByteString()] = newSyncPeer(addr, p.bins, po)
163164
}
165+
if po >= newRadius {
166+
neighbors = append(neighbors, addr)
167+
}
164168
delete(peersDisconnected, addr.ByteString())
165169
return false, false, nil
166170
}, topology.Select{})
@@ -169,6 +173,28 @@ func (p *Puller) manage(ctx context.Context) {
169173
p.disconnectPeer(peer.address)
170174
}
171175

176+
// minUd := uint8(255)
177+
for i, target := range neighbors {
178+
maxUd := uint8(0)
179+
180+
// find the uniqueness depth: the smallest proximity order at which this peer is the only member of the neighbor set
181+
for j, neighbor := range neighbors {
182+
if i == j {
183+
continue
184+
}
185+
186+
ud := swarm.Proximity(target.Bytes(), neighbor.Bytes()) + 1
187+
if ud > maxUd {
188+
maxUd = ud
189+
}
190+
}
191+
p.syncPeers[target.ByteString()].ud = int8(maxUd)
192+
193+
// if maxUd < minUd {
194+
// minUd = maxUd
195+
// }
196+
}
197+
172198
p.recalcPeers(ctx, newRadius)
173199
}
174200

@@ -216,14 +242,23 @@ func (p *Puller) recalcPeers(ctx context.Context, storageRadius uint8) {
216242
p.logger.Debug("sync peer failed", "peer_address", peer.address, "error", err)
217243
}
218244
}(peer)
245+
// TODO: sync bins above or equal to storageRadius not covered by syncpeer
219246
}
220247
wg.Wait()
221248
}
222249

223250
func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uint8) error {
251+
if peer.ud == -1 { // not a neighbor; uniqueness depth was never assigned
252+
peer.stop()
253+
return nil
254+
}
224255
peer.mtx.Lock()
225256
defer peer.mtx.Unlock()
226257

258+
// If the peer's epoch has changed (indicating a reserve reset or storage change on the peer):
259+
// - Cancel all ongoing bin syncs for this peer.
260+
// - Reset all previously synced intervals for this peer (to force a fresh sync).
261+
// This guarantees that sync state is consistent with the peer's current reserve, and avoids pulling stale or irrelevant data.
227262
if peer.cursors == nil {
228263
cursors, epoch, err := p.syncer.GetCursors(ctx, peer.address)
229264
if err != nil {
@@ -257,6 +292,11 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
257292
return errCursorsLength
258293
}
259294

295+
// sync PO bin only
296+
if !peer.isBinSyncing(peer.po) {
297+
p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
298+
}
299+
260300
/*
261301
The syncing behavior diverges for peers outside and within the storage radius.
262302
For neighbor peers, we sync ALL bins greater than or equal to the storage radius.
@@ -265,33 +305,16 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, storageRadius uin
265305
no syncing is done.
266306
*/
267307

268-
if peer.po >= storageRadius {
269-
270-
// cancel all bins lower than the storage radius
271-
for bin := uint8(0); bin < storageRadius; bin++ {
272-
peer.cancelBin(bin)
273-
}
274-
275-
// sync all bins >= storage radius
276-
for bin, cur := range peer.cursors {
277-
if bin >= int(storageRadius) && !peer.isBinSyncing(uint8(bin)) {
278-
p.syncPeerBin(ctx, peer, uint8(bin), cur)
279-
}
280-
}
308+
// cancel all bins lower than the storage radius
309+
for bin := uint8(0); bin < storageRadius; bin++ {
310+
peer.cancelBin(bin)
311+
}
281312

282-
} else if storageRadius-peer.po <= maxPODelta {
283-
// cancel all non-po bins, if any
284-
for bin := uint8(0); bin < p.bins; bin++ {
285-
if bin != peer.po {
286-
peer.cancelBin(bin)
287-
}
313+
// sync all bins at or above the uniqueness depth, plus the bin equal to the peer's PO
314+
for bin, cur := range peer.cursors {
315+
if (bin >= int(peer.ud) || bin == int(peer.po)) && !peer.isBinSyncing(uint8(bin)) {
316+
p.syncPeerBin(ctx, peer, uint8(bin), cur)
288317
}
289-
// sync PO bin only
290-
if !peer.isBinSyncing(peer.po) {
291-
p.syncPeerBin(ctx, peer, peer.po, peer.cursors[peer.po])
292-
}
293-
} else {
294-
peer.stop()
295318
}
296319

297320
return nil
@@ -540,6 +563,7 @@ type syncPeer struct {
540563
address swarm.Address
541564
binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. index is bin
542565
po uint8
566+
ud int8 // uniqueness depth (-1 if not a neighbor)
543567
cursors []uint64
544568

545569
mtx sync.Mutex
@@ -551,6 +575,7 @@ func newSyncPeer(addr swarm.Address, bins, po uint8) *syncPeer {
551575
address: addr,
552576
binCancelFuncs: make(map[uint8]func(), bins),
553577
po: po,
578+
ud: -1, // calculated later, once all neighbors have been collected
554579
}
555580
}
556581

0 commit comments

Comments
 (0)