diff --git a/db/background_mgr.go b/db/background_mgr.go
index 88e3c2143a..a2544afb4a 100644
--- a/db/background_mgr.go
+++ b/db/background_mgr.go
@@ -805,7 +805,7 @@ func (b *BackgroundManager[O]) startPollingMultiNodeStatus(ctx context.Context,
 	err := b.updateMultiNodeClusterAwareStatus(ctx, backgroundManagerStatusUpdate)
 	if err != nil {
 		if errors.Is(err, errBackgroundManagerStatusNotRunning) {
-			b.stopProcess(ctx)
+			b.terminator.Close()
 			return
 		} else {
 			base.DebugfCtx(ctx, base.KeyAll, "Failed to update multi node cluster aware status: %v, will retry", err)
diff --git a/db/background_mgr_test.go b/db/background_mgr_test.go
index 58898edc8f..fac6ef9ea2 100644
--- a/db/background_mgr_test.go
+++ b/db/background_mgr_test.go
@@ -47,6 +47,12 @@ func (m *MockProcess) Run(ctx context.Context, options map[string]any, persistCl
 	ticker := time.NewTicker(100 * time.Millisecond)
 	defer ticker.Stop()
 
+	if persistClusterStatusCallback != nil {
+		defer func() {
+			_ = persistClusterStatusCallback(ctx)
+		}()
+	}
+
 	for {
 		select {
 		case <-terminator.Done():
@@ -1388,3 +1394,92 @@ func TestBackgroundManagerResumePreservesPreviousStatus(t *testing.T) {
 	require.NoError(t, mgr1.Stop(ctx))
 	require.NoError(t, mgr2.Stop(ctx))
 }
+
+// TestBackgroundManagerMultiNodePollingAvoidsOverwrite starts two multi-node managers that share one
+// cluster status document, writes a completed status to that document directly, and checks that mgr2's
+// polling loop closes its terminator while the stored status stays completed.
+func TestBackgroundManagerMultiNodePollingAvoidsOverwrite(t *testing.T) {
+	testBucket := base.GetTestBucket(t)
+	ctx := base.TestCtx(t)
+	defer testBucket.Close(ctx)
+	metadataStore := testBucket.DefaultDataStore(ctx)
+	metaKeys := base.NewMetadataKeys("test-polling-overwrite")
+
+	clusterOpts := &ClusterAwareBackgroundManagerOptions{
+		metadataStore: metadataStore,
+		metaKeys:      metaKeys,
+		processSuffix: "polling-overwrite",
+		multiNode:     true,
+	}
+
+	process1 := &MockProcess{}
+	mgr1 := &BackgroundManager[map[string]any]{
+		name:                "mgr1",
+		Process:             process1,
+		clusterAwareOptions: clusterOpts,
+		terminator:          base.NewSafeTerminator(),
+	}
+
+	process2 := &MockProcess{}
+	mgr2 := &BackgroundManager[map[string]any]{
+		name:                "mgr2",
+		Process:             process2,
+		clusterAwareOptions: clusterOpts,
+		terminator:          base.NewSafeTerminator(),
+	}
+
+	// Start both managers. Both should run.
+	require.NoError(t, mgr1.Start(ctx, nil))
+	defer func() { _ = mgr1.Stop(ctx) }()
+
+	require.NoError(t, mgr2.Start(ctx, nil))
+	defer func() { _ = mgr2.Stop(ctx) }()
+
+	RequireBackgroundManagerState(t, mgr1, BackgroundProcessStateRunning)
+	RequireBackgroundManagerState(t, mgr2, BackgroundProcessStateRunning)
+
+	// Simulate mgr1 completing successfully by manually writing a completed status to the cluster status document.
+	docID := clusterOpts.StatusDocID()
+	_, err := metadataStore.Update(ctx, docID, 0, func(current []byte) ([]byte, *uint32, bool, error) {
+		var output map[string]json.RawMessage
+		if current != nil {
+			if err := base.JSONUnmarshal(current, &output); err != nil {
+				return nil, nil, false, err
+			}
+		}
+		if output == nil {
+			// current was nil, or decoded to a nil map (e.g. a JSON null body): start from an empty map so the writes below cannot panic.
+			output = make(map[string]json.RawMessage)
+		}
+
+		status := BackgroundManagerStatus{
+			State:     BackgroundProcessStateCompleted,
+			StartTime: time.Now(),
+		}
+		statusBytes, err := base.JSONMarshal(status)
+		if err != nil {
+			return nil, nil, false, err
+		}
+		output["status"] = statusBytes
+		output["meta"] = json.RawMessage("null")
+
+		outputBytes, err := base.JSONMarshal(output)
+		if err != nil {
+			return nil, nil, false, err
+		}
+		return outputBytes, nil, false, nil
+	})
+	require.NoError(t, err)
+
+	// Wait for mgr2's polling loop to detect the completed status and close its terminator.
+	require.Eventually(t, func() bool {
+		return mgr2.terminator.IsClosed()
+	}, 10*time.Second, 100*time.Millisecond, "expected mgr2 terminator to be closed after polling detects completed status")
+
+	// Verify that the status in the bucket remains completed and is not overwritten by mgr2.
+	rawStatus, err := mgr2.GetStatus(ctx)
+	require.NoError(t, err)
+	var status BackgroundManagerStatus
+	require.NoError(t, base.JSONUnmarshal(rawStatus, &status))
+	assert.Equal(t, BackgroundProcessStateCompleted, status.State, "expected the bucket status to remain completed and NOT be overwritten")
+}
diff --git a/rest/adminapitest/resync_test.go b/rest/adminapitest/resync_test.go
index b802df001d..6dd15fcb9c 100644
--- a/rest/adminapitest/resync_test.go
+++ b/rest/adminapitest/resync_test.go
@@ -491,5 +491,11 @@ func TestResyncPartitionsMaximumValidation(t *testing.T) {
 // The test then validates that the resync completes successfully on both nodes, and logs the number of documents processed/changed on each node to verify that work is being distributed.
 // Usage notes: even with 100K docs, the test may intermittently complete resync on just rt1. Putting a breakpoint in ResyncManagerDCP.Run and pausing for a few seconds
 // is typically sufficient to ensure rebalance happens without further increasing the number of docs.
+//
+// Note: when the second rt comes online, all DCP traffic will be directed to the second rt, because of the following cbgt interaction:
+// 1. When a new node/rt is added, a new index definition and new set of feeds are created, and the previous ones closed
+// 2. The mapping from a feed to a dest (destKey) is stored in the index params, so only a single destKey can be active at a time for a cbgt index
+// 3. SGW's lookup for dest from destKey isn't db-scoped, so rt2's dest will be used for all the new feeds.
+// This still allows testing of overall feed close and status handling, but doesn't do a good job of testing concurrent updates to resync status from two nodes.
 func TestDistributedResync(t *testing.T) {
 	t.Skip("Long-running dev-time test used to test multi-node cbgt rebalance during resync")