diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml index 4c69fce7a6f..3ae978ec5b6 100644 --- a/.github/workflows/beekeeper.yml +++ b/.github/workflows/beekeeper.yml @@ -14,7 +14,7 @@ env: SETUP_CONTRACT_IMAGE: "ethersphere/bee-localchain" SETUP_CONTRACT_IMAGE_TAG: "0.9.4" BEELOCAL_BRANCH: "main" - BEEKEEPER_BRANCH: "master" + BEEKEEPER_BRANCH: "feat/soc-dispersed" BEEKEEPER_METRICS_ENABLED: false REACHABILITY_OVERRIDE_PUBLIC: true BATCHFACTOR_OVERRIDE_PUBLIC: 2 diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 7f48e32d329..70115c3c8fc 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -879,6 +879,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" requestBody: required: true description: The SOC binary data is composed of the span (8 bytes) and the at most 4KB payload. @@ -926,6 +927,7 @@ paths: description: Arbitrary identifier of the related data - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter" @@ -977,6 +979,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" responses: "201": description: Created @@ -1035,6 +1038,7 @@ paths: - $ref: "SwarmCommon.yaml#/components/parameters/SwarmFeedLegacyResolve" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmOnlyRootChunkParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCache" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyLevelParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyStrategyParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmRedundancyFallbackModeParameter" - $ref: "SwarmCommon.yaml#/components/parameters/SwarmChunkRetrievalTimeoutParameter" diff --git a/pkg/api/accesscontrol.go b/pkg/api/accesscontrol.go index 1ae0fb2fe6e..78c2a0e11b5 100644 --- a/pkg/api/accesscontrol.go +++ b/pkg/api/accesscontrol.go @@ -126,7 +126,7 @@ func (s *Service) actDecryptionHandler() func(h http.Handler) http.Handler { cache = *headers.Cache } ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.PARANOID) reference, err := s.accesscontrol.DownloadHandler(ctx, ls, paths.Address, headers.Publisher, *headers.HistoryAddress, timestamp) if err != nil { logger.Debug("access control download failed", "error", err) @@ -159,7 +159,7 @@ func (s *Service) actEncryptionHandler( historyRootHash swarm.Address, ) (swarm.Address, swarm.Address, error) { publisherPublicKey := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), 
redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.PARANOID) storageReference, historyReference, encryptedReference, err := s.accesscontrol.UploadHandler(ctx, ls, reference, publisherPublicKey, historyRootHash) if err != nil { return swarm.ZeroAddress, swarm.ZeroAddress, err @@ -204,7 +204,7 @@ func (s *Service) actListGranteesHandler(w http.ResponseWriter, r *http.Request) cache = *headers.Cache } publisher := &s.publicKey - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.PARANOID) grantees, err := s.accesscontrol.Get(r.Context(), ls, publisher, paths.GranteesAddress) if err != nil { logger.Debug("could not get grantees", "error", err) @@ -344,8 +344,8 @@ func (s *Service) actGrantRevokeHandler(w http.ResponseWriter, r *http.Request) granteeref := paths.GranteesAddress publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.PARANOID) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.PARANOID) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, granteeref, historyAddress, publisher, grantees.Addlist, grantees.Revokelist) if err != nil { logger.Debug("failed to update grantee list", "error", err) @@ -498,8 +498,8 @@ func (s *Service) actCreateGranteesHandler(w http.ResponseWriter, r *http.Reques } publisher := &s.publicKey - ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.DefaultLevel) - gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.DefaultLevel) + ls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, false, redundancy.NONE), redundancy.PARANOID) + gls := loadsave.New(s.storer.Download(true), s.storer.Cache(), requestPipelineFactory(ctx, putter, granteeListEncrypt, redundancy.NONE), redundancy.PARANOID) granteeref, encryptedglref, historyref, actref, err := s.accesscontrol.UpdateHandler(ctx, ls, gls, swarm.ZeroAddress, historyAddress, publisher, list, nil) if err != nil { logger.Debug("failed to create grantee list", "error", err) diff --git a/pkg/api/bzz.go b/pkg/api/bzz.go index a27e3f813c5..7174af9dc2c 100644 --- a/pkg/api/bzz.go +++ b/pkg/api/bzz.go @@ -32,6 +32,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/manifest" "github.com/ethersphere/bee/v2/pkg/postage" + "github.com/ethersphere/bee/v2/pkg/replicas" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -60,6 +61,13 @@ func lookaheadBufferSize(size int64) int { return largeFileBufferSize } +func getRedundancyLevel(rLevel *redundancy.Level) redundancy.Level { + if rLevel != nil { + return 
*rLevel + } + return redundancy.PARANOID +} + func (s *Service) bzzUploadHandler(w http.ResponseWriter, r *http.Request) { span, logger, ctx := s.tracer.StartSpanFromContext(r.Context(), "post_bzz", s.logger.WithName("post_bzz").Build()) defer span.Finish() @@ -400,13 +408,11 @@ func (s *Service) serveReference(logger log.Logger, address swarm.Address, pathV cache = *headers.Cache } - rLevel := redundancy.DefaultLevel - if headers.RLevel != nil { - rLevel = *headers.RLevel - } + rLevel := getRedundancyLevel(headers.RLevel) ctx := r.Context() - ls := loadsave.NewReadonly(s.storer.Download(cache), s.storer.Cache(), redundancy.DefaultLevel) + g := s.storer.Download(cache) + ls := loadsave.NewReadonly(g, s.storer.Cache(), rLevel) feedDereferenced := false ctx, err := getter.SetConfigInContext(ctx, headers.Strategy, headers.FallbackMode, headers.ChunkRetrievalTimeout, logger) @@ -434,7 +440,7 @@ FETCH: // unmarshal as mantaray first and possibly resolve the feed, otherwise // go on normally. if !feedDereferenced { - if l, err := s.manifestFeed(ctx, m); err == nil { + if l, err := s.manifestFeed(ctx, m, replicas.NewSocGetter(g, rLevel)); err == nil { // we have a feed manifest here ch, cur, _, err := l.At(ctx, time.Now().Unix(), 0) if err != nil { @@ -622,10 +628,7 @@ func (s *Service) downloadHandler(logger log.Logger, w http.ResponseWriter, r *h jsonhttp.BadRequest(w, "could not parse headers") return } - rLevel := redundancy.DefaultLevel - if headers.RLevel != nil { - rLevel = *headers.RLevel - } + rLevel := getRedundancyLevel(headers.RLevel) var ( reader file.Joiner @@ -701,6 +704,7 @@ func manifestMetadataLoad( func (s *Service) manifestFeed( ctx context.Context, m manifest.Interface, + st storage.Getter, ) (feeds.Lookup, error) { e, err := m.Lookup(ctx, "/") if err != nil { @@ -733,5 +737,5 @@ func (s *Service) manifestFeed( return nil, fmt.Errorf("node lookup: %s", "feed metadata absent") } f := feeds.New(topic, common.BytesToAddress(owner)) - return s.feedFactory.NewLookup(*t, f) + return s.feedFactory.NewLookup(*t, f, feeds.WithGetter(st)) } diff --git a/pkg/api/bzz_test.go b/pkg/api/bzz_test.go index 7f636476d1f..74ba32cd0bf 100644 --- a/pkg/api/bzz_test.go +++ b/pkg/api/bzz_test.go @@ -7,6 +7,7 @@ package api_test import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "io" @@ -19,6 +20,7 @@ import ( "testing" "github.com/ethersphere/bee/v2/pkg/api" + "github.com/ethersphere/bee/v2/pkg/feeds" "github.com/ethersphere/bee/v2/pkg/file/loadsave" "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" @@ -27,7 +29,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/manifest" mockbatchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" + "github.com/ethersphere/bee/v2/pkg/replicas" + "github.com/ethersphere/bee/v2/pkg/soc" testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -806,6 +811,161 @@ func TestFeedIndirection(t *testing.T) { t.Fatalf("expected file reference, did not got any") } + // get root chunk of data + // and wrap it in a feed + rootCh, err := storer.ChunkStore().Get(context.Background(), resp.Reference) + if err != nil { + t.Fatal(err) + } + socRootCh := testingsoc.GenerateMockSOC(t, rootCh.Data()[swarm.SpanSize:]).Chunk() + + // now use the "content" 
to mock the feed lookup + // also, use the mocked mantaray chunks that unmarshal + // into a real manifest with the mocked feed values when + // called from the bzz endpoint. then call the bzz endpoint with + // the pregenerated feed root manifest hash + + t.Run("feed wrapping", func(t *testing.T) { + var ( + look = newMockLookup(-1, 0, socRootCh, nil, &id{}, nil) + factory = newMockFactory(look) + bzzDownloadResource = func(addr, path string) string { return "/bzz/" + addr + "/" + path } + ctx = context.Background() + ) + client, _, _, _ = newTestServer(t, testServerOptions{ + Storer: storer, + Logger: logger, + Feeds: factory, + }) + if err != nil { + t.Fatal(err) + } + m, err := manifest.NewDefaultManifest( + loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel), + false, + ) + if err != nil { + t.Fatal(err) + } + emptyAddr := make([]byte, 32) + err = m.Add(ctx, manifest.RootPath, manifest.NewEntry(swarm.NewAddress(emptyAddr), map[string]string{ + api.FeedMetadataEntryOwner: "8d3766440f0d7b949a5e32995d09619a7f86e632", + api.FeedMetadataEntryTopic: "abcc", + api.FeedMetadataEntryType: "epoch", + })) + if err != nil { + t.Fatal(err) + } + manifRef, err := m.Store(ctx) + if err != nil { + t.Fatal(err) + } + + jsonhttptest.Request(t, client, http.MethodGet, bzzDownloadResource(manifRef.String(), ""), http.StatusOK, + jsonhttptest.WithExpectedResponse(updateData), + jsonhttptest.WithExpectedContentLength(len(updateData)), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.SwarmFeedIndexHeader), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.ContentDispositionHeader), + jsonhttptest.WithExpectedResponseHeader(api.ContentDispositionHeader, `inline; filename="index.html"`), + jsonhttptest.WithExpectedResponseHeader(api.ContentTypeHeader, "text/html; charset=utf-8"), + ) + }) + + t.Run("redundancy", func(t *testing.T) { + // enough to test two redundancy levels: NONE (no replicas) and MEDIUM (with replicas) + tests := []struct { + name string + rLevel redundancy.Level + }{ + { + name: "none", + rLevel: redundancy.NONE, + }, + { + name: "medium", + rLevel: redundancy.MEDIUM, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rLevel := tt.rLevel + socRoot, _ := soc.FromChunk(socRootCh) + socPutter := replicas.NewSocPutter(storer, rLevel) + err = socPutter.Put(context.Background(), socRootCh) + if err != nil { + t.Fatalf("failed to put SOC chunk with redundancy: %v", err) + } + + m, err := manifest.NewDefaultManifest( + loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, rLevel), rLevel), + false, + ) + if err != nil { + t.Fatal(err) + } + + // Add the feed entry to the manifest + hexId := hex.EncodeToString(socRoot.ID()) + hexOwner := hex.EncodeToString(socRoot.OwnerAddress()) + err = m.Add(context.Background(), manifest.RootPath, manifest.NewEntry(socRootCh.Address(), map[string]string{ + api.FeedMetadataEntryOwner: hexOwner, + api.FeedMetadataEntryTopic: hexId, + api.FeedMetadataEntryType: "sequence", + })) + if err != nil { + t.Fatal(err) + } + manifestRef, err := m.Store(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Create mockLookup and mockFactory for feed + look := newRedundancyMockLookup( + rLevel, + storer.ChunkStore(), + func() (swarm.Chunk, feeds.Index, feeds.Index) { + return socRootCh, &id{}, &id{} + }, + ) + feedFactory := newMockFactory(look) + + // Update the test server with the feed factory + client, _, _, _ := 
newTestServer(t, testServerOptions{ + Storer: storer, + Logger: log.Noop, + Post: mockpost.New(mockpost.WithAcceptAll()), + Feeds: feedFactory, + }) + + // remove original chunk from store + cs, ok := storer.ChunkStore().(storage.ChunkStore) + if !ok { + t.Fatalf("chunk store not available for deletion") + } + err = cs.Delete(context.Background(), socRootCh.Address()) + if err != nil { + t.Fatalf("Failed to delete soc chunk: %v", err) + } + + manifestHex := manifestRef.String() + + if rLevel == redundancy.NONE { + jsonhttptest.Request(t, client, http.MethodGet, "/bzz/"+manifestHex+"/", http.StatusNotFound) + return + } + jsonhttptest.Request(t, client, http.MethodGet, "/bzz/"+manifestHex+"/", http.StatusOK, + jsonhttptest.WithExpectedResponse(updateData), + jsonhttptest.WithExpectedContentLength(len(updateData)), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.SwarmFeedIndexHeader), + jsonhttptest.WithExpectedResponseHeader(api.AccessControlExposeHeaders, api.ContentDispositionHeader), + jsonhttptest.WithExpectedResponseHeader(api.ContentTypeHeader, "text/html; charset=utf-8"), + jsonhttptest.WithExpectedResponseHeader(api.ContentDispositionHeader, `inline; filename="index.html"`), + ) + }) + } + }) + m, err := manifest.NewDefaultManifest( loadsave.New(storer.ChunkStore(), storer.Cache(), pipelineFactory(storer.Cache(), false, 0), redundancy.DefaultLevel), false, diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go index a9ffcad5ae0..fba6ca6d29e 100644 --- a/pkg/api/chunk.go +++ b/pkg/api/chunk.go @@ -260,8 +260,8 @@ func (s *Service) chunkGetHandler(w http.ResponseWriter, r *http.Request) { loggerV1.Debug("chunk not found", "address", address) jsonhttp.NotFound(w, "chunk not found") return - } + logger.Debug("read chunk failed", "chunk_address", address, "error", err) logger.Error(nil, "read chunk failed") jsonhttp.InternalServerError(w, "read chunk failed") diff --git a/pkg/api/feed.go b/pkg/api/feed.go index 313b04c831a..ca77f2b6e23 100644 --- a/pkg/api/feed.go +++ b/pkg/api/feed.go @@ -23,6 +23,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/manifest/mantaray" "github.com/ethersphere/bee/v2/pkg/manifest/simple" "github.com/ethersphere/bee/v2/pkg/postage" + "github.com/ethersphere/bee/v2/pkg/replicas" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer" @@ -66,15 +67,21 @@ func (s *Service) feedGetHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + RedundancyLevel redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + getter := s.storer.Download(false) + if headers.RedundancyLevel > redundancy.NONE { + getter = replicas.NewSocGetter(getter, headers.RedundancyLevel) + } + f := feeds.New(paths.Topic, paths.Owner) - lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f) + lookup, err := s.feedFactory.NewLookup(feeds.Sequence, f, feeds.WithGetter(getter)) if err != nil { logger.Debug("new lookup failed", "owner", paths.Owner, "error", err) logger.Error(nil, "new lookup failed") @@ -170,11 +177,12 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` - Pin bool `map:"Swarm-Pin"` - Deferred *bool `map:"Swarm-Deferred-Upload"` - Act bool 
`map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"` + Pin bool `map:"Swarm-Pin"` + Deferred *bool `map:"Swarm-Deferred-Upload"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RedundancyLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) @@ -231,7 +239,9 @@ func (s *Service) feedPostHandler(w http.ResponseWriter, r *http.Request) { logger: logger, } - l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), redundancy.DefaultLevel) + rLevel := getRedundancyLevel(headers.RedundancyLevel) + + l := loadsave.New(s.storer.ChunkStore(), s.storer.Cache(), requestPipelineFactory(r.Context(), putter, false, 0), rLevel) feedManifest, err := manifest.NewDefaultManifest(l, false) if err != nil { logger.Debug("create manifest failed", "error", err) diff --git a/pkg/api/feed_test.go b/pkg/api/feed_test.go index fcfe4b93e68..5782c4abd0c 100644 --- a/pkg/api/feed_test.go +++ b/pkg/api/feed_test.go @@ -18,6 +18,9 @@ import ( "strconv" "testing" + "github.com/ethersphere/bee/v2/pkg/replicas" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/api" "github.com/ethersphere/bee/v2/pkg/feeds" "github.com/ethersphere/bee/v2/pkg/file/loadsave" @@ -345,6 +348,108 @@ func TestFeedDirectUpload(t *testing.T) { ) } +// redundancyMockLookup is a specialized mockLookup that uses redundancy SOC getter +type redundancyMockLookup struct { + redundancyLevel redundancy.Level + getter storage.Getter + lastChunkFn func() (swarm.Chunk, feeds.Index, feeds.Index) // Hook to get the latest chunk, index, nextIndex +} + +func newRedundancyMockLookup(rLevel redundancy.Level, getter storage.Getter, lastChunkFn func() (swarm.Chunk, feeds.Index, feeds.Index)) *redundancyMockLookup { + return &redundancyMockLookup{ + redundancyLevel: rLevel, + getter: getter, + lastChunkFn: lastChunkFn, + } +} + +// At overrides mockLookup.At to use redundancy SOC getter +func (l *redundancyMockLookup) At(ctx context.Context, at int64, after uint64) (swarm.Chunk, feeds.Index, feeds.Index, error) { + chunk, cur, next := l.lastChunkFn() + + // Create redundancy SOC getter if redundancy level is set + redGetter := replicas.NewSocGetter(l.getter, l.redundancyLevel) + + // Try to get the chunk with redundancy + redChunk, err := redGetter.Get(ctx, chunk.Address()) + if err != nil { + return nil, nil, nil, err + } + // Use the chunk retrieved with redundancy + return redChunk, cur, next, nil +} + +// TestFeedAPIWithRedundancy tests the feed API with SOC redundancy +func TestFeedAPIWithRedundancy(t *testing.T) { + t.Parallel() + + var ( + redundancyLevel = redundancy.PARANOID // Use highest redundancy level + topic = swarm.RandAddress(t) + mockStorer = mockstorer.New() + feedData = []byte("feed redundancy test data") + ) + socChunk := testingsoc.GenerateMockSOC(t, feedData) + + // Variables to track the last chunk, index, and next index + var ( + lastChunk swarm.Chunk + lastIndex feeds.Index + lastNext feeds.Index + ) + + // Provide a hook function to return the latest chunk, index, and next index + lastChunkFn := func() (swarm.Chunk, feeds.Index, feeds.Index) { + return lastChunk, lastIndex, lastNext + } + lastChunk = socChunk.Chunk() + lastIndex = &id{} + lastNext = &id{} + + // Create redundancy-aware lookup 
that wraps our lookup + redLookup := newRedundancyMockLookup(redundancyLevel, mockStorer.ChunkStore(), lastChunkFn) + factory := newMockFactory(redLookup) + + // Create test server with our custom setup + mp := mockpost.New(mockpost.WithIssuer(postage.NewStampIssuer("", "", batchOk, big.NewInt(3), 11, 10, 1000, true))) + client, _, _, _ := newTestServer(t, testServerOptions{ + Storer: mockStorer, + Post: mp, + Feeds: factory, + }) + + socPutter := replicas.NewSocPutter(mockStorer, redundancyLevel) + + ctx := context.Background() + err := socPutter.Put(ctx, socChunk.Chunk()) + if err != nil { + t.Fatalf("failed to put SOC chunk with redundancy: %v", err) + } + + // Get access to the underlying chunk store + cs, ok := mockStorer.ChunkStore().(storage.ChunkStore) + if !ok { + t.Fatal("Could not access underlying ChunkStore with Delete method") + } + + // Delete the original SOC chunk using socChunk.Address() directly + err = cs.Delete(context.Background(), socChunk.Address()) + if err != nil { + t.Fatalf("Failed to delete original SOC chunk: %v", err) + } + + feedResource := fmt.Sprintf("/feeds/%s/%s", ownerString, topic) + + // Try to retrieve the feed content with redundancy + jsonhttptest.Request(t, client, http.MethodGet, + feedResource, + http.StatusOK, + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", redundancyLevel)), + jsonhttptest.WithExpectedResponse(feedData), + ) +} + type factoryMock struct { sequenceCalled bool epochCalled bool @@ -356,7 +461,7 @@ func newMockFactory(mockLookup feeds.Lookup) *factoryMock { return &factoryMock{lookup: mockLookup} } -func (f *factoryMock) NewLookup(t feeds.Type, feed *feeds.Feed) (feeds.Lookup, error) { +func (f *factoryMock) NewLookup(t feeds.Type, feed *feeds.Feed, _ ...feeds.FactoryOption) (feeds.Lookup, error) { switch t { case feeds.Sequence: f.sequenceCalled = true
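The soc.go changes that follow wire the new Swarm-Redundancy-Level header into the SOC upload and download handlers. For reference, a minimal client-side sketch of the upload call (not part of the diff; baseURL, owner, id, sig and batchID are placeholder assumptions):

```go
package socexample

import (
	"bytes"
	"fmt"
	"net/http"
)

// uploadSOCWithReplicas posts a single-owner chunk and asks the node to also
// store its dispersed replicas (level 2 maps to 2^2 = 4 replicas).
func uploadSOCWithReplicas(baseURL, owner, id, sig, batchID string, payload []byte) (*http.Response, error) {
	url := fmt.Sprintf("%s/soc/%s/%s?sig=%s", baseURL, owner, id, sig)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Swarm-Postage-Batch-Id", batchID)
	req.Header.Set("Swarm-Redundancy-Level", "2")
	return http.DefaultClient.Do(req)
}
```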
diff --git a/pkg/api/soc.go index 85d9bf5aaa3..b38263eb3dc 100644 --- a/pkg/api/soc.go +++ b/pkg/api/soc.go @@ -14,8 +14,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/accesscontrol" "github.com/ethersphere/bee/v2/pkg/cac" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/postage" + "github.com/ethersphere/bee/v2/pkg/replicas" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -47,10 +49,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - BatchID []byte `map:"Swarm-Postage-Batch-Id"` - StampSig []byte `map:"Swarm-Postage-Stamp"` - Act bool `map:"Swarm-Act"` - HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + BatchID []byte `map:"Swarm-Postage-Batch-Id"` + StampSig []byte `map:"Swarm-Postage-Stamp"` + Act bool `map:"Swarm-Act"` + HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } @@ -64,11 +67,19 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } var ( - putter storer.PutterSession - err error + basePutter storer.PutterSession // the putter used to store regular chunks + putter storer.PutterSession // the putter used to store SOC replica chunks + err error ) + rLevel := getRedundancyLevel(headers.RLevel) + if len(headers.StampSig) != 0 { + if headers.RLevel != nil { + logger.Error(nil, "redundancy level is not supported with stamp signature") + jsonhttp.BadRequest(w, "redundancy level is not supported with stamp signature") + return + } stamp := postage.Stamp{} if err := stamp.UnmarshalBinary(headers.StampSig); err != nil { errorMsg := "Stamp deserialization failure" @@ -84,6 +95,7 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { Pin: false, Deferred: false, }, &stamp) + basePutter = putter } else { putter, err = s.newStamperPutter(r.Context(), putterOptions{ BatchID: headers.BatchID, @@ -91,6 +103,10 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { Pin: false, Deferred: false, }) + basePutter = putter + if rLevel != redundancy.NONE { + putter = replicas.NewSocPutterSession(putter, rLevel) + } } if err != nil { logger.Debug("get putter failed", "error", err) @@ -183,7 +199,7 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { reference := sch.Address() historyReference := swarm.ZeroAddress if headers.Act { - reference, historyReference, err = s.actEncryptionHandler(r.Context(), putter, reference, headers.HistoryAddress) + reference, historyReference, err = s.actEncryptionHandler(r.Context(), basePutter, reference, headers.HistoryAddress) if err != nil { logger.Debug("access control upload failed", "error", err) logger.Error(nil, "access control upload failed") @@ -201,7 +217,12 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) { } } - err = putter.Done(sch.Address()) + // do not pass sch.Address() since it causes an error on parallel GSOC uploads + // in case of deferred upload: + // pkg/storer/internal/pinning/pinning.go:collectionPutter.Close -> returns an error if pin is true, but that is not a valid use-case for SOC upload + // pkg/storer/internal/upload/uploadstore.go:uploadPutter.Close -> updates the tagID, and the address would be set along with it -> not necessary + // in case of direct upload it only waits for the waitgroup of the chunk upload and does not use the swarm address + err = putter.Done(swarm.Address{}) if err != nil { logger.Debug("done split failed", "error", err) logger.Error(nil, "done split failed") @@ -229,13 +250,16 @@ func (s *Service) socGetHandler(w http.ResponseWriter, r *http.Request) { } headers := struct { - OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + OnlyRootChunk bool `map:"Swarm-Only-Root-Chunk"` + RLevel *redundancy.Level `map:"Swarm-Redundancy-Level"` }{} if response := s.mapStructure(r.Header, &headers); response != nil { response("invalid header params", logger, w) return } + rLevel := getRedundancyLevel(headers.RLevel) + address, err := soc.CreateAddress(paths.ID, paths.Owner) if err != nil { logger.Error(err, "soc address cannot be created") @@ -244,6 +268,9 @@ func (s *Service) socGetHandler(w http.ResponseWriter, r *http.Request) { } getter := s.storer.Download(true) + if rLevel > redundancy.NONE { + getter = replicas.NewSocGetter(getter, rLevel) + } sch, err := getter.Get(r.Context(), address) if err != nil { logger.Error(err, "soc retrieval has been failed") diff --git a/pkg/api/soc_test.go index fb34eb82297..559b03d2ee7 100644 --- a/pkg/api/soc_test.go +++ b/pkg/api/soc_test.go @@ -22,6 +22,7 @@ import ( testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing" testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing" "github.com/ethersphere/bee/v2/pkg/spinlock" + "github.com/ethersphere/bee/v2/pkg/storage" mockstorer 
"github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -209,3 +210,79 @@ func TestSOC(t *testing.T) { }) }) } + +// Verify that replicas provide fault tolerance +func TestSOCWithRedundancy(t *testing.T) { + + testWithRedundancy := func(t *testing.T, redundancyLevel int) { + t.Helper() + + t.Run(fmt.Sprintf("redundancy=%d", redundancyLevel), func(t *testing.T) { + testData := fmt.Appendf(nil, "redundant-soc-data-%d", redundancyLevel) + + mockStorer := mockstorer.New() + client, _, _, chanStore := newTestServer(t, testServerOptions{ + Storer: mockStorer, + Post: newTestPostService(), + DirectUpload: true, + }) + + soc := testingsoc.GenerateMockSOC(t, testData) + + chanStore.Subscribe(func(ch swarm.Chunk) { + err := mockStorer.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + }) + + jsonhttptest.Request(t, client, http.MethodPost, + fmt.Sprintf("/soc/%s/%s?sig=%s", + hex.EncodeToString(soc.Owner), + hex.EncodeToString(soc.ID), + hex.EncodeToString(soc.Signature)), + http.StatusCreated, + jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr), + jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, fmt.Sprintf("%d", redundancyLevel)), + jsonhttptest.WithRequestBody(bytes.NewReader(soc.WrappedChunk.Data())), + jsonhttptest.WithExpectedJSONResponse(api.SocPostResponse{ + Reference: soc.Address(), + }), + ) + + // Wait for replicas to be created in background + time.Sleep(100 * time.Millisecond) + + originalAddress := soc.Address() + + // Delete the original chunk to trigger dispersed retrieval + cs, ok := mockStorer.ChunkStore().(storage.ChunkStore) + if !ok { + t.Fatal("Could not access underlying ChunkStore with Delete method") + } + + err := cs.Delete(context.Background(), originalAddress) + if err != nil { + t.Fatalf("Failed to delete the original chunk: %v", err) + } + + // Try to retrieve the SOC after deletion + if redundancyLevel > 0 { + jsonhttptest.Request(t, client, http.MethodGet, + fmt.Sprintf("/soc/%s/%s", hex.EncodeToString(soc.Owner), hex.EncodeToString(soc.ID)), + http.StatusOK, + jsonhttptest.WithExpectedResponse(soc.WrappedChunk.Data()[swarm.SpanSize:]), + jsonhttptest.WithExpectedContentLength(len(soc.WrappedChunk.Data()[swarm.SpanSize:])), + ) + } else { + jsonhttptest.Request(t, client, http.MethodGet, + fmt.Sprintf("/soc/%s/%s", hex.EncodeToString(soc.Owner), hex.EncodeToString(soc.ID)), + http.StatusNotFound, + ) + } + }) + } + + testWithRedundancy(t, 0) + testWithRedundancy(t, 2) +} diff --git a/pkg/feeds/factory/factory.go b/pkg/feeds/factory/factory.go index 1d555416407..b4d26d1590f 100644 --- a/pkg/feeds/factory/factory.go +++ b/pkg/feeds/factory/factory.go @@ -19,12 +19,18 @@ func New(getter storage.Getter) feeds.Factory { return &factory{getter} } -func (f *factory) NewLookup(t feeds.Type, feed *feeds.Feed) (feeds.Lookup, error) { +func (f *factory) NewLookup(t feeds.Type, feed *feeds.Feed, opts ...feeds.FactoryOption) (feeds.Lookup, error) { + cfg := &feeds.FactoryConfig{Getter: f.Getter} + + for _, opt := range opts { + opt(cfg) + } + switch t { case feeds.Sequence: - return sequence.NewAsyncFinder(f.Getter, feed), nil + return sequence.NewAsyncFinder(cfg.Getter, feed), nil case feeds.Epoch: - return epochs.NewAsyncFinder(f.Getter, feed), nil + return epochs.NewAsyncFinder(cfg.Getter, feed), nil } return nil, feeds.ErrFeedTypeNotFound diff --git a/pkg/feeds/feed.go b/pkg/feeds/feed.go index ac8d232f5ce..ca855c570be 100644 --- a/pkg/feeds/feed.go +++ b/pkg/feeds/feed.go @@ -25,7 
+25,23 @@ var ErrFeedTypeNotFound = errors.New("no such feed type") // Factory creates feed lookups for different types of feeds. type Factory interface { - NewLookup(Type, *Feed) (Lookup, error) + NewLookup(Type, *Feed, ...FactoryOption) (Lookup, error) +} + +// FactoryConfig holds configuration for the feed factory +type FactoryConfig struct { + Getter storage.Getter +} + +// FactoryOption defines the type for functional options +type FactoryOption func(*FactoryConfig) + +// WithGetter is a factory option to use a custom storage.Getter, overriding +// the default one provided to the factory constructor. +func WithGetter(getter storage.Getter) FactoryOption { + return func(c *FactoryConfig) { + c.Getter = getter + } } // Type enumerates the time-based feed types
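A minimal sketch (not part of the diff) of how the new option composes with the replica-aware getter from pkg/replicas; the function and its parameters are assumptions for illustration:

```go
package feedexample

import (
	"context"
	"time"

	"github.com/ethersphere/bee/v2/pkg/feeds"
	"github.com/ethersphere/bee/v2/pkg/file/redundancy"
	"github.com/ethersphere/bee/v2/pkg/replicas"
	"github.com/ethersphere/bee/v2/pkg/storage"
	"github.com/ethersphere/bee/v2/pkg/swarm"
)

// lookupWithReplicas resolves the latest feed update, overriding the factory's
// default getter so the lookup can fall back to SOC replicas when the
// canonical chunk address is not retrievable.
func lookupWithReplicas(ctx context.Context, fac feeds.Factory, f *feeds.Feed, store storage.Getter) (swarm.Chunk, error) {
	lookup, err := fac.NewLookup(feeds.Sequence, f,
		feeds.WithGetter(replicas.NewSocGetter(store, redundancy.MEDIUM)))
	if err != nil {
		return nil, err
	}
	ch, _, _, err := lookup.At(ctx, time.Now().Unix(), 0)
	return ch, err
}
```

This mirrors what the bzz.go and feed.go handlers above do server-side when the Swarm-Redundancy-Level header is set.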
diff --git a/pkg/file/redundancy/level.go index 411da15ec98..6045f29be8c 100644 --- a/pkg/file/redundancy/level.go +++ b/pkg/file/redundancy/level.go @@ -164,5 +164,4 @@ func GetReplicaCounts() [5]int { // we use an approximation as the successive powers of 2 var replicaCounts = [5]int{0, 2, 4, 8, 16} -// DefaultLevel is the default redundancy level const DefaultLevel = PARANOID diff --git a/pkg/file/redundancy/redundancy_test.go index e6ad81a16a7..f534969ae46 100644 --- a/pkg/file/redundancy/redundancy_test.go +++ b/pkg/file/redundancy/redundancy_test.go @@ -84,7 +84,7 @@ func TestEncode(t *testing.T) { redundancy.SetErasureEncoder(newMockEncoder) // test on the data level - for _, level := range []redundancy.Level{redundancy.MEDIUM, redundancy.STRONG, redundancy.INSANE, redundancy.PARANOID} { + for _, level := range []redundancy.Level{redundancy.MEDIUM, redundancy.STRONG, redundancy.INSANE, redundancy.DefaultLevel} { for _, encrypted := range []bool{false, true} { maxShards := level.GetMaxShards() if encrypted { diff --git a/pkg/replicas/combinator/combinator.go new file mode 100644 index 00000000000..5c1253383a0 --- /dev/null +++ b/pkg/replicas/combinator/combinator.go @@ -0,0 +1,134 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package combinator + +import ( + "iter" + "math/bits" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +// IterateReplicaAddresses returns an iterator (iter.Seq) that yields bit +// combinations of an address, starting from replication level 1. The original +// address is not returned. This approach allows for memory-efficient iteration +// over a large set of combinations. A combination with a single flipped bit of +// the original address is returned at the end. +// +// # Performance and Memory Considerations +// +// To ensure safe use of the yielded addresses, this function returns a new copy +// of the address on each iteration. This prevents accidental modification of +// previously yielded addresses. +// +// The iterator terminates if the replication level exceeds the passed maxLevel or if +// the input data slice is not long enough for the bit manipulations required at +// the next replication level. +func IterateReplicaAddresses(addr swarm.Address, maxLevel int) iter.Seq[swarm.Address] { + // State variables for the iterator closure. + // A single buffer is used and mutated in each iteration, and a copy is yielded. + // It is initialized with a copy of the original address data. + currentSlice := append([]byte{}, addr.Bytes()...) + + var currentLevel int + var bytesNeeded int + // nextLevelIndex marks the combination index at which the replication level increases + // (e.g., 1, 2, 4, 8, ...). + nextLevelIndex := 1 + // prevCombinationIndex is used to calculate the bitwise difference for + // efficient state transitions. + var prevCombinationIndex int + + return func(yield func(swarm.Address) bool) { + // combinationIndex iterates through all possible combinations, but skips the original address. + for combinationIndex := 1; ; combinationIndex++ { + // When the combinationIndex reaches the next power of two, the replication level + // of bit combinations is increased for subsequent iterations. + if combinationIndex >= nextLevelIndex { + // The replication level is determined by the number of bits in the combinationIndex. + // combinationIndex=1 -> replication level=1 + // combinationIndex=2 -> replication level=2 + // combinationIndex=4 -> replication level=3 + // combinationIndex=8 -> replication level=4 + currentLevel = bits.Len(uint(combinationIndex)) + // Set the threshold for the next replication level increase. + // For replication level=1 (idx=1), next threshold is 2. + // For replication level=2 (idx=2,3), next threshold is 4. + // For replication level=3 (idx=4..7), next threshold is 8. + nextLevelIndex = 1 << currentLevel + + // Boundary checks are performed only when the replication level changes. + if currentLevel > maxLevel { + if maxLevel <= 0 { + // Do not return the bit flip address of replication level 0, + // because replication level 0 should have no replicas. Negative + // replication levels are invalid and should not return any + // replicas either. + return + } + // Create a new slice based on the original address. + originalAddrBytes := addr.Bytes() + flippedAddrBytes := make([]byte, len(originalAddrBytes)) + copy(flippedAddrBytes, originalAddrBytes) + + // Calculate the byte index for the bit to flip. + bitIndexToFlip := maxLevel + byteIndex := bitIndexToFlip / 8 + + // Ensure the flippedAddrBytes is long enough to flip this bit. + if len(flippedAddrBytes) <= byteIndex { + return // Cannot flip bit, slice is too short. + } + + // Flip the level bit in the new slice. + bitPositionInByte := 7 - (bitIndexToFlip % 8) + bitMask := byte(1 << bitPositionInByte) + flippedAddrBytes[byteIndex] ^= bitMask + + // Yield this modified address + if !yield(swarm.NewAddress(flippedAddrBytes)) { + return // Consumer-requested stop. + } + return // Iteration completed up to the defined maximum replication level. + } + + bytesNeeded = (currentLevel + 7) / 8 // Ceiling of integer division. + + if len(addr.Bytes()) < bytesNeeded { + // The data slice is too short for the current replication level. + return + } + } + + // The generation logic is optimized to flip only the bits that + // differ from the previous combination. For subsequent indices, + // the buffer is XORed with the difference between the current and + // previous combination indices. + bitsToFlip := combinationIndex ^ prevCombinationIndex + for bitIndex := 0; bitIndex < currentLevel; bitIndex++ { + // Check if the bit at bitIndex is set in the difference. + if (bitsToFlip>>bitIndex)&1 == 1 { + // If set, flip the corresponding bit in the buffer. + byteIndex := bitIndex / 8 + bitPositionInByte := 7 - (bitIndex % 8) + bitMask := byte(1 << bitPositionInByte) + currentSlice[byteIndex] ^= bitMask + } + } + prevCombinationIndex = combinationIndex // Update for the next iteration. + + // Yield a copy of the mutated slice. If yield returns false, the + // consumer has requested to stop the iteration. + if !yield(swarm.NewAddress(append([]byte(nil), currentSlice...))) { + return // Consumer-requested stop. + } + + // Check for integer overflow on the combinationIndex. + if combinationIndex < 0 { + return // Integer overflow; terminate iteration. + } + } + } +}
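A minimal sketch (not part of the diff) of consuming the iterator; for maxLevel=2 it yields the same 2^2 = 4 addresses that the tests below assert:

```go
package combinatorexample

import (
	"fmt"

	"github.com/ethersphere/bee/v2/pkg/replicas/combinator"
	"github.com/ethersphere/bee/v2/pkg/swarm"
)

// printReplicaAddresses enumerates the replica addresses derived from addr up
// to replication level 2: the bit combinations of levels 1 and 2 plus the
// final single-bit-flipped address.
func printReplicaAddresses(addr swarm.Address) {
	for replica := range combinator.IterateReplicaAddresses(addr, 2) {
		fmt.Println(replica)
	}
}
```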
diff --git a/pkg/replicas/combinator/combinator_test.go new file mode 100644 index 00000000000..5c09c464e08 --- /dev/null +++ b/pkg/replicas/combinator/combinator_test.go @@ -0,0 +1,363 @@ +// Copyright 2025 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package combinator_test + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/replicas/combinator" + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +const maxLevel = 8 + +func TestIterateReplicaAddressesSeq(t *testing.T) { + t.Run("iterate up to level 0", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + allCombinations := make(map[string]bool) + count := 0 + maxD := 0 + expectedCount := 0 // No addresses should be returned as level 0 represents no replication. + expected := map[string]bool{} // Not even the maxLevel-bit-flipped address. + + for combo := range combinator.IterateReplicaAddresses(input, maxD) { + comboHex := combo.String() + if allCombinations[comboHex] { + t.Errorf("Duplicate combination found at count %d: %s", count, comboHex) + } + allCombinations[comboHex] = true + count++ + } + + if count != expectedCount { + t.Fatalf("Expected to iterate %d times, got %d", expectedCount, count) + } + if len(allCombinations) != len(expected) { + t.Errorf("Mismatched map sizes. Expected %d, got %d", len(expected), len(allCombinations)) + } + for hexStr := range expected { + if !allCombinations[hexStr] { + t.Errorf("Expected combination %s not found in results", hexStr) + } + } + }) + + t.Run("iterate up to level 1", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + allCombinations := make(map[string]bool) + count := 0 + maxD := 1 + expectedCount := 1 << maxD // 2^1 = 2 items + expected := map[string]bool{ + swarm.NewAddress(append([]byte{0b10000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=1 (level=1) + swarm.NewAddress(append([]byte{0b01000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // 2nd bit flipped + } + + for combo := range combinator.IterateReplicaAddresses(input, maxD) { + comboHex := combo.String() + if allCombinations[comboHex] { + t.Errorf("Duplicate combination found at count %d: %s", count, comboHex) + } + allCombinations[comboHex] = true + count++ + } + + if count != expectedCount { + t.Fatalf("Expected to iterate %d times, got %d", expectedCount, count) + } + if len(allCombinations) != len(expected) { + t.Errorf("Mismatched map sizes. 
Expected %d, got %d", len(expected), len(allCombinations)) + } + for hexStr := range expected { + if !allCombinations[hexStr] { + t.Errorf("Expected combination %s not found in results", hexStr) + } + } + }) + + t.Run("iterate up to level 2", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + allCombinations := make(map[string]bool) + count := 0 + maxD := 2 + expectedCount := 1 << maxD // 2^2 = 4 items + expected := map[string]bool{ + swarm.NewAddress(append([]byte{0b10000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=1 (level=1) + swarm.NewAddress(append([]byte{0b01000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=2 (level=2) + swarm.NewAddress(append([]byte{0b11000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=3 (level=2) + swarm.NewAddress(append([]byte{0b00100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // 3rd bit flipped + } + + for combo := range combinator.IterateReplicaAddresses(input, maxD) { + comboHex := combo.String() + if allCombinations[comboHex] { + t.Errorf("Duplicate combination found at count %d: %s", count, comboHex) + } + allCombinations[comboHex] = true + count++ + } + + if count != expectedCount { + t.Fatalf("Expected to iterate %d times, got %d", expectedCount, count) + } + if len(allCombinations) != len(expected) { + t.Errorf("Mismatched map sizes. Expected %d, got %d", len(expected), len(allCombinations)) + } + for hexStr := range expected { + if !allCombinations[hexStr] { + t.Errorf("Expected combination %s not found in results", hexStr) + } + } + }) + + t.Run("Iterate up to level=3", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + allCombinations := make(map[string]bool) + count := 0 + maxD := 3 + expectedCount := 1 << maxD // 2^3 = 8 items + expected := map[string]bool{ + swarm.NewAddress(append([]byte{0b10000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=1 (level=1) + swarm.NewAddress(append([]byte{0b01000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=2 (level=2) + swarm.NewAddress(append([]byte{0b11000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=3 (level=2) + swarm.NewAddress(append([]byte{0b00100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=4 (level=3) + swarm.NewAddress(append([]byte{0b10100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=5 (level=3) + swarm.NewAddress(append([]byte{0b01100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=6 (level=3) + swarm.NewAddress(append([]byte{0b11100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=7 (level=3) + swarm.NewAddress(append([]byte{0b00010000}, make([]byte, swarm.HashSize-1)...)).String(): true, // 4th bit flipped + } + + for combo := range combinator.IterateReplicaAddresses(input, maxD) { + comboHex := combo.String() + if allCombinations[comboHex] { + t.Errorf("Duplicate combination found at count %d: %s", count, comboHex) + } + allCombinations[comboHex] = true + count++ + } + + if count != expectedCount { + t.Fatalf("Expected to iterate %d times, got %d", expectedCount, count) + } + + // Check that the items we got are the ones we expected + if len(allCombinations) != len(expected) { + t.Errorf("Mismatched map sizes. 
Expected %d, got %d", len(expected), len(allCombinations)) + } + for hexStr := range expected { + if !allCombinations[hexStr] { + t.Errorf("Expected combination %s not found in results", hexStr) + } + } + }) + + t.Run("iterate up to level 4", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + allCombinations := make(map[string]bool) + count := 0 + maxD := 4 + expectedCount := 1 << maxD // 2^4 = 16 items + expected := map[string]bool{ + swarm.NewAddress(append([]byte{0b10000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=1 (level=1) + swarm.NewAddress(append([]byte{0b01000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=2 (level=2) + swarm.NewAddress(append([]byte{0b11000000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=3 (level=2) + swarm.NewAddress(append([]byte{0b00100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=4 (level=3) + swarm.NewAddress(append([]byte{0b10100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=5 (level=3) + swarm.NewAddress(append([]byte{0b01100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=6 (level=3) + swarm.NewAddress(append([]byte{0b11100000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=7 (level=3) + swarm.NewAddress(append([]byte{0b00010000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=8 (level=4) + swarm.NewAddress(append([]byte{0b10010000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=9 (level=4) + swarm.NewAddress(append([]byte{0b01010000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=10 (level=4) + swarm.NewAddress(append([]byte{0b11010000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=11 (level=4) + swarm.NewAddress(append([]byte{0b00110000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=12 (level=4) + swarm.NewAddress(append([]byte{0b10110000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=13 (level=4) + swarm.NewAddress(append([]byte{0b01110000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=14 (level=4) + swarm.NewAddress(append([]byte{0b11110000}, make([]byte, swarm.HashSize-1)...)).String(): true, // i=15 (level=4) + swarm.NewAddress(append([]byte{0b00001000}, make([]byte, swarm.HashSize-1)...)).String(): true, // 5th bit flipped + } + + for combo := range combinator.IterateReplicaAddresses(input, maxD) { + comboHex := combo.String() + if allCombinations[comboHex] { + t.Errorf("Duplicate combination found at count %d: %s", count, comboHex) + } + allCombinations[comboHex] = true + count++ + } + + if count != expectedCount { + t.Fatalf("Expected to iterate %d times, got %d", expectedCount, count) + } + if len(allCombinations) != len(expected) { + t.Errorf("Mismatched map sizes. Expected %d, got %d", len(expected), len(allCombinations)) + } + for hexStr := range expected { + if !allCombinations[hexStr] { + t.Errorf("Expected combination %s not found in results", hexStr) + } + } + }) + + t.Run("maxLevel limits iteration", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + count := 0 + // maxLevel=2 should give 3 items (2^2-1 for levels 1, 2) + 1 for the maxLevel bit flipped address + expectedCount := 4 + + for range combinator.IterateReplicaAddresses(input, 2) { + count++ + } + + if count != expectedCount { + t.Errorf("Expected %d items for maxLevel=2, got %d", expectedCount, count) + } + }) + + t.Run("Iterator stops correctly at end of byte slice", func(t *testing.T) { + // 1 byte = 8 bits. 
+ // Iterator should produce 2^8-1 = 255 items (for level=1 through level=8). + // The 256th item (i=256) would require level=9, + // which needs 2 bytes. The iterator should stop there. + input := swarm.NewAddress([]byte{0xDE}) // 1 byte + expectedCount := (1 << 8) - 1 // 255 + count := 0 + + allCombinations := make(map[string]bool) + + for combo := range combinator.IterateReplicaAddresses(input, maxLevel) { + // Just in case, prevent infinite loop in test + if count > expectedCount { + t.Fatalf("Iterator produced more than %d items, count=%d", expectedCount, count) + break + } + comboHex := combo.String() + if allCombinations[comboHex] { + t.Errorf("Duplicate combination found: %s", comboHex) + } + allCombinations[comboHex] = true + count++ + } + + if count != expectedCount { + t.Errorf("Expected exactly %d items for 1 byte, got %d", expectedCount, count) + } + }) + + t.Run("level=0 edge case (nil slice)", func(t *testing.T) { + // The iterator starts at i=1, which needs level=1, which needs 1 byte. + // A nil slice fails this. + // So, this should iterate *exactly zero times*. + var input swarm.Address + count := 0 + + for range combinator.IterateReplicaAddresses(input, maxLevel) { + count++ + } + + if count != 0 { + t.Fatalf("Expected exactly 0 items for nil slice, got %d", count) + } + }) + + t.Run("Consumer stops early (break)", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + count := 0 + stopAt := 5 + + seq := combinator.IterateReplicaAddresses(input, maxLevel) + for range seq { + count++ + if count == stopAt { + break + } + } + + if count != stopAt { + t.Errorf("Expected loop to run %d times, got %d", stopAt, count) + } + // This test just proves the 'break' is correctly handled + // by the iterator's `if !yield(newSlice)` check. + }) + + t.Run("iterate with negative level", func(t *testing.T) { + input := swarm.NewAddress(make([]byte, swarm.HashSize)) + count := 0 + maxD := -1 // Negative level + + for range combinator.IterateReplicaAddresses(input, maxD) { + count++ + } + + if count != 0 { + t.Fatalf("Expected to iterate 0 times for negative level, got %d", count) + } + }) +} + +var benchAddress = swarm.NewAddress(append([]byte{0xDE, 0xAD, 0xBE, 0xEF}, make([]byte, swarm.HashSize-4)...)) + +// runBenchmark is a helper to run the iterator for a fixed level. +func runBenchmark(b *testing.B, maxLevel int) { + b.Helper() + + // b.Loop() runs the loop for as many iterations as the benchmark harness requires. + for b.Loop() { + // We use a volatile variable to ensure the loop body + // (the slice generation) isn't optimized away. + var volatileAddr swarm.Address + + seq := combinator.IterateReplicaAddresses(benchAddress, maxLevel) + for combo := range seq { + volatileAddr = combo + } + + // To prevent the compiler from optimizing out the loop if volatileAddr isn't used. + // This is a common pattern, though often `go:noinline` on a helper + // function or global assignment is also used. 
+ if volatileAddr.IsZero() { + b.Error("volatileAddr should not be nil") + } + } +} + +// BenchmarkMaxLevel1 iterates over 2^1 = 2 items +func BenchmarkMaxLevel1(b *testing.B) { + runBenchmark(b, 1) +} + +// BenchmarkMaxLevel2 iterates over 2^2 = 4 items +func BenchmarkMaxLevel2(b *testing.B) { + runBenchmark(b, 2) +} + +// BenchmarkMaxLevel3 iterates over 2^3 = 8 items +func BenchmarkMaxLevel3(b *testing.B) { + runBenchmark(b, 3) +} + +// BenchmarkMaxLevel4 iterates over 2^4 = 16 items +func BenchmarkMaxLevel4(b *testing.B) { + runBenchmark(b, 4) +} + +// BenchmarkMaxLevel8 iterates over 2^8 = 256 items +func BenchmarkMaxLevel8(b *testing.B) { + runBenchmark(b, 8) +} + +// BenchmarkMaxLevel12 iterates over 2^12 = 4096 items +func BenchmarkMaxLevel12(b *testing.B) { + runBenchmark(b, 12) +} + +// BenchmarkMaxLevel16 iterates over 2^16 = 65536 items +func BenchmarkMaxLevel16(b *testing.B) { + runBenchmark(b, 16) +} + +// BenchmarkMaxLevel20 iterates over 2^20 = 1,048,576 items +func BenchmarkMaxLevel20(b *testing.B) { + runBenchmark(b, 20) +} diff --git a/pkg/replicas/export_test.go b/pkg/replicas/export_test.go index 271ad71ed0b..341d51e1517 100644 --- a/pkg/replicas/export_test.go +++ b/pkg/replicas/export_test.go @@ -4,12 +4,4 @@ package replicas -import "github.com/ethersphere/bee/v2/pkg/storage" - -var ( - Signer = signer -) - -func Wait(g storage.Getter) { - g.(*getter).wg.Wait() -} +var Signer = signer diff --git a/pkg/replicas/getter.go b/pkg/replicas/getter.go index b08b5c780e8..276bcbc6475 100644 --- a/pkg/replicas/getter.go +++ b/pkg/replicas/getter.go @@ -36,7 +36,6 @@ var ErrSwarmageddon = errors.New("swarmageddon has begun") // (by default, it is assumed to be 4, ie. total of 16) // - (not implemented) pivot: replicas with address in the proximity of pivot will be tried first type getter struct { - wg sync.WaitGroup storage.Getter level redundancy.Level } @@ -51,16 +50,19 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e ctx, cancel := context.WithCancel(ctx) defer cancel() + var wg sync.WaitGroup + defer wg.Wait() + // channel that the results (retrieved chunks) are gathered to from concurrent // workers each fetching a replica resultC := make(chan swarm.Chunk) // errc collects the errors - errc := make(chan error, 17) + errc := make(chan error, g.level.GetReplicaCount()+1) var errs error errcnt := 0 // concurrently call to retrieve chunk using original CAC address - g.wg.Go(func() { + wg.Go(func() { ch, err := g.Getter.Get(ctx, addr) if err != nil { errc <- err @@ -83,8 +85,6 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e var wait <-chan time.Time // nil channel to disable case // addresses used are doubling each period of search expansion // (at intervals of RetryInterval) - ticker := time.NewTicker(RetryInterval) - defer ticker.Stop() for level := uint8(0); level <= uint8(g.level); { select { // at least one chunk is retrieved, cancel the rest and return early @@ -101,7 +101,6 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e // ticker switches on the address channel case <-wait: - wait = nil next = rr.c level++ target = 1 << level @@ -115,7 +114,7 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e continue } - g.wg.Go(func() { + wg.Go(func() { ch, err := g.Getter.Get(ctx, swarm.NewAddress(so.addr)) if err != nil { errc <- err @@ -138,7 +137,7 @@ func (g *getter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, e continue } 
next = nil - wait = ticker.C + wait = time.After(RetryInterval) } } diff --git a/pkg/replicas/getter_soc.go b/pkg/replicas/getter_soc.go new file mode 100644 index 00000000000..42ef87d0e20 --- /dev/null +++ b/pkg/replicas/getter_soc.go @@ -0,0 +1,136 @@ +// Copyright 2023 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package replicas + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/replicas/combinator" + "github.com/ethersphere/bee/v2/pkg/soc" + "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/swarm" + "golang.org/x/sync/semaphore" +) + +// socGetter is the implementation of storage.Getter. This getter embeds the +// original simple chunk getter and extends it to a multiplexed variant that +// fetches chunks with replicas for SOC. +type socGetter struct { + storage.Getter + level redundancy.Level +} + +// NewSocGetter is the getter constructor. +func NewSocGetter(g storage.Getter, level redundancy.Level) storage.Getter { + return &socGetter{ + Getter: g, + level: min(level, maxRedundancyLevel), + } +} + +// Number of parallel replica get requests. +const socGetterConcurrency = 4 + +// Get makes the socGetter satisfy the storage.Getter interface +// It attempts to fetch the chunk by its original address first. +// If the original address does not return a result, +// it starts dispatching parallel requests for replicas +// until a chunk is found or all replicas are tried. +func (g *socGetter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) { + var ( + errs error + mu sync.Mutex + wg sync.WaitGroup + ) + + // First try to get the original chunk. + ch, err = g.Getter.Get(ctx, addr) + if err != nil { + errs = errors.Join(errs, fmt.Errorf("get chunk original address %v: %w", addr, err)) + } else { + return ch, nil + } + + // Try to retrieve replicas. + // Context for cancellation of replica fetching. + // Once a replica is found, this context is cancelled to stop further replica requests. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // sem is used to limit the number of concurrent replica fetch operations. + sem := semaphore.NewWeighted(socGetterConcurrency) + replicaIter := combinator.IterateReplicaAddresses(addr, int(g.level)) + + // resultChan is used to send the first successfully fetched chunk back to the main goroutine. + resultChan := make(chan swarm.Chunk, 1) + // doneChan signals when all replica iteration and fetching attempts have concluded. + doneChan := make(chan struct{}) + + // This goroutine iterates through potential replica addresses and dispatches + // concurrent fetch operations, respecting the concurrency limit. + go func() { + defer close(doneChan) // Ensure doneChan is closed when all replica attempts are finished. + for replicaAddr := range replicaIter { + select { + case <-ctx.Done(): + // If the context is cancelled (e.g., a replica was found or parent context cancelled), + // stop dispatching new replica requests. + return + default: + } + + // Acquire a semaphore slot to limit concurrency. + if err := sem.Acquire(ctx, 1); err != nil { + // If context is cancelled while acquiring, stop. + return + } + + wg.Add(1) + // Each replica fetch is performed in its own goroutine. + go func(replicaAddr swarm.Address) { + defer sem.Release(1) // Release the semaphore slot when done. 
+				defer wg.Done() // Decrement the WaitGroup counter.
+
+				ch, err := g.Getter.Get(ctx, replicaAddr)
+				if err != nil {
+					mu.Lock()
+					errs = errors.Join(errs, fmt.Errorf("get chunk replica address %v: %w", replicaAddr, err))
+					mu.Unlock()
+					return
+				}
+
+				// Re-address the replica to the requested address so that callers
+				// observe the storage.Getter contract, then validate it as a SOC.
+				ch = swarm.NewChunk(addr, ch.Data())
+				if !soc.Valid(ch) {
+					return
+				}
+
+				select {
+				case resultChan <- ch:
+					// If a chunk is successfully fetched and validated, send it to resultChan
+					// and cancel the context to stop other in-flight replica fetches.
+					cancel()
+				case <-ctx.Done():
+					// If the context is already cancelled, it means another goroutine found a chunk,
+					// so this chunk is not needed.
+				}
+			}(replicaAddr)
+		}
+		wg.Wait() // Wait for all launched goroutines to complete.
+	}()
+
+	// resultChan is buffered, so a successfully fetched replica may already be
+	// waiting when doneChan closes or the context is cancelled; prefer it over
+	// returning an error in either case.
+	select {
+	case ch := <-resultChan:
+		return ch, nil
+	case <-doneChan:
+		select {
+		case ch := <-resultChan:
+			return ch, nil
+		default:
+		}
+		if errs == nil {
+			return nil, storage.ErrNotFound
+		}
+		return nil, errs
+	case <-ctx.Done():
+		select {
+		case ch := <-resultChan:
+			return ch, nil
+		default:
+		}
+		return nil, ctx.Err()
+	}
+}
diff --git a/pkg/replicas/getter_soc_test.go b/pkg/replicas/getter_soc_test.go
new file mode 100644
index 00000000000..5fb3e6acba0
--- /dev/null
+++ b/pkg/replicas/getter_soc_test.go
@@ -0,0 +1,434 @@
+// Copyright 2025 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package replicas_test
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+	"testing/synctest"
+	"time"
+
+	"github.com/ethersphere/bee/v2/pkg/file/redundancy"
+	"github.com/ethersphere/bee/v2/pkg/replicas"
+	"github.com/ethersphere/bee/v2/pkg/replicas/combinator"
+	"github.com/ethersphere/bee/v2/pkg/storage"
+	"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
+	"github.com/ethersphere/bee/v2/pkg/swarm"
+)
+
+func TestSocGetter(t *testing.T) {
+	t.Parallel()
+
+	var (
+		chunk     = swarm.NewChunk(swarm.NewAddress(make([]byte, 32)), make([]byte, 32))
+		chunkAddr = chunk.Address()
+		mock      = &mockGetter{
+			getter: inmemchunkstore.New(),
+		}
+		socPutter = replicas.NewSocPutter(mock.getter, redundancy.MEDIUM)
+		getter    = replicas.NewSocGetter(mock, redundancy.MEDIUM)
+	)
+
+	t.Run("happy path", func(t *testing.T) {
+		if err := socPutter.Put(context.Background(), chunk); err != nil {
+			t.Fatal(err)
+		}
+		got, err := getter.Get(context.Background(), chunkAddr)
+		if err != nil {
+			t.Fatalf("got error %v", err)
+		}
+		if !got.Equal(chunk) {
+			t.Fatalf("got chunk %v, want %v", got, chunk)
+		}
+	})
+
+	t.Run("not found", func(t *testing.T) {
+		_, err := getter.Get(context.Background(), swarm.RandAddress(t))
+		if !errors.Is(err, storage.ErrNotFound) {
+			t.Fatalf("got error %v, want %v", err, storage.ErrNotFound)
+		}
+	})
+}
+
+func TestSocGetter_ReplicaFound(t *testing.T) {
+	t.Parallel()
+
+	var (
+		chunk     = swarm.NewChunk(swarm.NewAddress(make([]byte, 32)), make([]byte, 32))
+		chunkAddr = chunk.Address()
+		mock      = &mockGetter{
+			getter: inmemchunkstore.New(),
+		}
+		socPutter = replicas.NewSocPutter(mock.getter, redundancy.MEDIUM)
+		getter    = replicas.NewSocGetter(mock, redundancy.MEDIUM)
+	)
+
+	var replicaChunk swarm.Chunk
+	replicaIter := combinator.IterateReplicaAddresses(chunkAddr, int(redundancy.MEDIUM))
+	for replicaAddr := range replicaIter {
+		replicaChunk = swarm.NewChunk(replicaAddr, chunk.Data())
+		if err := socPutter.Put(context.Background(), replicaChunk); err != nil {
+			t.Fatal(err)
+		}
+		break
+	}
+
+	got, err := getter.Get(context.Background(), chunkAddr)
+	if err != nil {
+		t.Fatalf("got error %v", err)
+	}
+	if !got.Equal(chunk) {
+		t.Fatalf("got chunk %v, want %v", got, chunk)
+	}
+}
+
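Taken together, NewSocPutter and NewSocGetter form the full write/read path for dispersed SOC replicas. As a minimal usage sketch (not part of the diff; it assumes a ctx, a signed SOC chunk sch as produced in the tests, and an in-memory store standing in for the node's chunk store):

	store := inmemchunkstore.New()

	// Store the original chunk together with its dispersed replicas
	// (2^level of them, capped at 2^4 = 16).
	putter := replicas.NewSocPutter(store, redundancy.MEDIUM)
	if err := putter.Put(ctx, sch); err != nil {
		// handle the joined per-replica errors
	}

	// Retrieval tries the original address first, then falls back to
	// fetching replicas concurrently until one validates.
	getter := replicas.NewSocGetter(store, redundancy.MEDIUM)
	ch, err := getter.Get(ctx, sch.Address())
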
+func TestSocGetter_MultipleReplicasFound(t *testing.T) { + t.Parallel() + + var ( + chunk = swarm.NewChunk(swarm.NewAddress(make([]byte, 32)), make([]byte, 32)) + chunkAddr = chunk.Address() + mock = &mockGetter{ + getter: inmemchunkstore.New(), + } + socPutter = replicas.NewSocPutter(mock.getter, redundancy.MEDIUM) + getter = replicas.NewSocGetter(mock, redundancy.MEDIUM) + ) + + replicaIter := combinator.IterateReplicaAddresses(chunkAddr, int(redundancy.MEDIUM)) + var replicaChunk1, replicaChunk2 swarm.Chunk + i := 0 + for replicaAddr := range replicaIter { + if i == 0 { + replicaChunk1 = swarm.NewChunk(replicaAddr, chunk.Data()) + if err := socPutter.Put(context.Background(), replicaChunk1); err != nil { + t.Fatal(err) + } + } else { + replicaChunk2 = swarm.NewChunk(replicaAddr, chunk.Data()) + if err := socPutter.Put(context.Background(), replicaChunk2); err != nil { + t.Fatal(err) + } + break + } + i++ + } + + got, err := getter.Get(context.Background(), chunkAddr) + if err != nil { + t.Fatalf("got error %v", err) + } + + if !got.Equal(chunk) { + t.Fatalf("got unexpected chunk %v, want %v", got, chunk) + } +} + +func TestSocGetter_MaxRedundancyLevelLimit(t *testing.T) { + t.Parallel() + + var ( + chunkAddr = swarm.RandAddress(t) + mock = &countingGetter{ + getter: inmemchunkstore.New(), + } + // Initialize SocGetter with a redundancy level higher than maxRedundancyLevel (which is 4) + getter = replicas.NewSocGetter(mock, redundancy.Level(10)) + ) + + // maxRedundancyLevel is 4, so 2^4 = 16 replicas. Total expected calls: 1 (original) + 16 (replicas) = 17. + expectedCalls := 1 + (1 << 4) // 1 + 2^4 = 17 + + _, err := getter.Get(context.Background(), chunkAddr) + if err == nil { + t.Fatal("expected error, got nil") + } + + if mock.calls != expectedCalls { + t.Fatalf("expected %d Get calls, got %d", expectedCalls, mock.calls) + } +} + +func TestSocGetter_ContextCanceled(t *testing.T) { + t.Parallel() + + var ( + chunkAddr = swarm.RandAddress(t) + mock = &mockGetterWithDelay{ + getter: inmemchunkstore.New(), + } + getter = replicas.NewSocGetter(mock, redundancy.MEDIUM) + ) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := getter.Get(ctx, chunkAddr) + if !errors.Is(err, context.Canceled) { + t.Fatalf("got error %v, want %v", err, context.Canceled) + } +} + +func TestSocGetter_DeadlineExceeded(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + t.Helper() + + var ( + chunkAddr = swarm.RandAddress(t) + mock = &mockGetterWithDelay{ + getter: inmemchunkstore.New(), + getDelay: 100 * time.Millisecond, + } + getter = replicas.NewSocGetter(mock, redundancy.MEDIUM) + ) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + _, err := getter.Get(ctx, chunkAddr) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("got error %v, want %v", err, context.DeadlineExceeded) + } + }) +} + +func TestSocGetter_AllReplicasFail(t *testing.T) { + t.Parallel() + + var ( + chunkAddr = swarm.RandAddress(t) + mock = &mockGetter{ + getter: inmemchunkstore.New(), + err: errors.New("some error"), + } + getter = replicas.NewSocGetter(mock, redundancy.MEDIUM) + ) + + _, err := getter.Get(context.Background(), chunkAddr) + if err == nil { + t.Fatal("expected error, got nil") + } +} + +func TestSocGetter_PartialReplicaFailure(t *testing.T) { + t.Parallel() + + var ( + chunk = swarm.NewChunk(swarm.NewAddress(make([]byte, 32)), make([]byte, 32)) + chunkAddr = chunk.Address() + mock = &failingMockGetter{ + 
getter: inmemchunkstore.New(), + failAddrs: make(map[string]struct{}), + } + socPutter = replicas.NewSocPutter(mock.getter, redundancy.MEDIUM) + getter = replicas.NewSocGetter(mock, redundancy.MEDIUM) + ) + + replicaIter := combinator.IterateReplicaAddresses(chunkAddr, int(redundancy.MEDIUM)) + + i := 0 + var successChunk swarm.Chunk + for addr := range replicaIter { + switch i { + case 0: + // First replica will fail + mock.failAddrs[addr.String()] = struct{}{} + case 1: + // Second replica will succeed + successChunk = swarm.NewChunk(addr, chunk.Data()) + if err := socPutter.Put(context.Background(), successChunk); err != nil { + t.Fatal(err) + } + default: + // Make other replicas fail + mock.failAddrs[addr.String()] = struct{}{} + } + i++ + } + + got, err := getter.Get(context.Background(), chunkAddr) + if err != nil { + t.Fatalf("got error %v", err) + } + if !got.Equal(chunk) { + t.Fatalf("got chunk %v, want %v", got, chunk) + } +} + +func TestSocGetter_DifferentRedundancyLevel(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + uploadRedundancyLevel redundancy.Level + retrieveRedundancyLevel redundancy.Level + }{ + { + name: "upload PARANOID, retrieve MEDIUM", + uploadRedundancyLevel: redundancy.PARANOID, + retrieveRedundancyLevel: redundancy.MEDIUM, + }, + { + name: "upload PARANOID, retrieve STRONG", + uploadRedundancyLevel: redundancy.PARANOID, + retrieveRedundancyLevel: redundancy.STRONG, + }, + { + name: "upload STRONG, retrieve MEDIUM", + uploadRedundancyLevel: redundancy.STRONG, + retrieveRedundancyLevel: redundancy.MEDIUM, + }, + { + name: "upload MEDIUM, retrieve MEDIUM", + uploadRedundancyLevel: redundancy.MEDIUM, + retrieveRedundancyLevel: redundancy.MEDIUM, + }, + { + name: "upload INSANE, retrieve MEDIUM", + uploadRedundancyLevel: redundancy.INSANE, + retrieveRedundancyLevel: redundancy.MEDIUM, + }, + { + name: "upload INSANE, retrieve PARANOID", + uploadRedundancyLevel: redundancy.INSANE, + retrieveRedundancyLevel: redundancy.PARANOID, + }, + { + name: "upload NONE, retrieve MEDIUM", + uploadRedundancyLevel: redundancy.NONE, + retrieveRedundancyLevel: redundancy.MEDIUM, + }, + { + name: "upload MEDIUM, retrieve NONE", + uploadRedundancyLevel: redundancy.MEDIUM, + retrieveRedundancyLevel: redundancy.NONE, + }, + { + name: "upload MEDIUM, retrieve STRONG (should still find if replica exists)", + uploadRedundancyLevel: redundancy.MEDIUM, + retrieveRedundancyLevel: redundancy.STRONG, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + var ( + chunk = swarm.NewChunk(swarm.NewAddress(make([]byte, 32)), make([]byte, 32)) + chunkAddr = chunk.Address() + mock = &mockGetter{ + getter: inmemchunkstore.New(), + } + ) + + // Use socPutter to put the original chunk and its replicas + putter := replicas.NewSocPutter(mock.getter, tc.uploadRedundancyLevel) + err := putter.Put(context.Background(), chunk) + if err != nil { + t.Fatalf("socPutter.Put failed: %v", err) + } + + getter := replicas.NewSocGetter(mock, tc.retrieveRedundancyLevel) + + got, err := getter.Get(context.Background(), chunkAddr) + if err != nil { + t.Fatalf("got error %v", err) + } + if got == nil { + t.Fatal("expected a chunk, got nil") + } + + // Verify that the retrieved chunk is either the original or one of its replicas + found := false + if got.Equal(chunk) { + found = true + } else { + replicaIter := combinator.IterateReplicaAddresses(chunkAddr, int(tc.uploadRedundancyLevel)) + for replicaAddr := range replicaIter { + replicaChunk := 
swarm.NewChunk(replicaAddr, chunk.Data())
+					if got.Equal(replicaChunk) {
+						found = true
+						break
+					}
+				}
+			}
+
+			if !found {
+				t.Fatalf("retrieved chunk %v is neither the original nor any of its replicas", got)
+			}
+		})
+	}
+}
+
+type mockGetter struct {
+	getter storage.ChunkStore
+	err    error
+}
+
+func (m *mockGetter) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
+	if m.err != nil {
+		return nil, m.err
+	}
+	return m.getter.Get(ctx, addr)
+}
+
+type failingMockGetter struct {
+	getter    storage.ChunkStore
+	failAddrs map[string]struct{}
+	mu        sync.Mutex
+}
+
+func (m *failingMockGetter) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if _, found := m.failAddrs[addr.String()]; found {
+		return nil, errors.New("failed to get chunk")
+	}
+	return m.getter.Get(ctx, addr)
+}
+
+type mockGetterWithDelay struct {
+	getter   storage.ChunkStore
+	err      error
+	getDelay time.Duration
+}
+
+func (m *mockGetterWithDelay) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
+	if m.getDelay > 0 {
+		time.Sleep(m.getDelay)
+	}
+	if m.err != nil {
+		return nil, m.err
+	}
+	return m.getter.Get(ctx, addr)
+}
+
+// countingGetter is a mock storage.Getter that counts Get calls.
+type countingGetter struct {
+	getter storage.ChunkStore
+	mu     sync.Mutex
+	calls  int
+}
+
+func (c *countingGetter) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
+	c.mu.Lock()
+	c.calls++
+	c.mu.Unlock()
+	return c.getter.Get(ctx, addr)
+}
diff --git a/pkg/replicas/getter_test.go b/pkg/replicas/getter_test.go
index d1d727dd5fd..dcfdc0e068e 100644
--- a/pkg/replicas/getter_test.go
+++ b/pkg/replicas/getter_test.go
@@ -171,7 +171,6 @@ func TestGetter(t *testing.T) {
 				}()
 			}
 			_, err := g.Get(ctx, ch.Address())
-			replicas.Wait(g)
 			cancel()
 
 			// test the returned error
@@ -242,7 +241,7 @@ func TestGetter(t *testing.T) {
 			})
 
 			t.Run("dispersion", func(t *testing.T) {
-				if err := dispersed(redundancy.Level(tc.level), ch, addresses); err != nil {
+				if err := dispersed(redundancy.Level(tc.level), addresses); err != nil {
 					t.Fatalf("addresses are not dispersed: %v", err)
 				}
 			})
diff --git a/pkg/replicas/putter_soc.go b/pkg/replicas/putter_soc.go
new file mode 100644
index 00000000000..54126039028
--- /dev/null
+++ b/pkg/replicas/putter_soc.go
@@ -0,0 +1,80 @@
+// Copyright 2025 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The code below implements the integration of dispersed replicas into SOC
+// uploads using the storer.PutterSession interface.
+package replicas
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/ethersphere/bee/v2/pkg/file/redundancy"
+	"github.com/ethersphere/bee/v2/pkg/replicas/combinator"
+	"github.com/ethersphere/bee/v2/pkg/storage"
+	"github.com/ethersphere/bee/v2/pkg/storer"
+	"github.com/ethersphere/bee/v2/pkg/swarm"
+)
+
+// maxRedundancyLevel ensures that no more than 2^4 = 16 replicas are generated.
+const maxRedundancyLevel = 4
+
+// socPutter is the implementation of the public storage.Putter interface.
+// socPutter extends the original putter into a multiputter that also stores
+// the dispersed replicas of each SOC chunk it is given.
+type socPutter struct {
+	putter storage.Putter
+	level  redundancy.Level
+}
+
+// NewSocPutter is the putter constructor.
+func NewSocPutter(p storage.Putter, level redundancy.Level) storage.Putter {
+	return &socPutter{
+		putter: p,
+		level:  min(level, maxRedundancyLevel),
+	}
+}
+
+// Put makes the putter satisfy the storage.Putter interface.
+func (p *socPutter) Put(ctx context.Context, ch swarm.Chunk) error {
+	if err := p.putter.Put(ctx, ch); err != nil {
+		return fmt.Errorf("put original chunk: %w", err)
+	}
+
+	var errs error
+
+	for replicaAddr := range combinator.IterateReplicaAddresses(ch.Address(), int(p.level)) {
+		ch := swarm.NewChunk(replicaAddr, ch.Data())
+
+		if err := p.putter.Put(ctx, ch); err != nil {
+			errs = errors.Join(errs, fmt.Errorf("put replica chunk %v: %w", ch.Address(), err))
+		}
+	}
+
+	return errs
+}
+
+// socPutterSession extends the original socPutter.
+type socPutterSession struct {
+	socPutter
+	ps storer.PutterSession
+}
+
+// NewSocPutterSession is the putterSession constructor. The redundancy level
+// is capped at maxRedundancyLevel, mirroring NewSocPutter.
+func NewSocPutterSession(p storer.PutterSession, rLevel redundancy.Level) storer.PutterSession {
+	return &socPutterSession{
+		socPutter{
+			putter: p,
+			level:  min(rLevel, maxRedundancyLevel),
+		}, p,
+	}
+}
+
+func (p *socPutterSession) Cleanup() error {
+	return p.ps.Cleanup()
+}
+
+func (p *socPutterSession) Done(addr swarm.Address) error {
+	return p.ps.Done(addr)
+}
diff --git a/pkg/replicas/putter_soc_test.go b/pkg/replicas/putter_soc_test.go
new file mode 100644
index 00000000000..365597335e7
--- /dev/null
+++ b/pkg/replicas/putter_soc_test.go
@@ -0,0 +1,231 @@
+// Copyright 2025 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was created as a copy of the original putter_test.go file
+// and tailored to the socPutter implementation.
+
+package replicas_test
+
+import (
+	"context"
+	"crypto/rand"
+	"errors"
+	"fmt"
+	"io"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/ethersphere/bee/v2/pkg/cac"
+	"github.com/ethersphere/bee/v2/pkg/crypto"
+	"github.com/ethersphere/bee/v2/pkg/file/redundancy"
+	"github.com/ethersphere/bee/v2/pkg/replicas"
+	"github.com/ethersphere/bee/v2/pkg/soc"
+	"github.com/ethersphere/bee/v2/pkg/storage"
+	"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
+	"github.com/ethersphere/bee/v2/pkg/swarm"
+)
+
+type putterSession struct {
+	chunkStore storage.ChunkStore
+	getErrors  func(context.Context, swarm.Address) error
+	putErrors  func(context.Context, swarm.Address) error
+}
+
+func (p *putterSession) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, error) {
+	if f := p.getErrors; f != nil {
+		return nil, f(ctx, addr)
+	}
+	return p.chunkStore.Get(ctx, addr)
+}
+
+func (p *putterSession) Put(ctx context.Context, ch swarm.Chunk) error {
+	if f := p.putErrors; f != nil {
+		return f(ctx, ch.Address())
+	}
+	return p.chunkStore.Put(ctx, ch)
+}
+
+func (p *putterSession) Done(address swarm.Address) error { return nil }
+
+func (p *putterSession) Cleanup() error { return nil }
+
+func TestSocPutter(t *testing.T) {
+	t.Parallel()
+
+	// test key to sign soc chunks
+	privKey, err := crypto.GenerateSecp256k1Key()
+	if err != nil {
+		t.Fatal(err)
+	}
+	signer := crypto.NewDefaultSigner(privKey)
+
+	tcs := []struct {
+		level  redundancy.Level
+		length int
+	}{
+		{0, 1},
+		{1, 1},
+		{2, 1},
+		{3, 1},
+		{4, 1},
+		{0, 4096},
+		{1, 4096},
+		{2, 4096},
+		{3, 4096},
+		{4, 4096},
+	}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprintf("redundancy:%d, size:%d", tc.level, tc.length), func(t *testing.T) {
+			buf := make([]byte, tc.length)
+			if _, err := 
io.ReadFull(rand.Reader, buf); err != nil { + t.Fatal(err) + } + ctx := context.Background() + ch, err := cac.New(buf) + if err != nil { + t.Fatal(err) + } + // create soc from cac + id := make([]byte, swarm.HashSize) + if _, err := rand.Read(id); err != nil { + t.Fatal(err) + } + s := soc.New(id, ch) + sch, err := s.Sign(signer) + if err != nil { + t.Fatal(err) + } + + store := inmemchunkstore.New() + defer store.Close() + session := &putterSession{chunkStore: store} + p := replicas.NewSocPutter(session, tc.level) + + if err := p.Put(ctx, sch); err != nil { + t.Fatalf("expected no error. got %v", err) + } + var addrs []swarm.Address + orig := false + _ = store.Iterate(ctx, func(chunk swarm.Chunk) (stop bool, err error) { + if sch.Address().Equal(chunk.Address()) { + orig = true + return false, nil + } + if !soc.Valid(chunk) { + t.Fatalf("chunk %v is not a valid SOC chunk", chunk.Address()) + } + addrs = append(addrs, chunk.Address()) + return false, nil + }) + if !orig { + t.Fatal("original chunk missing") + } + t.Run("dispersion", func(t *testing.T) { + if err := dispersed(tc.level, addrs); err != nil { + t.Fatalf("addresses are not dispersed: %v", err) + } + }) + t.Run("attempts", func(t *testing.T) { + count := tc.level.GetReplicaCount() + if len(addrs) != count { + t.Fatalf("incorrect number of attempts. want %v, got %v", count, len(addrs)) + } + }) + + t.Run("replication", func(t *testing.T) { + if err := replicated(store, ch, addrs); err != nil { + t.Fatalf("chunks are not replicas: %v", err) + } + }) + }) + } + t.Run("error handling", func(t *testing.T) { + tcs := []struct { + name string + level redundancy.Level + length int + f func(*putterSession) *putterSession + err []error + }{ + {"put errors", 4, 4096, func(tbp *putterSession) *putterSession { + var j int32 + i := &j + atomic.StoreInt32(i, 0) + tbp.putErrors = func(ctx context.Context, _ swarm.Address) error { + j := atomic.AddInt32(i, 1) + <-time.After(10 * time.Millisecond) + if j == 6 { + return errTestA + } + if j == 12 { + return errTestB + } + return nil + } + return tbp + }, []error{errTestA, errTestB}}, + {"put latencies", 4, 4096, func(tbp *putterSession) *putterSession { + var j int32 + i := &j + atomic.StoreInt32(i, 0) + tbp.putErrors = func(ctx context.Context, _ swarm.Address) error { + j := atomic.AddInt32(i, 1) + if j == 6 { + select { + case <-time.After(100 * time.Millisecond): + case <-ctx.Done(): + return ctx.Err() + } + } + if j == 12 { + return errTestA + } + return nil + } + return tbp + }, []error{errTestA, context.DeadlineExceeded}}, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + buf := make([]byte, tc.length) + if _, err := io.ReadFull(rand.Reader, buf); err != nil { + t.Fatal(err) + } + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) + defer cancel() + ch, err := cac.New(buf) + if err != nil { + t.Fatal(err) + } + + id := make([]byte, swarm.HashSize) + if _, err := rand.Read(id); err != nil { + t.Fatal(err) + } + s := soc.New(id, ch) + sch, err := s.Sign(signer) + if err != nil { + t.Fatal(err) + } + + store := inmemchunkstore.New() + defer store.Close() + p := replicas.NewSocPutter(tc.f(&putterSession{chunkStore: store}), tc.level) + errs := p.Put(ctx, sch) + for _, err := range tc.err { + if !errors.Is(errs, err) { + t.Fatalf("incorrect error. want it to contain %v. 
got %v.", tc.err, errs) + } + } + }) + } + }) + +} diff --git a/pkg/replicas/putter_test.go b/pkg/replicas/putter_test.go index cee4960f5b0..a6f7d85b873 100644 --- a/pkg/replicas/putter_test.go +++ b/pkg/replicas/putter_test.go @@ -104,7 +104,7 @@ func TestPutter(t *testing.T) { t.Fatal("original chunk missing") } t.Run("dispersion", func(t *testing.T) { - if err := dispersed(tc.level, ch, addrs); err != nil { + if err := dispersed(tc.level, addrs); err != nil { t.Fatalf("addresses are not dispersed: %v", err) } }) diff --git a/pkg/replicas/replica_test.go b/pkg/replicas/replica_test.go index 3a253f24f5b..9e11cdfa102 100644 --- a/pkg/replicas/replica_test.go +++ b/pkg/replicas/replica_test.go @@ -17,7 +17,7 @@ import ( ) // dispersed verifies that a set of addresses are maximally dispersed without repetition -func dispersed(level redundancy.Level, ch swarm.Chunk, addrs []swarm.Address) error { +func dispersed(level redundancy.Level, addrs []swarm.Address) error { nhoods := make(map[byte]bool) for _, addr := range addrs { diff --git a/pkg/replicas/replicas.go b/pkg/replicas/replicas.go index cdeb93cc30e..20b3e00cc6d 100644 --- a/pkg/replicas/replicas.go +++ b/pkg/replicas/replicas.go @@ -30,7 +30,7 @@ type replicator struct { addr []byte // chunk address queue [16]*replica // to sort addresses according to di exist [30]bool // maps the 16 distinct nibbles on all levels - sizes [5]int // number of distinct neighnourhoods redcorded for each depth + sizes [5]int // number of distinct neighbourhoods recorded for each depth c chan *replica rLevel redundancy.Level } @@ -40,10 +40,12 @@ func newReplicator(addr swarm.Address, rLevel redundancy.Level) *replicator { rr := &replicator{ addr: addr.Bytes(), sizes: redundancy.GetReplicaCounts(), - c: make(chan *replica, 16), + c: make(chan *replica, rLevel.GetReplicaCount()), rLevel: rLevel, } + go rr.replicas() + return rr } diff --git a/pkg/soc/validator.go b/pkg/soc/validator.go index 06f2fb72e0a..0b478d5e71a 100644 --- a/pkg/soc/validator.go +++ b/pkg/soc/validator.go @@ -26,5 +26,16 @@ func Valid(ch swarm.Chunk) bool { if err != nil { return false } - return ch.Address().Equal(address) + + // if the address does not match the chunk address, check if it is a disperse replica + if !ch.Address().Equal(address) { + c := ch.Address().Bytes() + a := address.Bytes() + // For disperse replicas it is allowed to have the first 4 bits of the first + // byte to be different, and the last 4 bits must be equal. + // Another case is when only the fifth bit from the left is flipped. 
+ return ((c[0]&0x0f == a[0]&0x0f) || (c[0]^a[0] == 1<<3)) && bytes.Equal(c[1:], a[1:]) + } + + return true } diff --git a/pkg/soc/validator_test.go b/pkg/soc/validator_test.go index 203e0b1bc84..c900da740e0 100644 --- a/pkg/soc/validator_test.go +++ b/pkg/soc/validator_test.go @@ -112,7 +112,7 @@ func TestInvalid(t *testing.T) { name: "wrong soc address", chunk: func() swarm.Chunk { wrongAddressBytes := socAddress.Clone().Bytes() - wrongAddressBytes[0] = 255 - wrongAddressBytes[0] + wrongAddressBytes[1] = 255 - wrongAddressBytes[1] wrongAddress := swarm.NewAddress(wrongAddressBytes) data := makeSocData() return swarm.NewChunk(wrongAddress, data) @@ -182,3 +182,75 @@ func TestInvalid(t *testing.T) { }) } } + +func TestValidDisperseReplicaAddress(t *testing.T) { + t.Parallel() + + privKey, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(privKey) + + payload := []byte("foo") + ch, err := cac.New(payload) + if err != nil { + t.Fatal(err) + } + + id := make([]byte, swarm.HashSize) + s := soc.New(id, ch) + + socCh, err := s.Sign(signer) + if err != nil { + t.Fatal(err) + } + + // original address + originalAddr := socCh.Address().Bytes() + + t.Run("last 4 bits equal", func(t *testing.T) { + // change first 4 bits of first byte + addr := make([]byte, len(originalAddr)) + copy(addr, originalAddr) + addr[0] = (addr[0] & 0x0f) | 0xf0 + + replica := swarm.NewChunk(swarm.NewAddress(addr), socCh.Data()) + if !soc.Valid(replica) { + t.Fatal("replica with last 4 bits equal should be valid") + } + }) + + t.Run("5th bit flipped", func(t *testing.T) { + addr := make([]byte, len(originalAddr)) + copy(addr, originalAddr) + addr[0] ^= 1 << 3 // flip 5th bit from the left + + replica := swarm.NewChunk(swarm.NewAddress(addr), socCh.Data()) + if !soc.Valid(replica) { + t.Fatal("replica with 5th bit flipped should be valid") + } + }) + + t.Run("invalid change", func(t *testing.T) { + addr := make([]byte, len(originalAddr)) + copy(addr, originalAddr) + addr[0]++ // change the first byte in a way that is not allowed + + replica := swarm.NewChunk(swarm.NewAddress(addr), socCh.Data()) + if soc.Valid(replica) { + t.Fatal("replica with invalid change should be invalid") + } + }) + + t.Run("invalid change - different last 4 bits", func(t *testing.T) { + addr := make([]byte, len(originalAddr)) + copy(addr, originalAddr) + addr[0] = (addr[0] & 0xf0) | ((addr[0] + 1) & 0x0f) + + replica := swarm.NewChunk(swarm.NewAddress(addr), socCh.Data()) + if soc.Valid(replica) { + t.Fatal("replica with different last 4 bits should be invalid") + } + }) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 312364e2676..df575a83ce5 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -307,10 +307,7 @@ func IdentityAddress(chunk swarm.Chunk) (swarm.Address, error) { // check the chunk is single owner chunk or cac if sch, err := soc.FromChunk(chunk); err == nil { - socAddress, err := sch.Address() - if err != nil { - return swarm.ZeroAddress, err - } + socAddress := chunk.Address() // cannot use sch.Address() because of SOC replicas h := swarm.NewHasher() _, err = h.Write(socAddress.Bytes()) if err != nil {
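The first-byte rule that soc.Valid now applies to disperse replicas can be checked in isolation. A small standalone sketch (illustrative only; a is the first byte of the address computed from the chunk data, c the first byte of the chunk's claimed address, and the remaining 31 bytes must still match exactly):

	package main

	import "fmt"

	// replicaByteOK mirrors the first-byte check in soc.Valid: either the low
	// nibble (the last 4 bits) is preserved, or the two bytes differ only in
	// bit 3, i.e. the fifth bit from the left.
	func replicaByteOK(c, a byte) bool {
		return (c&0x0f == a&0x0f) || (c^a == 1<<3)
	}

	func main() {
		a := byte(0b10100110)
		fmt.Println(replicaByteOK(0b01110110, a)) // true: low nibble 0110 preserved
		fmt.Println(replicaByteOK(0b10101110, a)) // true: only bit 3 flipped
		fmt.Println(replicaByteOK(0b10100111, a)) // false: low nibble changed
	}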