Skip to content

Commit 3d6800d

Browse files
authored
fix(gsoc): improvements (#4899)
1 parent 8c416af commit 3d6800d

File tree

17 files changed

+112
-181
lines changed

17 files changed

+112
-181
lines changed

pkg/api/api.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ import (
4848
"github.com/ethersphere/bee/v2/pkg/settlement/swap"
4949
"github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook"
5050
"github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20"
51-
"github.com/ethersphere/bee/v2/pkg/soc"
5251
"github.com/ethersphere/bee/v2/pkg/status"
5352
"github.com/ethersphere/bee/v2/pkg/steward"
5453
storage "github.com/ethersphere/bee/v2/pkg/storage"
@@ -687,7 +686,7 @@ type putterSessionWrapper struct {
687686
}
688687

689688
func (p *putterSessionWrapper) Put(ctx context.Context, chunk swarm.Chunk) error {
690-
idAddress, err := soc.IdentityAddress(chunk)
689+
idAddress, err := storage.IdentityAddress(chunk)
691690
if err != nil {
692691
return err
693692
}

pkg/api/gsoc.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
1818
logger := s.logger.WithName("gsoc_subscribe").Build()
1919

2020
paths := struct {
21-
Address []byte `map:"address" validate:"required"`
21+
Address swarm.Address `map:"address,resolve" validate:"required"`
2222
}{}
23+
2324
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
2425
response("invalid path params", logger, w)
2526
return
@@ -43,7 +44,7 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
4344
go s.gsocListeningWs(conn, paths.Address)
4445
}
4546

46-
func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
47+
func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
4748
defer s.wsWg.Done()
4849

4950
var (
@@ -56,7 +57,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) {
5657
ticker.Stop()
5758
_ = conn.Close()
5859
}()
59-
cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) {
60+
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
6061
select {
6162
case dataC <- m:
6263
case <-gone:

pkg/gsoc/gsoc.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,18 @@ import (
1212
"github.com/ethersphere/bee/v2/pkg/swarm"
1313
)
1414

15+
// Handler defines code to be executed upon reception of a GSOC sub message.
16+
// it is used as a parameter definition.
17+
type Handler func([]byte)
18+
1519
type Listener interface {
16-
Subscribe(address [32]byte, handler Handler) (cleanup func())
20+
Subscribe(address swarm.Address, handler Handler) (cleanup func())
1721
Handle(c *soc.SOC)
1822
Close() error
1923
}
2024

2125
type listener struct {
22-
handlers map[[32]byte][]*Handler
26+
handlers map[string][]*Handler
2327
handlersMu sync.Mutex
2428
quit chan struct{}
2529
logger log.Logger
@@ -29,26 +33,26 @@ type listener struct {
2933
func New(logger log.Logger) Listener {
3034
return &listener{
3135
logger: logger,
32-
handlers: make(map[[32]byte][]*Handler),
36+
handlers: make(map[string][]*Handler),
3337
quit: make(chan struct{}),
3438
}
3539
}
3640

3741
// Subscribe allows the definition of a Handler func on a specific GSOC address.
38-
func (l *listener) Subscribe(address [32]byte, handler Handler) (cleanup func()) {
42+
func (l *listener) Subscribe(address swarm.Address, handler Handler) (cleanup func()) {
3943
l.handlersMu.Lock()
4044
defer l.handlersMu.Unlock()
4145

42-
l.handlers[address] = append(l.handlers[address], &handler)
46+
l.handlers[address.ByteString()] = append(l.handlers[address.ByteString()], &handler)
4347

4448
return func() {
4549
l.handlersMu.Lock()
4650
defer l.handlersMu.Unlock()
4751

48-
h := l.handlers[address]
52+
h := l.handlers[address.ByteString()]
4953
for i := 0; i < len(h); i++ {
5054
if h[i] == &handler {
51-
l.handlers[address] = append(h[:i], h[i+1:]...)
55+
l.handlers[address.ByteString()] = append(h[:i], h[i+1:]...)
5256
return
5357
}
5458
}
@@ -61,13 +65,11 @@ func (l *listener) Handle(c *soc.SOC) {
6165
if err != nil {
6266
return // no handler
6367
}
64-
h := l.getHandlers([32]byte(addr.Bytes()))
68+
h := l.getHandlers(addr)
6569
if h == nil {
6670
return // no handler
6771
}
68-
l.logger.Info("new incoming GSOC message",
69-
"GSOC Address", addr,
70-
"wrapped chunk address", c.WrappedChunk().Address())
72+
l.logger.Debug("new incoming GSOC message", "GSOC Address", addr, "wrapped chunk address", c.WrappedChunk().Address())
7173

7274
for _, hh := range h {
7375
go func(hh Handler) {
@@ -76,23 +78,19 @@ func (l *listener) Handle(c *soc.SOC) {
7678
}
7779
}
7880

79-
func (p *listener) getHandlers(address [32]byte) []*Handler {
81+
func (p *listener) getHandlers(address swarm.Address) []*Handler {
8082
p.handlersMu.Lock()
8183
defer p.handlersMu.Unlock()
8284

83-
return p.handlers[address]
85+
return p.handlers[address.ByteString()]
8486
}
8587

8688
func (l *listener) Close() error {
8789
close(l.quit)
8890
l.handlersMu.Lock()
8991
defer l.handlersMu.Unlock()
9092

91-
l.handlers = make(map[[32]byte][]*Handler) //unset handlers on shutdown
93+
l.handlers = make(map[string][]*Handler) //unset handlers on shutdown
9294

9395
return nil
9496
}
95-
96-
// Handler defines code to be executed upon reception of a GSOC sub message.
97-
// it is used as a parameter definition.
98-
type Handler func([]byte)

pkg/gsoc/gsoc_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestRegister(t *testing.T) {
2121
t.Parallel()
2222

2323
var (
24-
g = gsoc.New(log.NewLogger("test"))
24+
g = gsoc.New(log.Noop)
2525
h1Calls = 0
2626
h2Calls = 0
2727
h3Calls = 0
@@ -52,8 +52,8 @@ func TestRegister(t *testing.T) {
5252
msgChan <- struct{}{}
5353
}
5454
)
55-
_ = g.Subscribe([32]byte(address1.Bytes()), h1)
56-
_ = g.Subscribe([32]byte(address2.Bytes()), h2)
55+
_ = g.Subscribe(address1, h1)
56+
_ = g.Subscribe(address2, h2)
5757

5858
ch1, _ := cac.New(payload1)
5959
socCh1 := soc.New(socId1, ch1)
@@ -74,7 +74,7 @@ func TestRegister(t *testing.T) {
7474
ensureCalls(t, &h2Calls, 0)
7575

7676
// register another handler on the first address
77-
cleanup := g.Subscribe([32]byte(address1.Bytes()), h3)
77+
cleanup := g.Subscribe(address1, h3)
7878

7979
g.Handle(socCh1)
8080

pkg/postage/stamp_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/ethersphere/bee/v2/pkg/postage"
1515
"github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
1616
postagetesting "github.com/ethersphere/bee/v2/pkg/postage/testing"
17-
"github.com/ethersphere/bee/v2/pkg/soc"
17+
"github.com/ethersphere/bee/v2/pkg/storage"
1818
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
1919
chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing"
2020
)
@@ -104,7 +104,7 @@ func TestValidStamp(t *testing.T) {
104104
// stamp on execution
105105
ch := chunktesting.GenerateTestRandomChunk()
106106

107-
idAddress, err := soc.IdentityAddress(ch)
107+
idAddress, err := storage.IdentityAddress(ch)
108108
if err != nil {
109109
t.Fatal(err)
110110
}

pkg/pusher/pusher.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/ethersphere/bee/v2/pkg/log"
1919
"github.com/ethersphere/bee/v2/pkg/postage"
2020
"github.com/ethersphere/bee/v2/pkg/pushsync"
21-
"github.com/ethersphere/bee/v2/pkg/soc"
2221
storage "github.com/ethersphere/bee/v2/pkg/storage"
2322
"github.com/ethersphere/bee/v2/pkg/swarm"
2423
"github.com/ethersphere/bee/v2/pkg/topology"
@@ -36,6 +35,8 @@ type Op struct {
3635
Err chan error
3736
Direct bool
3837
Span opentracing.Span
38+
39+
identityAddress swarm.Address
3940
}
4041

4142
type OpChan <-chan *Op
@@ -215,10 +216,12 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
215216
for {
216217
select {
217218
case op := <-cc:
218-
idAddress, err := soc.IdentityAddress(op.Chunk)
219+
idAddress, err := storage.IdentityAddress(op.Chunk)
219220
if err != nil {
220221
op.Err <- err
222+
continue
221223
}
224+
op.identityAddress = idAddress
222225
if s.inflight.set(idAddress, op.Chunk.Stamp().BatchID()) {
223226
if op.Direct {
224227
select {
@@ -245,12 +248,8 @@ func (s *Service) chunksWorker(warmupTime time.Duration) {
245248

246249
func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (bool, error) {
247250
loggerV1 := logger.V(1).Build()
248-
idAddress, err := soc.IdentityAddress(op.Chunk)
249-
if err != nil {
250-
return true, err
251-
}
252251

253-
defer s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
252+
defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
254253

255254
if _, err := s.validStamp(op.Chunk); err != nil {
256255
loggerV1.Warning(
@@ -278,7 +277,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
278277
return true, err
279278
}
280279
case errors.Is(err, pushsync.ErrShallowReceipt):
281-
if retry := s.shallowReceipt(idAddress); retry {
280+
if retry := s.shallowReceipt(op.identityAddress); retry {
282281
return true, err
283282
}
284283
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
@@ -300,13 +299,11 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
300299

301300
func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) error {
302301
loggerV1 := logger.V(1).Build()
303-
idAddress, err := soc.IdentityAddress(op.Chunk)
304-
if err != nil {
305-
return err
306-
}
302+
303+
var err error
307304

308305
defer func() {
309-
s.inflight.delete(idAddress, op.Chunk.Stamp().BatchID())
306+
s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
310307
select {
311308
case op.Err <- err:
312309
default:

pkg/pusher/pusher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ func TestPusherRetryShallow(t *testing.T) {
333333
// generate a chunk at PO 1 with closestPeer, meaning that we get a
334334
// receipt which is shallower than the pivot peer's depth, resulting
335335
// in retries
336-
chunk := testingc.GenerateTestRandomChunkAt(t, closestPeer, 1)
336+
chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, 1)
337337

338338
storer.chunks <- chunk
339339

pkg/pushsync/pushsync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
361361
sentErrorsLeft = maxPushErrors
362362
}
363363

364-
idAddress, err := soc.IdentityAddress(ch)
364+
idAddress, err := storage.IdentityAddress(ch)
365365
if err != nil {
366366
return nil, err
367367
}

pkg/soc/utils.go

Lines changed: 0 additions & 34 deletions
This file was deleted.

pkg/storage/storage.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
var (
2020
ErrOverwriteNewerChunk = errors.New("overwriting chunk with newer timestamp")
21+
ErrUnknownChunkType = errors.New("unknown chunk type")
2122
)
2223

2324
// Result represents the item returned by the read operation, which returns
@@ -293,3 +294,35 @@ func ChunkType(ch swarm.Chunk) swarm.ChunkType {
293294
}
294295
return swarm.ChunkTypeUnspecified
295296
}
297+
298+
// IdentityAddress returns the internally used address for the chunk
299+
// since the single owner chunk address is not a unique identifier for the chunk,
300+
// but hashing the soc address and the wrapped chunk address is.
301+
// it is used in the reserve sampling and other places where a key is needed to represent a chunk.
302+
func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) {
303+
304+
if cac.Valid(chunk) {
305+
return chunk.Address(), nil
306+
}
307+
308+
// check the chunk is single owner chunk or cac
309+
if sch, err := soc.FromChunk(chunk); err == nil {
310+
socAddress, err := sch.Address()
311+
if err != nil {
312+
return swarm.ZeroAddress, err
313+
}
314+
h := swarm.NewHasher()
315+
_, err = h.Write(socAddress.Bytes())
316+
if err != nil {
317+
return swarm.ZeroAddress, err
318+
}
319+
_, err = h.Write(sch.WrappedChunk().Address().Bytes())
320+
if err != nil {
321+
return swarm.ZeroAddress, err
322+
}
323+
324+
return swarm.NewAddress(h.Sum(nil)), nil
325+
}
326+
327+
return swarm.ZeroAddress, fmt.Errorf("identity address failed on chunk %s: %w", chunk, ErrUnknownChunkType)
328+
}

0 commit comments

Comments
 (0)