refactor(hash-aggr): Use EmitTo to output#23055
Conversation
Amazing |
alamb
left a comment
There was a problem hiding this comment.
This is amazing @2010YOUY01 -- thank you. I found this code really easy to follow and understand. While it is complicated, I think it much more closely mirrors the complexity of the problem being solved now and setting up the control flow logic in this way means we will be in a much better place to improve the performance / featuers going forward
👏
cc @Rachelint
| AggregateExec, PhysicalGroupBy, aggregate_expressions, evaluate_group_by, | ||
| }; | ||
|
|
||
| /// Marker for raw rows -> partial state aggregation. |
There was a problem hiding this comment.
I like this structure and how it makes it clearer what is going on with the state here
| pub(super) accumulator_args: Vec<EvaluatedHashAggregateAccumulator>, | ||
| } | ||
|
|
||
| /// Hash table state while grouped aggregation is consuming input. |
There was a problem hiding this comment.
These comments seem a little out of date as this structure also seems to be used while emitting (in addition to building / consuming input)
pub(super) enum AggregateHashTableState {
Building(BuildingHashTableState),
Outputting(BuildingHashTableState), <--- suggests that "Building" state is also used for outputting
Done,
}Maybe a name like AggregateHashTableStateInner would be more generic 🤷
| pub(super) _mode: PhantomData<AggrMode>, | ||
| } | ||
|
|
||
| pub(super) struct HashAggregateAccumulator { |
There was a problem hiding this comment.
A few sentences that describe what this structure is might help future readers
something like
/// State and argument information for a single Aggregate
///
/// For example, for `SELECT COUNT(x), SUM(y WHERE z > 10) ...` there would be two
/// `HashAggregateAccumulator`, one each for `COUNT(x)` and `SUM(y WHERE z > 10)`
pub(super) struct HashAggregateAccumulator {| } | ||
| } | ||
|
|
||
| pub(super) fn empty_like(&self) -> Result<Self> { |
There was a problem hiding this comment.
can you add some comments about what this is used for?
| accumulator: Box<dyn GroupsAccumulator>, | ||
| } | ||
|
|
||
| pub(super) struct EvaluatedHashAggregateAccumulator { |
There was a problem hiding this comment.
Nit -- this seems liess like an "accumulator" and more like "evaluated arguments"
Maybe it would be better called EvaluatedHashAggregateArgs?
Or maybe I mis understand 🤔
In either event, some comments would also help
There was a problem hiding this comment.
Minor is that the structuis called final but the module is called final_table.rs -- should we keep it consistent with final.rs?
There was a problem hiding this comment.
likewise here, the struct is named Partial but the module partial_table.rs -- recommend partial.rs to be consistent
| ) -> Result<Option<RecordBatch>> { | ||
| let output_schema = Arc::clone(&self.output_schema); | ||
| let batch_size = self.batch_size; | ||
| match &mut self.state { |
There was a problem hiding this comment.
this state match and some of the outputtting state is duplicated across the types of tables, but I think it is ok
| .all(|acc| acc.supports_convert_to_state()) | ||
| } | ||
|
|
||
| /// In skip-partial-aggregation optimization, when a decision has made to skip |
There was a problem hiding this comment.
| /// In skip-partial-aggregation optimization, when a decision has made to skip | |
| /// In skip-partial-aggregation optimization, when a decision has been made to skip |
| /// In skip-partial-aggregation optimization, when a decision has made to skip | ||
| /// partial stage, build a typed hash table only for aggregation state conversion | ||
| /// row-by-row. | ||
| pub(in crate::aggregates) fn partial_skip_table( |
There was a problem hiding this comment.
I wonder if we could avoid some clones below if this consumed self rather than took it by reference
Maybe it doesn't matter
| .building() | ||
| .accumulators | ||
| .iter() | ||
| .all(|acc| acc.supports_convert_to_state()) |
There was a problem hiding this comment.
I think we should try and remove this "supports_convert_to_state" API (as a follow on PR / project) to simplify the hash aggregate code and ensure all our groups accumulators have the high performance APIs.
I filed a ticket
| )) | ||
| } | ||
|
|
||
| fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedHashAggregateAccumulator> { |
There was a problem hiding this comment.
How about name it evaluate_acc_args like evaluate_group_by ?
| .merge_batch(&values.arguments, group_indices, total_num_groups) | ||
| } | ||
|
|
||
| pub(super) fn evaluate_final(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { |
There was a problem hiding this comment.
And can just name it evaluate after renaming above.
| } | ||
| } | ||
|
|
||
| /// Methods shared by all aggregate hash table modes. |
There was a problem hiding this comment.
Seems move method impls near where it define may be clearer?
| acc.update_batch(values, group_indices, total_num_groups)?; | ||
| } | ||
| } | ||
| drop(timer); |
There was a problem hiding this comment.
Explicit timer drop here seems can be removed, but not really matter.
Which issue does this PR close?
Part of #22710
Rationale for this change
Regarding the EPIC issue: I have drafted all the migrations locally, and verified that after deleting the old implementation, UTs are passing.
We are now about 4 feature migration PRs away from completing the EPIC. Before continuing with those migrations, this PR performs some cleanup and refactoring.
What changes are included in this PR?
This PR can be read commit by commit:
hash_table.rsinto small filesAre these changes tested?
Are there any user-facing changes?