Skip to content

Commit 1c3e1bb

Browse files
craig[bot]pav-kv
andcommitted
Merge #158411
158411: loqrecovery: use LogEngine in obvious places r=arulajmani a=pav-kv Part of #97627 Co-authored-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
2 parents 5352394 + a9bcebe commit 1c3e1bb

File tree

6 files changed

+22
-18
lines changed

6 files changed

+22
-18
lines changed

pkg/kv/kvserver/loqrecovery/collect.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,21 +180,22 @@ func CollectStoresReplicaInfo(
180180

181181
func visitStoreReplicas(
182182
ctx context.Context,
183-
state, raft storage.Reader,
183+
stateRO kvstorage.StateRO,
184+
raftRO kvstorage.RaftRO,
184185
storeID roachpb.StoreID,
185186
nodeID roachpb.NodeID,
186187
send func(info loqrecoverypb.ReplicaInfo) error,
187188
) error {
188-
if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, state, func(desc roachpb.RangeDescriptor) error {
189+
if err := kvstorage.IterateRangeDescriptorsFromDisk(ctx, stateRO, func(desc roachpb.RangeDescriptor) error {
189190
rsl := kvstorage.MakeStateLoader(desc.RangeID)
190-
rstate, err := rsl.Load(ctx, state, &desc)
191+
rstate, err := rsl.Load(ctx, stateRO, &desc)
191192
if err != nil {
192193
return err
193194
}
194195
// TODO(pav-kv): the LoQ recovery flow uses only the applied index, and the
195196
// HardState.Commit loaded here is unused. Consider removing. Make sure this
196197
// doesn't break compatibility for ReplicaInfo unmarshalling.
197-
hstate, err := rsl.LoadHardState(ctx, raft)
198+
hstate, err := rsl.LoadHardState(ctx, raftRO)
198199
if err != nil {
199200
return err
200201
}
@@ -208,7 +209,7 @@ func visitStoreReplicas(
208209
// For the heuristics here, it would probably make sense to read from all
209210
// LogIDs with unapplied entries.
210211
rangeUpdates, err := GetDescriptorChangesFromRaftLog(
211-
ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, raft)
212+
ctx, desc.RangeID, rstate.RaftAppliedIndex+1, math.MaxInt64, raftRO)
212213
if err != nil {
213214
return err
214215
}
@@ -234,10 +235,10 @@ func visitStoreReplicas(
234235
// lo (inclusive) and hi (exclusive) and searches for changes to range
235236
// descriptors, as identified by presence of a commit trigger.
236237
func GetDescriptorChangesFromRaftLog(
237-
ctx context.Context, rangeID roachpb.RangeID, lo, hi kvpb.RaftIndex, reader storage.Reader,
238+
ctx context.Context, rangeID roachpb.RangeID, lo, hi kvpb.RaftIndex, raftRO kvstorage.RaftRO,
238239
) ([]loqrecoverypb.DescriptorChangeInfo, error) {
239240
var changes []loqrecoverypb.DescriptorChangeInfo
240-
if err := raftlog.Visit(ctx, reader, rangeID, lo, hi, func(ent raftpb.Entry) error {
241+
if err := raftlog.Visit(ctx, raftRO, rangeID, lo, hi, func(ent raftpb.Entry) error {
241242
e, err := raftlog.NewEntry(ent)
242243
if err != nil {
243244
return err

pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
1818
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
19+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1920
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
2021
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
2122
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -72,9 +73,9 @@ func TestFindUpdateDescriptor(t *testing.T) {
7273

7374
return srk
7475
},
75-
func(t *testing.T, ctx context.Context, reader storage.Reader) {
76+
func(t *testing.T, ctx context.Context, raftRO kvstorage.RaftRO) {
7677
seq, err := loqrecovery.GetDescriptorChangesFromRaftLog(
77-
ctx, testRangeID, 0, math.MaxInt64, reader)
78+
ctx, testRangeID, 0, math.MaxInt64, raftRO)
7879
require.NoError(t, err, "failed to read raft log data")
7980

8081
requireContainsDescriptor(t, loqrecoverypb.DescriptorChangeInfo{
@@ -125,9 +126,9 @@ func TestFindUpdateRaft(t *testing.T) {
125126

126127
return srk
127128
},
128-
func(t *testing.T, ctx context.Context, reader storage.Reader) {
129+
func(t *testing.T, ctx context.Context, raftRO kvstorage.RaftRO) {
129130
seq, err := loqrecovery.GetDescriptorChangesFromRaftLog(
130-
ctx, sRD.RangeID, 0, math.MaxInt64, reader)
131+
ctx, sRD.RangeID, 0, math.MaxInt64, raftRO)
131132
require.NoError(t, err, "failed to read raft log data")
132133
requireContainsDescriptor(t, loqrecoverypb.DescriptorChangeInfo{
133134
ChangeType: loqrecoverypb.DescriptorChangeType_ReplicaChange,
@@ -142,13 +143,13 @@ func checkRaftLog(
142143
ctx context.Context,
143144
nodeToMonitor int,
144145
action func(ctx context.Context, tc *testcluster.TestCluster) roachpb.RKey,
145-
assertRaftLog func(*testing.T, context.Context, storage.Reader),
146+
assertRaftLog func(*testing.T, context.Context, kvstorage.RaftRO),
146147
leaseType roachpb.LeaseType,
147148
) {
148149
t.Helper()
149150

150151
makeSnapshot := make(chan storage.Engine, 2)
151-
snapshots := make(chan storage.Reader, 2)
152+
snapshots := make(chan kvstorage.RaftRO, 2)
152153

153154
raftFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
154155
t.Helper()
@@ -209,7 +210,7 @@ func checkRaftLog(
209210

210211
// TODO(sep-raft-log): the receiver doesn't use the engine, so remove this
211212
// altogether and use a `chan struct{}{}`.
212-
eng := tc.GetFirstStoreFromServer(t, nodeToMonitor).TODOEngine()
213+
eng := tc.GetFirstStoreFromServer(t, nodeToMonitor).LogEngine()
213214
makeSnapshot <- eng
214215
// After the test action is complete raft might be completely caught up with
215216
// its messages, so we will write a value into the range to ensure filter

pkg/kv/kvserver/loqrecovery/record.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/cockroachdb/cockroach/pkg/keys"
1313
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1415
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery/loqrecoverypb"
1516
"github.com/cockroachdb/cockroach/pkg/storage"
1617
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -64,7 +65,7 @@ func writeReplicaRecoveryStoreRecord(
6465
// recovery actions are properly reflected in server logs as needed.
6566
func RegisterOfflineRecoveryEvents(
6667
ctx context.Context,
67-
readWriter storage.ReadWriter,
68+
readWriter kvstorage.RaftRW,
6869
registerEvent func(context.Context, loqrecoverypb.ReplicaRecoveryRecord) (bool, error),
6970
) (int, error) {
7071
successCount := 0

pkg/kv/kvserver/loqrecovery/recovery_env_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ func (e *quorumRecoveryEnv) dumpRecoveryEvents(
843843
if !ok {
844844
t.Fatalf("store s%d doesn't exist, but event dump is requested for it", store)
845845
}
846+
// TODO(sep-raft-log): store.engine should be the log engine.
846847
if _, err := RegisterOfflineRecoveryEvents(ctx, store.engine, logEvents); err != nil {
847848
return "", err
848849
}

pkg/kv/kvserver/loqrecovery/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ func (s Server) NodeStatus(
479479
status.PendingPlanID = &plan.PlanID
480480
}
481481
err = s.stores.VisitStores(func(s *kvserver.Store) error {
482-
r, ok, err := readNodeRecoveryStatusInfo(ctx, s.TODOEngine())
482+
r, ok, err := readNodeRecoveryStatusInfo(ctx, s.LogEngine())
483483
if err != nil {
484484
return err
485485
}

pkg/server/loss_of_quorum.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func logPendingLossOfQuorumRecoveryEvents(ctx context.Context, stores *kvserver.
6262
// cluster-replicated destinations.
6363
eventCount, err := loqrecovery.RegisterOfflineRecoveryEvents(
6464
ctx,
65-
s.TODOEngine(),
65+
s.LogEngine(),
6666
func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) {
6767
event := record.AsStructuredLog()
6868
log.StructuredEvent(ctx, severity.INFO, &event)
@@ -94,7 +94,7 @@ func maybeRunLossOfQuorumRecoveryCleanup(
9494
if err := stores.VisitStores(func(s *kvserver.Store) error {
9595
_, err := loqrecovery.RegisterOfflineRecoveryEvents(
9696
ctx,
97-
s.TODOEngine(),
97+
s.LogEngine(),
9898
func(ctx context.Context, record loqrecoverypb.ReplicaRecoveryRecord) (bool, error) {
9999
sqlExec := func(ctx context.Context, stmt string, args ...interface{}) (int, error) {
100100
return ie.ExecEx(ctx, "", nil,

0 commit comments

Comments
 (0)