
[FEATURE] support processing a single shard out of N for sources (for multiple workers and sampling) #1333

@georgeh0

Description


Original thread: https://discord.com/channels/1314801574169673738/1443136035847405639

We want to support processing a single shard out of N for sources. Users configure the number of shards N and the current shard ID. CocoIndex divides all source keys into N shards by applying a deterministic hash function to each key and taking the result modulo N, which spreads keys roughly evenly. It then processes only the source rows that fall in the current shard. This helps both distributed processing (running CocoIndex on multiple workers, one shard each) and sampling (running CocoIndex on only a subset of source rows).
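The sharding rule can be sketched as follows. This is a minimal illustration under assumptions: it hashes keys with FNV-1a as a stand-in for CocoIndex's own key fingerprint, and `shard_of` and `keys_for_shard` are hypothetical helpers, not CocoIndex APIs. It shows that each key maps to exactly one shard, so N workers together cover every row once, while running a single shard behaves like a roughly 1/N sample.

```rust
/// FNV-1a (64-bit): a simple hash that gives the same result in every process
/// and on every machine. Used here only as a stand-in for CocoIndex's own
/// key fingerprint.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Hypothetical helper: the shard (0..num_shards) that a source key belongs to.
fn shard_of(key: &str, num_shards: u64) -> u64 {
    fnv1a_64(key.as_bytes()) % num_shards
}

/// Hypothetical helper: the keys a worker configured with `shard_id` processes.
/// All other keys are skipped by that worker.
fn keys_for_shard<'a>(keys: &[&'a str], num_shards: u64, shard_id: u64) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|key| shard_of(key, num_shards) == shard_id)
        .collect()
}
```

For example, with 4 shards, calling `keys_for_shard(&keys, 4, id)` for each `id` in `0..4` returns disjoint sets whose sizes add up to the total number of keys, and calling it only for `id = 0` yields about a quarter of the rows. Any hash works as long as it is deterministic across processes and machines, which rules out randomly seeded hashers such as the default one used by `std::collections::HashMap`.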

Note that the row tracking table (in CocoIndex's internal storage) is NOT sharded: each flow still has a single tracking table. Only processing, which is the stateless part, is sharded, so resharding will be trivial.
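A toy model can make the resharding argument concrete. Everything in it is hypothetical: the tracking table is reduced to an in-memory `HashMap` from row key to a processing counter, `process_shard` and `run_all_shards` are illustrative names, and FNV-1a again stands in for the real fingerprint. Because the shard filter is a pure function of the key and `(num_shards, shard_id)`, moving from 2 to 3 workers only changes which worker handles each row; the single table keeps the same layout and needs no migration.

```rust
use std::collections::HashMap;

/// Stand-in deterministic hash (FNV-1a, 64-bit); not CocoIndex's fingerprint.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Hypothetical single, unsharded tracking table: row key -> number of times
/// the row has been processed. Every worker reads and writes the same table.
type TrackingTable = HashMap<String, u32>;

/// Process only the rows in `shard_id` of `num_shards`. Rows outside the
/// shard are skipped, not removed, so the table has no per-shard layout.
fn process_shard(table: &mut TrackingTable, keys: &[String], num_shards: u64, shard_id: u64) {
    for key in keys {
        if fnv1a_64(key.as_bytes()) % num_shards != shard_id {
            continue; // another worker owns this row
        }
        *table.entry(key.clone()).or_insert(0) += 1;
    }
}

/// Run every shard of one configuration, as `num_shards` workers would.
fn run_all_shards(table: &mut TrackingTable, keys: &[String], num_shards: u64) {
    for shard_id in 0..num_shards {
        process_shard(table, keys, num_shards, shard_id);
    }
}
```

Running `run_all_shards` with 2 shards and then with 3 shards against the same table processes every row exactly once per run, with no step in between to split or merge tracking state.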

Implementation plan:

  • Add two fields, num_shards and current_shard_id, to GlobalExecutionOptions. The corresponding type on the Python side needs to be updated too.

  • Wire the new fields all the way down to the SourceIndexingContext, and skip processing any row that is not in the current shard, in the following loops:

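    To decide whether a row is in the current shard, compute a fingerprint of the row key and reduce it modulo num_shards. The fingerprint is a [u8; 16] (128 bits), so convert its first 64 bits to an integer and take that modulo num_shards. A u64 is the simpler choice; if an i64 is used, reduce it with rem_euclid rather than %, because % on a negative i64 yields a negative remainder in Rust.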
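The two steps above might look roughly like the following Rust sketch. The field types (`Option<u32>`), the validation rules, the little-endian byte order, and the helper names `shard_config`, `shard_of_fingerprint`, `shard_of_fingerprint_i64`, and `in_current_shard` are assumptions for illustration; the real `GlobalExecutionOptions` has other fields, and CocoIndex's fingerprint type may differ. The signed variant shows why a plain `%` on an `i64` is not enough.

```rust
/// Hypothetical shape of the new options. The real `GlobalExecutionOptions`
/// has other fields; the field types here are assumptions.
#[derive(Debug, Clone, Default)]
pub struct GlobalExecutionOptions {
    pub num_shards: Option<u32>,
    pub current_shard_id: Option<u32>,
}

impl GlobalExecutionOptions {
    /// Returns `Ok(None)` when sharding is disabled, otherwise the validated
    /// `(num_shards, current_shard_id)` pair.
    pub fn shard_config(&self) -> Result<Option<(u64, u64)>, String> {
        match (self.num_shards, self.current_shard_id) {
            (None, None) => Ok(None),
            (Some(0), _) => Err("num_shards must be at least 1".to_string()),
            (Some(n), Some(id)) if id < n => Ok(Some((n as u64, id as u64))),
            (Some(n), Some(id)) => Err(format!("current_shard_id {id} must be < num_shards {n}")),
            _ => Err("num_shards and current_shard_id must be set together".to_string()),
        }
    }
}

/// Map a 128-bit key fingerprint to a shard: read the first 8 bytes as a u64
/// (little-endian here; any fixed byte order works if every worker uses it)
/// and reduce modulo `num_shards`, which the caller guarantees is > 0.
pub fn shard_of_fingerprint(fp: &[u8; 16], num_shards: u64) -> u64 {
    let mut first = [0u8; 8];
    first.copy_from_slice(&fp[..8]);
    u64::from_le_bytes(first) % num_shards
}

/// Signed variant: `%` on a negative i64 returns a negative remainder in Rust,
/// so `rem_euclid` is used to keep the result in 0..num_shards.
pub fn shard_of_fingerprint_i64(fp: &[u8; 16], num_shards: i64) -> i64 {
    let mut first = [0u8; 8];
    first.copy_from_slice(&fp[..8]);
    i64::from_le_bytes(first).rem_euclid(num_shards)
}

/// The per-row check a processing loop would apply before handling a row.
pub fn in_current_shard(fp: &[u8; 16], opts: &GlobalExecutionOptions) -> Result<bool, String> {
    Ok(match opts.shard_config()? {
        None => true, // sharding disabled: process every row
        Some((n, id)) => shard_of_fingerprint(fp, n) == id,
    })
}
```

Each processing loop would call `in_current_shard` on the row key's fingerprint and skip the row when it returns `Ok(false)`. The `u64` and `rem_euclid` variants are both valid, but they can assign some rows to different shards unless `num_shards` is a power of two, so all workers must agree on one.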


❤️ Contributors, please refer to the 📙 Contributing Guide.
Unless the PR can be sent immediately (e.g. just a few lines of code), we recommend leaving a comment on the issue, such as "I'm working on it" or "Can I work on this issue?", to avoid duplicating work. Our Discord server is always open and friendly.

Metadata

Assignees

Labels

No labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
