Support for Parallel Replication#1556

Open
vazois wants to merge 370 commits into dev from vazois/mmrt-dev

Conversation


@vazois vazois commented Feb 11, 2026

Multi-Log Parallel Replication Feature

This PR introduces multi-log-based Append-Only File (AOF) support to Garnet, improving write throughput and enabling optimized parallel replication replay. The feature leverages multiple physical TsavoriteLog instances to shard write operations and to parallelize log scanning, shipping, and replay across multiple connections and iterators. While designed primarily for cluster-mode replication, the feature can also be used in standalone mode to improve performance when AOF is enabled.

Feature Requirements

1. Sharded AOF Architecture

  • Improves AOF write-throughput through key-based sharding across distinct physical TsavoriteLog instances.
  • Accelerates replica synchronization through parallel log scanning and shipping across the network.
  • Full backward compatibility with existing single-log deployments.
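As a rough illustration of the key-based sharding described above, the sketch below hashes each key to one of several physical sublogs. All names (`ShardedAof`, `enqueue`, `sublog_for`) are invented for this sketch and are not Garnet's API; this is a Python stand-in, not the C# implementation.

```python
import zlib

class ShardedAof:
    """Illustrative key-based sharding across physical sublogs (not Garnet's API)."""

    def __init__(self, physical_sublog_count: int):
        # One in-memory list stands in for each physical TsavoriteLog instance.
        self.sublogs = [[] for _ in range(physical_sublog_count)]

    def sublog_for(self, key: bytes) -> int:
        # A stable hash of the key selects the physical sublog,
        # so all writes to the same key land in the same sublog.
        return zlib.crc32(key) % len(self.sublogs)

    def enqueue(self, key: bytes, record: bytes) -> None:
        self.sublogs[self.sublog_for(key)].append(record)

aof = ShardedAof(physical_sublog_count=4)
aof.enqueue(b"user:42", b"SET user:42 v1")
aof.enqueue(b"user:42", b"SET user:42 v2")  # same key -> same sublog
```

Because the mapping is a pure function of the key, a replica can replay each sublog independently while preserving per-key ordering.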

2. Flexible Parallel Replay with Tunable Task Granularity

  • Introduces virtual sublog abstraction to allow for parallel replay within a given physical sublog.
  • Minimizes inter-task coordination to maximize parallel execution efficiency.

3. Read Consistency Protocol

  • Per-session prefix consistency through the use of timestamp-based sequence numbers.
  • Sketch based key-level replay status tracking for efficient and lightweight freshness validation.
  • Version-based prefix-consistency across replica reconfiguration operations.
  • Ensures monotonically increasing sequence numbers across failovers through offset tracking during replica promotion.

4. Transaction Support

  • Coordinates multi-exec transactions across sublogs to maintain ACID properties during parallel replay.
  • Preserves consistent commit ordering per session through timestamp-based sequence numbers.

5. Fast Prefix-Consistent Recovery

  • Multi-sublog prefix-consistent recovery within the persisted commit boundaries.
  • Intra-page parallelism during recovery using multiple replay tasks.

Newly Introduced Configuration Parameters

| Parameter | Purpose |
| --- | --- |
| `AofPhysicalSublogCount` | Number of physical TsavoriteLog instances |
| `AofReplayTaskCount` | Replay tasks per physical sublog at the replica |
| `AofRefreshPhysicalSublogTailFrequencyMs` | Background task frequency for advancing idle sublog timestamps |

Implementation Plan

Phase 1: Core Infrastructure

  • 1.1 Implement AofHeader extensions to eliminate single log overhead.

    • ShardedHeader for standalone operations.
    • TransactionHeader for coordinated operations.
  • 1.2 Implement GarnetLog abstraction layer.

    • SingleLog wrapper for legacy single log.
    • ShardedLog implementation for multi-log.
  • 1.3 SequenceNumberGenerator class.

    • Generate monotonically increasing sequence number using timestamps.
    • Ensure monotonicity at failover and recovery by using starting offset.
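A minimal Python sketch of the generator described in 1.3, assuming a monotonic nanosecond clock and a starting offset recovered at failover; the class and method names are illustrative, not the actual Garnet types.

```python
import threading
import time

class SequenceNumberGenerator:
    """Illustrative timestamp-based monotonic sequence numbers (assumed names)."""

    def __init__(self, starting_offset: int = 0):
        # starting_offset is the maximum sequence number observed at
        # recovery/failover; every new number must strictly exceed it.
        self._last = starting_offset
        self._lock = threading.Lock()

    def next(self) -> int:
        with self._lock:
            now = time.monotonic_ns()
            # Take the clock when it is ahead; otherwise bump by one so the
            # sequence stays strictly increasing even if the clock stalls
            # or the recovered offset is ahead of wall time.
            self._last = max(self._last + 1, now)
            return self._last
```

Seeding the generator with the recovered offset is what preserves monotonicity across failovers, as the requirement in section 3 states.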

Phase 2: Primary Replication Stream

  • 2.1 AofSyncDriver class.

    • Single instance AofSyncDriver per attached replica.
    • Multiple instances of AofSyncTask per physical sublog.
    • Use dedicated AdvanceTime background task per attached replica.
  • 2.2 AofSyncTask class.

    • Independent log iterators per sublog
    • Network page shipping per sublog
    • Error handling and connection teardown
  • 2.3 AdvanceTime background task.

    • Primary monitors log changes by comparing the last known tail address to the current tail address.
    • Primary associates the current tail address snapshot with a sequence number (timestamp) that is strictly larger than all sequence numbers assigned until that moment and notifies the replica.
    • Replica maintains an advance time background task that updates sublog time using the information from the primary's signal.
    • Primary advances last known tail address to the observed tail address.
    • The system reaches equilibrium when writes are quiesced, and no more signals are sent unless a new change is detected.
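One iteration of the AdvanceTime check above can be sketched as follows. This is a Python stand-in with invented names (`sublog`, `next_seqno`, `signal_replica`) for the primary-side state, sequence-number generator, and replica notification.

```python
import itertools

def advance_time_once(sublog: dict, next_seqno, signal_replica) -> bool:
    """One primary-side AdvanceTime iteration (illustrative sketch)."""
    current_tail = sublog["tail"]
    if current_tail == sublog["last_known_tail"]:
        return False  # quiesced: no new writes, so no signal is sent
    seqno = next_seqno()  # strictly larger than all numbers assigned so far
    signal_replica(current_tail, seqno)  # replica advances sublog time from this
    sublog["last_known_tail"] = current_tail
    return True

signals = []
counter = itertools.count(1)
sub = {"tail": 128, "last_known_tail": 64}
advance_time_once(sub, lambda: next(counter), lambda tail, s: signals.append((tail, s)))
advance_time_once(sub, lambda: next(counter), lambda tail, s: signals.append((tail, s)))  # no change: no-op
```

The second call illustrates the equilibrium property: with no new writes, the tail matches the last known tail and no further signal is produced.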

Phase 3: Replica Replay Stream

  • 3.1 ReplicaReplayDriver class.

    • Per-physical-sublog enqueue, scan and replay coordination
    • Manages ReplicaReplayTask for parallel replay within a single physical sublog.
  • 3.2 ReplicaReplayTask class.

    • Record filtering by task affinity.
    • Coordinated update of virtual sublog replay state to enable read prefix consistency.
  • 3.3 Standalone operation replay

    • Each operation executes within its appropriate context (BasicContext or TransactionalContext).
    • The virtual sublog replay state is updated prior to replay to maintain prefix consistency for read operations.
  • 3.4 Multi-exec transaction replay

    • Transaction operations are distributed across replay tasks based on key affinity.
    • Upon encountering the TxnCommit marker, each participating task acquires exclusive locks for its assigned keys.
    • The associated virtual sublog replay state gets updated following the standalone operation replay.
    • All participating tasks synchronize at a barrier before commit, which releases locks and makes results visible.
    • The commit marker advances time prior to execution, ensuring timestamp consistency while locks are still held.
  • 3.5 Custom transaction procedure replay

    • Similar to multi-exec transaction with the exception of having a single thread execute the custom procedure.
    • Virtual sublog replay state gets updated prior to lock acquisition.
    • Exclusive lock acquisition ensures that transaction partial results are not exposed to readers.
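The lock/barrier/commit sequence of the multi-exec replay in 3.4 can be sketched roughly as below. This is an illustrative Python reduction (Garnet's actual replay tasks are C#); each task locks and applies only its assigned keys, then all participants synchronize at a barrier before releasing locks.

```python
import threading

def replay_task(ops, locks, barrier, apply):
    """One participating replay task's share of a multi-exec transaction (sketch)."""
    for key, _ in ops:
        locks[key].acquire()      # exclusive lock for each assigned key
    for key, value in ops:
        apply(key, value)         # apply this task's operations under lock
    barrier.wait()                # all participants reach the commit point
    for key, _ in ops:
        locks[key].release()      # commit: release locks, results become visible

store = {}
locks = {k: threading.Lock() for k in ("a", "b")}
barrier = threading.Barrier(2)  # two participating replay tasks
t1 = threading.Thread(target=replay_task,
                      args=([("a", 1)], locks, barrier, store.__setitem__))
t2 = threading.Thread(target=replay_task,
                      args=([("b", 2)], locks, barrier, store.__setitem__))
t1.start(); t2.start()
t1.join(); t2.join()
```

Because key sets are disjoint across tasks, no lock-ordering deadlock can arise, and the barrier guarantees that no task exposes partial results before every participant has applied its share.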

Phase 4: Read Consistency Protocol

  • 4.1 ReadConsistencyManager class

    • VirtualSublogReplayState struct using sketch arrays for key freshness tracking and sequence number frontier computation.
    • Provides APIs for updating sequence numbers at key or virtual sublog granularity.
    • Tracks version to maintain prefix consistency during replica reconfiguration events.
  • 4.2 Session based prefix consistency enforcement

    • Implement ConsistentReadGarnetApi and TransactionalConsistentReadGarnetApi to allow the JIT compiler to optimize operational calls.
    • Define callbacks to enforce consistent read protocol (e.g. ValidateKeySequenceNumber, UpdateKeySequenceNumber).
    • Session-level ReplicaReadSessionContext struct tracks maximumSessionSequenceNumber and related metadata (i.e. sessionVersion, lastHash, lastVirtualSublogIdx) to enforce prefix consistency both in steady state and during recovery.
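A rough sketch of the sketch-array freshness tracking in 4.1: each key hashes to a slot holding the maximum sequence number replayed for any key mapping there, so lookups over-approximate (never under-report) a key's freshness. All names here are invented for illustration.

```python
import zlib

class ReplayStateSketch:
    """Illustrative hash-sketch of per-key replay sequence numbers."""

    def __init__(self, slots: int = 1024):
        self.slots = [0] * slots  # max replayed seqno per hash slot
        self.sublog_seqno = 0     # sublog-level frontier (advance-time signal)

    def _slot(self, key: bytes) -> int:
        return zlib.crc32(key) % len(self.slots)

    def update_key(self, key: bytes, seqno: int) -> None:
        i = self._slot(key)
        self.slots[i] = max(self.slots[i], seqno)

    def key_frontier(self, key: bytes) -> int:
        # The key frontier T_k from the notes: max of the key-slot
        # sequence number and the sublog sequence number.
        return max(self.slots[self._slot(key)], self.sublog_seqno)

s = ReplayStateSketch()
s.update_key(b"k", 5)
s.sublog_seqno = 9
```

Hash collisions can only make a key look fresher than it is, which delays a read but never violates prefix consistency; that is what makes a fixed-size sketch a lightweight substitute for exact per-key tracking.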

Phase 5: Prefix-Consistent Recovery

  • 5.1 Commit operation

    • Occurs in unison across all sublogs. AutoCommit is disabled and commits are triggered at the GarnetLog layer instead of within TsavoriteLog, so that commits can be coordinated across sublogs.
    • Commit adds a cookie tracking the timestamp at which the commit occurred, to enforce prefix-consistent recovery.
  • 5.2 RecoverLogDriver implementation

    • Independent iterators with shared bounds.
    • Record filtering by sequenceNumber < untilSequenceNumber.
    • Build ReadConsistencyManager state at recovery to initialize SequenceNumberGenerator.
    • Allow intra-page parallel recovery using the scan/BulkConsume interface.
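The record filtering in 5.2 amounts to something like the following sketch: only records strictly below the recovered commit boundary are replayed, which is what keeps recovery prefix-consistent across sublogs.

```python
def replay_until(records, until_sequence_number, apply):
    """Replay only records with seqno < untilSequenceNumber (illustrative)."""
    for seqno, record in records:
        if seqno < until_sequence_number:
            apply(record)
        # Records at or beyond the boundary are dropped: they may not
        # have been persisted by every sublog at the recovered commit.

replayed = []
replay_until([(1, "a"), (5, "b"), (9, "c")], until_sequence_number=9, apply=replayed.append)
```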

Phase 6: Testing & Validation

  • 6.1 Replication base tests passing with multi-log enabled
  • 6.2 Replication diskless sync tests passing with multi-log enabled

NOTES

Prefix Consistent Single Key Read Protocol

  • Each session tracks the maximum observed sequence number $T_{ms}$ and only proceeds when the key frontier $T_k$ (max of key and sublog sequence numbers) exceeds that value, guaranteeing visibility of earlier writes.
  • After the read, refresh $T_{ms}$ with the key's latest sequence number; timestamps are strictly increasing, so doing this post-read remains safe even though freshness validation occurred beforehand, and boundary reads never slip through.

Prefix Consistent Batch Read

  • For every key $K_i$ in the batch, ensure $T_{ms} < T_{k_i}$, then compute $T_{max} = \max(T_{k_1}, \ldots, T_{k_n})$ before issuing the batched read.
  • Once the batch returns, verify each key still satisfies $T_{k_i} \leq T_{max}$; if any key advanced beyond $T_{max}$, redo the batch since a concurrent update happened. Because freshness gating blocks boundary reads, caching just $T_{max}$ is sufficient to detect drift.
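The batch protocol can be sketched the same way: gate each key's freshness, capture $T_{max}$, read, then check for drift. Again a Python stand-in with hypothetical callbacks (`key_frontier`, `batch_read`).

```python
def consistent_batch_read(session: dict, keys, key_frontier, batch_read):
    """Prefix-consistent batch read (illustrative sketch)."""
    while True:
        # Freshness gate: every key frontier must exceed T_ms.
        frontiers = {k: key_frontier(k) for k in keys}
        if any(f <= session["T_ms"] for f in frontiers.values()):
            continue  # wait for replay to catch up, then retry
        t_max = max(frontiers.values())
        values = batch_read(keys)
        # Drift check: redo the batch if any key advanced past T_max,
        # since that means a concurrent update raced the batch read.
        if all(key_frontier(k) <= t_max for k in keys):
            session["T_ms"] = max(session["T_ms"], t_max)
            return values

session = {"T_ms": 0}
fronts = {"a": 2, "b": 5}
vals = consistent_batch_read(session, ["a", "b"],
                             key_frontier=fronts.get,
                             batch_read=lambda ks: [k.upper() for k in ks])
```

Caching only the scalar $T_{max}$, rather than every per-key frontier, is what keeps the drift check lightweight.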

TODO

  • Ensure transaction replay releases locks in the event of an exception
  • Add timestamp tracking at primary per physical sublog.
  • Ensure timestamp tracking is consistent with recovery.
  • Ensure commit recovery does not recover on boundaries.
  • Failed Garnet.test.cluster.ClusterReplicationAsyncReplay.ClusterReplicationManualCheckpointing [CI]
  • Failed Garnet.test.cluster.ClusterReplicationTLS.ClusterSRNoCheckpointRestartSecondary(False,False)[CI]
  • Failed Garnet.test.RespAdminCommandsTests.SeSaveRecoverMultipleKeysTest("63k","15k")[CI]
  • Failed Garnet.test.cluster.ClusterMigrateTests(False).ClusterMigrateWrite[CI]
  • Failed Garnet.test.cluster.ClusterReplicationShardedLog.ClusterReplicationShardedLogRecover[CI]
  • Validate special case where maximumSessionSequenceNumber is 0 and FrontierSequenceNumber is also 0.
  • ClusterResetHardDuringDisklessReplicationAttach [CI]
  • ClusterReplicationCheckpointCleanupTest [CI]

TedHartMS and others added 30 commits May 5, 2025 20:15
… for SpanByteAllocator. The AddressType change is a breaking on-disk format change: it shuffles bits around in RecordInfo to add an additional bit adjacent to the old ReadCache bit to mark an address as:

- 00: Reserved
- 11: ReadCache
- 10: InMemory portion of the main log
- 01: On-Disk
* wip

* wip

* wip

* Added unified store session

* Correcting generic typing

* Added MEMORY USAGE + TYPE to unified ops

* Added TTL, EXPIRETIME and EXISTS to unified ops

* implemented DEL in unified ops

* wip - expire & persist (broken)

* wip - adding expire to unified ops

* wip - expire

* add cref to server-side replication inter-node commands

* fix server-side BeginRecoverReplica

* wip

* Fix transaction key locking

* format

* Some test fixes

* Fixing tests

* reverting a couple of unnecessary changes

* Eliminating more multi-context methods from API

* Removed some unnecessary stuff

* Some more cleanup to TransactionManager

* merge tedhar/storage-v2 (ObjectAllocator serialization updates)

* Updating memory usage values

* format

* Handling wrong type ops

* Revert "Updating memory usage values"

This reverts commit 88ba307.

* fix no-object-log case

* Fixes for Tsavorite UTs

* Fixes for Tsavorite UTs (mostly ReadCache, TsavoriteLog, Compaction)

* Tsavorite Iterator work and UT re-enabling

* Fixes to Object iteration, LogRecord.ToString()

* Add RecordMetadata.ETag

* Readding --no-obj config

* fix

* test fix

* Prep for Recovery

* wip

* wip

* fix

* More fixes for UT (mostly Recovery, Migration)

* Moving DELIFEXPIM to unified store

* ObjectLogTail in Recovery, and more UT fixes (Migration record serialization, ReadCache size and tailAddress verification calculations, etc.)

* More Tsavorite recovery tests

* Removing unnecessary isObject flag from record serialization

---------

Co-authored-by: Vasileios Zois <vazois@microsoft.com>
Co-authored-by: TedHartMS <15467143+TedHartMS@users.noreply.github.com>
… correctly initialized SegmentSize for ObjectLogDevice; other fixes to UTs to work with UnifiedStore
@vazois vazois requested a review from Copilot March 3, 2026 19:05

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.


Copilot AI left a comment


Pull request overview

Copilot reviewed 165 out of 169 changed files in this pull request and generated 3 comments.




vazois commented Mar 5, 2026

@copilot, investigate why ClusterMigrateWrite would be failing
https://github.com/microsoft/garnet/actions/runs/22695023356/job/65799614188#step:7:712


Copilot AI commented Mar 5, 2026

@vazois I've opened a new pull request, #1609, to work on those changes. Once the pull request is ready, I'll request review from you.

@vazois vazois requested review from TedHartMS and badrishc March 5, 2026 19:07
vazois and others added 6 commits March 10, 2026 15:57
…k-based replica sync (#1633)

In TryReplicateDiskbasedSync, ExecuteClusterInitiateReplicaSync was
sending beginAddress.Span for both the aofBeginAddress and aofTailAddress
parameters. This was introduced in commit 6fb99e5 when converting from
ToByteArray() to Span-based calls.

The primary uses the replica's tail address to compute the AOF sync replay
range. With both parameters being the begin address, the primary couldn't
determine where the replica's AOF actually ended, causing the replica to
never receive AOF records and remain stuck at offset 64 (kFirstValidAofAddress).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>