Avoid record duplication in JOIN processor #1771
Closed
chubei
started this conversation in
Feature Requests
Problem
The JOIN processor has to keep track of all its historical inputs so that, when a new record arrives, it can produce the Cartesian product of the new record with the history and filter the result by the join condition.
Currently we store each history record as `Vec<Field>`. Consider the case where a JOIN processor (call it JOIN2) uses the output of a previous JOIN processor (call it JOIN1) as an input. JOIN2 has to store all the records that JOIN1 produced. However, JOIN1 also keeps all the records in its own history. Effectively, memory usage is doubled. This gets worse as more JOIN processors are cascaded.
Solution
We can store a record as `Vec<RefOrField>` instead of `Vec<Field>`. `RefOrField` is an enum that can be either a reference to a record or a direct field.
One may wonder how a `RefOrField` can be more memory efficient than a `Field`, since most fields are small. The point is that a `RefOrField` references a whole record, so it can reference many fields at once.
The information about which fields are actually referenced in a `RefOrField` is stored in the corresponding `RefSchema`, a referencing version of `Schema`. While `Schema` stores `Vec<FieldDefinition>`, `RefSchema` stores `Vec<RefOrFieldDefinition>`. `RefOrFieldDefinition` is an enum that can be either a reference to a schema together with some of the referee's field definitions, or a direct field definition.
Note that we only allow one level of reference, meaning that all the fields referenced in a `RefOrFieldDefinition::Ref` must be direct field definitions.
Code
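A minimal sketch of how these types could be laid out, including the two-level lookup discussed under "Field indexing". Several details are assumptions made for illustration: `Field` and `FieldDefinition` are reduced to stand-ins, references are held in `Arc`, the referenced fields are recorded as positions in the referee (`fields: Vec<usize>`), and the `get` method is hypothetical.

```rust
use std::sync::Arc;

// Simplified stand-ins for the existing `Field` and `FieldDefinition` types.
#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
    Text(String),
}

#[derive(Debug, Clone, PartialEq)]
pub struct FieldDefinition {
    pub name: String,
}

/// Either a shared upstream record or a field stored inline.
#[derive(Debug, Clone)]
pub enum RefOrField {
    Ref(Arc<RefRecord>),
    Field(Field),
}

#[derive(Debug, Clone)]
pub struct RefRecord {
    pub values: Vec<RefOrField>,
}

/// Either a referenced schema plus the referee's fields that are used,
/// or a direct field definition.
#[derive(Debug, Clone)]
pub enum RefOrFieldDefinition {
    Ref {
        schema: Arc<RefSchema>,
        // Assumption: positions of direct field definitions in `schema.fields`.
        fields: Vec<usize>,
    },
    Field(FieldDefinition),
}

#[derive(Debug, Clone)]
pub struct RefSchema {
    pub fields: Vec<RefOrFieldDefinition>,
}

impl RefRecord {
    /// Two-level lookup (hypothetical helper): `outer` picks an entry of this
    /// record, `inner` picks a direct field of the referenced record.
    /// `inner` is ignored when the entry is an inline field.
    pub fn get(&self, outer: usize, inner: usize) -> Option<&Field> {
        match self.values.get(outer)? {
            RefOrField::Field(field) => Some(field),
            RefOrField::Ref(record) => match record.values.get(inner)? {
                RefOrField::Field(field) => Some(field),
                // Only one level of reference is allowed.
                RefOrField::Ref(_) => None,
            },
        }
    }
}
```

Cloning a `RefOrField::Ref` in this layout copies only the `Arc` pointer, so a downstream record that references an upstream one does not duplicate its fields.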
Field indexing
To index into such `RefSchema`s and `RefRecord`s, there have to be two levels of indexing. The first level indexes into the `Vec<RefOrFieldDefinition>` or `Vec<RefOrField>`, and the second level indexes into the direct fields of the referenced `RefSchema` or `RefRecord`.
Dereferencing through cloning
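Dereferencing here means turning a `RefRecord` back into a plain `Record` by cloning every field it references. The following is a rough sketch of that step, not the actual implementation: the types are simplified stand-ins, the schema entry is stripped down to the list of referenced positions, and `deref_record` is a hypothetical name.

```rust
use std::sync::Arc;

// Simplified stand-in for the existing `Field` type.
#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
    Text(String),
}

#[derive(Debug, Clone)]
pub enum RefOrField {
    Ref(Arc<RefRecord>),
    Field(Field),
}

#[derive(Debug, Clone)]
pub struct RefRecord {
    pub values: Vec<RefOrField>,
}

/// Plain record with every field stored inline, so it can be serialized directly.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub values: Vec<Field>,
}

/// Stripped-down schema entry: only what dereferencing a record needs.
pub enum RefOrFieldDefinition {
    /// Positions of the referee's direct fields that are actually used.
    Ref { fields: Vec<usize> },
    Field,
}

/// Clones all referenced fields into a flat `Record`.
/// Returns `None` if the schema and the record disagree or if a reference
/// points at another reference (more than one level).
pub fn deref_record(schema: &[RefOrFieldDefinition], record: &RefRecord) -> Option<Record> {
    if schema.len() != record.values.len() {
        return None;
    }
    let mut values = Vec::new();
    for (definition, value) in schema.iter().zip(&record.values) {
        match (definition, value) {
            (RefOrFieldDefinition::Field, RefOrField::Field(field)) => values.push(field.clone()),
            (RefOrFieldDefinition::Ref { fields }, RefOrField::Ref(referee)) => {
                for &index in fields {
                    match referee.values.get(index)? {
                        RefOrField::Field(field) => values.push(field.clone()),
                        RefOrField::Ref(_) => return None,
                    }
                }
            }
            _ => return None,
        }
    }
    Some(Record { values })
}
```

The clone happens only at the serialization boundary; records held in JOIN history stay in their referencing form.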
`RefSchema`/`RefRecord` are not suitable for serialization, because if we directly derive `Serialize`, all the referenced `RefSchema`/`RefRecord` will be serialized recursively. For ease of implementation, instead of implementing a custom serializer, we can "dereference" the `RefSchema`/`RefRecord` to a `Schema`/`Record`, cloning all the referenced `FieldDefinition`/`Field` in the process.
Use in JOIN
In the JOIN processor, we produce the output schema/record by referencing the left and right input schemas/records. Here we define two helper functions to do that.
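As an illustration of the record-side helper, here is one possible sketch. It is an assumption about how such a helper could work, not the code from this proposal: `join_records` and `push_ref` are hypothetical names, references are held in `Arc`, and identity is checked with `Arc::ptr_eq`. References already present in an input are hoisted so the output stays one level deep, and a record is never referenced twice.

```rust
use std::sync::Arc;

// Simplified stand-in for the existing `Field` type.
#[derive(Debug, Clone)]
pub enum Field {
    Int(i64),
}

#[derive(Debug, Clone)]
pub enum RefOrField {
    Ref(Arc<RefRecord>),
    Field(Field),
}

#[derive(Debug, Clone)]
pub struct RefRecord {
    pub values: Vec<RefOrField>,
}

/// Adds a reference to `record` unless the output already references it.
fn push_ref(values: &mut Vec<RefOrField>, record: &Arc<RefRecord>) {
    let seen = values
        .iter()
        .any(|value| matches!(value, RefOrField::Ref(existing) if Arc::ptr_eq(existing, record)));
    if !seen {
        values.push(RefOrField::Ref(Arc::clone(record)));
    }
}

/// Builds a JOIN output record that references its inputs instead of copying them.
pub fn join_records(left: &Arc<RefRecord>, right: &Arc<RefRecord>) -> RefRecord {
    let mut values = Vec::new();
    for input in [left, right] {
        let mut has_direct_fields = false;
        for value in &input.values {
            match value {
                // Hoist existing references so the output stays one level deep.
                RefOrField::Ref(inner) => push_ref(&mut values, inner),
                RefOrField::Field(_) => has_direct_fields = true,
            }
        }
        // Direct fields are reached by referencing the input record itself.
        if has_direct_fields {
            push_ref(&mut values, input);
        }
    }
    RefRecord { values }
}
```

With this layout, joining the output of JOIN1 (which references records `L` and `R`) with a new record `R2` yields references to `L`, `R`, and `R2` directly. A matching schema-side helper would have to apply the same ordering so that schema and record entries line up.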
Note that in the functions above, columns are reordered so that we don't keep duplicate references to the same `RefSchema`/`RefRecord`.
Now the JOIN processor can implement joining using referencing.
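To make the memory effect concrete, here is a small sketch of one side of a nested-loop join over a history of shared records. It is illustrative only: `on_left_insert` and `key` are hypothetical names, the join condition is assumed to be equality on the first column, and the types are simplified stand-ins.

```rust
use std::sync::Arc;

// Simplified stand-in for the existing `Field` type.
#[derive(Debug, Clone, PartialEq)]
pub enum Field {
    Int(i64),
}

#[derive(Debug, Clone)]
pub enum RefOrField {
    Ref(Arc<RefRecord>),
    Field(Field),
}

#[derive(Debug, Clone)]
pub struct RefRecord {
    pub values: Vec<RefOrField>,
}

/// Assumed join key: the first column, if it is a direct field.
fn key(record: &RefRecord) -> Option<&Field> {
    match record.values.first()? {
        RefOrField::Field(field) => Some(field),
        RefOrField::Ref(_) => None,
    }
}

/// Joins a newly arrived left record against the stored right-side history.
/// Each output record holds two pointers; no field is copied.
pub fn on_left_insert(right_history: &[Arc<RefRecord>], left: &Arc<RefRecord>) -> Vec<RefRecord> {
    right_history
        .iter()
        .filter(|right| key(left).is_some() && key(left) == key(right))
        .map(|right| RefRecord {
            values: vec![
                RefOrField::Ref(Arc::clone(left)),
                RefOrField::Ref(Arc::clone(right)),
            ],
        })
        .collect()
}
```

A downstream JOIN that stores these outputs in its own history keeps only the `Arc` pointers, so the underlying fields exist once no matter how many processors are cascaded.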
@mediuminvader @v3g42 @snork-alt