Description
Original thread: https://discord.com/channels/1314801574169673738/1443136035847405639
We want to support processing a single shard out of N for sources. Users can configure the number of shards N and the current shard ID. CocoIndex divides all source keys evenly into N shards (by applying a deterministic hash function to each key and taking the result modulo N), and only picks source rows in the current shard for processing. This is helpful for both distributed processing (running CocoIndex on multiple workers) and sampling (running CocoIndex on only a subset of source rows).
Note that the row tracking table (in CocoIndex's internal storage) is NOT sharded. We still only have a single tracking table for each flow. Only processing is sharded - which is the stateless part, so resharding will be trivial.
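To make the idea concrete, here is a minimal sketch of hash-based shard assignment. It is not CocoIndex code: the function names are hypothetical, keys are assumed to be strings, and FNV-1a is used only as a stand-in for "a deterministic hash function".

```rust
/// FNV-1a (64-bit). Assumption: any deterministic hash would do here;
/// this one is chosen only because it is short and stable across runs.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for &b in bytes {
        hash ^= b as u64;
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Shard that a source key falls into, in the range 0..num_shards.
/// (Hypothetical helper, not an existing CocoIndex function.)
fn shard_of(key: &str, num_shards: u64) -> u64 {
    assert!(num_shards > 0, "num_shards must be positive");
    fnv1a_64(key.as_bytes()) % num_shards
}

/// Keys that a worker configured with `current_shard_id` would process.
fn keys_for_shard<'a>(
    keys: &[&'a str],
    num_shards: u64,
    current_shard_id: u64,
) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|k| shard_of(k, num_shards) == current_shard_id)
        .collect()
}
```

Because the shard of a key depends only on the key and N, every key lands in exactly one shard, and changing N only changes which rows each worker picks up; nothing stored in the tracking table has to move.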
Implementation plan:
- Add additional fields `num_shards` and `current_shard_id` into `GlobalExecutionOptions`. The corresponding type on Python side needs to be updated too.
- Wire the new fields all the way down to the `SourceIndexingContext`, and skip processing a row if not in the current shard, in the following loops:
  - In `SourceIndexingContext::load()` when loading rows from the tracking table
  - In `SourceIndexingContext::update_with_stream()` when traversing row keys from the source.

  To decide if a row is within the current shard, compute a fingerprint on the row key and mod `num_shards` (since it's a `[u8; 16]` which has 128-bit, we can convert the first 64-bit to an `i64` and mod `num_shards` with it).
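The shard check described above could look roughly like the sketch below. Everything here is an assumption for illustration: `ShardOptions` is a hypothetical stand-in for the two new fields (their real types are not specified in this issue), the fingerprint is taken as an already computed `[u8; 16]`, and little-endian byte order is an arbitrary choice. One detail worth noting: Rust's `%` on a negative `i64` yields a negative remainder, so the sketch uses `rem_euclid` to keep the result in `0..num_shards`.

```rust
/// Hypothetical stand-in for the new fields on `GlobalExecutionOptions`.
/// `None` is assumed to mean "sharding disabled".
#[derive(Debug, Clone, Copy, Default)]
pub struct ShardOptions {
    pub num_shards: Option<u32>,
    pub current_shard_id: Option<u32>,
}

/// Maps a 128-bit fingerprint to a shard index in 0..num_shards,
/// using only the first 64 bits as suggested in the plan.
pub fn shard_for_fingerprint(fp: &[u8; 16], num_shards: u32) -> u32 {
    assert!(num_shards > 0, "num_shards must be positive");
    let mut first = [0u8; 8];
    first.copy_from_slice(&fp[..8]);
    // Assumption: little-endian; any fixed byte order works as long as
    // all workers agree on it.
    let value = i64::from_le_bytes(first);
    // rem_euclid keeps the result non-negative even when `value` is negative.
    value.rem_euclid(i64::from(num_shards)) as u32
}

/// Returns true if the row should be processed by this worker.
pub fn in_current_shard(fp: &[u8; 16], opts: &ShardOptions) -> bool {
    match (opts.num_shards, opts.current_shard_id) {
        (Some(n), Some(id)) => shard_for_fingerprint(fp, n) == id,
        // Sharding not configured: process every row.
        _ => true,
    }
}
```

Both loops, the one in `load()` and the one in `update_with_stream()`, would then skip a row when a check like `in_current_shard` returns false. Converting to `u64` instead of `i64` would avoid the sign issue entirely and is an equally reasonable choice.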
❤️ Contributors, please refer to 📙Contributing Guide.
Unless the PR can be sent immediately (e.g. just a few lines of code), we recommend leaving a comment on the issue like "I'm working on it" or "Can I work on this issue?" to avoid duplicating work. Our Discord server is always open and friendly.