Skip to content

refactor(hash-aggr): Use EmitTo to output#23055

Open
2010YOUY01 wants to merge 3 commits into
apache:mainfrom
2010YOUY01:split-aggr-refactor-output
Open

refactor(hash-aggr): Use EmitTo to output#23055
2010YOUY01 wants to merge 3 commits into
apache:mainfrom
2010YOUY01:split-aggr-refactor-output

Conversation

@2010YOUY01

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Part of #22710

Rationale for this change

Regarding the EPIC issue: I have drafted all the migrations locally, and verified that after deleting the old implementation, UTs are passing.

We are now about 4 feature migration PRs away from completing the EPIC. Before continuing with those migrations, this PR performs some cleanup and refactoring.

What changes are included in this PR?

This PR can be read commit by commit:

  • commit 1: use EmitTo for incremental outputting
  • commit 2: split hash_table.rs into small files

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 20, 2026
@2010YOUY01 2010YOUY01 marked this pull request as draft June 21, 2026 01:16
@2010YOUY01 2010YOUY01 marked this pull request as ready for review June 21, 2026 01:16
@2010YOUY01 2010YOUY01 closed this Jun 21, 2026
@2010YOUY01 2010YOUY01 reopened this Jun 21, 2026
@alamb

alamb commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Regarding the EPIC issue: I have drafted all the migrations locally, and verified that after deleting the old implementation, UTs are passing.

Amazing

@alamb alamb left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is amazing @2010YOUY01 -- thank you. I found this code really easy to follow and understand. While it is complicated, I think it much more closely mirrors the complexity of the problem being solved now and setting up the control flow logic in this way means we will be in a much better place to improve the performance / featuers going forward

👏

cc @Rachelint

AggregateExec, PhysicalGroupBy, aggregate_expressions, evaluate_group_by,
};

/// Marker for raw rows -> partial state aggregation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this structure and how it makes it clearer what is going on with the state here

pub(super) accumulator_args: Vec<EvaluatedHashAggregateAccumulator>,
}

/// Hash table state while grouped aggregation is consuming input.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments seem a little out of date as this structure also seems to be used while emitting (in addition to building / consuming input)

pub(super) enum AggregateHashTableState {
    Building(BuildingHashTableState),
    Outputting(BuildingHashTableState),  <--- suggests that "Building" state is also used for outputting
    Done,
}

Maybe a name like AggregateHashTableStateInner would be more generic 🤷

pub(super) _mode: PhantomData<AggrMode>,
}

pub(super) struct HashAggregateAccumulator {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few sentences that describe what this structure is might help future readers

something like

/// State and argument information for a single Aggregate
///
/// For example, for `SELECT COUNT(x), SUM(y WHERE z > 10) ...`  there would be two
/// `HashAggregateAccumulator`, one each for `COUNT(x)` and `SUM(y WHERE z > 10)`
pub(super) struct HashAggregateAccumulator {

}
}

pub(super) fn empty_like(&self) -> Result<Self> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some comments about what this is used for?

accumulator: Box<dyn GroupsAccumulator>,
}

pub(super) struct EvaluatedHashAggregateAccumulator {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit -- this seems liess like an "accumulator" and more like "evaluated arguments"

Maybe it would be better called EvaluatedHashAggregateArgs?

Or maybe I mis understand 🤔

In either event, some comments would also help

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor is that the structuis called final but the module is called final_table.rs -- should we keep it consistent with final.rs?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise here, the struct is named Partial but the module partial_table.rs -- recommend partial.rs to be consistent

) -> Result<Option<RecordBatch>> {
let output_schema = Arc::clone(&self.output_schema);
let batch_size = self.batch_size;
match &mut self.state {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this state match and some of the outputtting state is duplicated across the types of tables, but I think it is ok

.all(|acc| acc.supports_convert_to_state())
}

/// In skip-partial-aggregation optimization, when a decision has made to skip

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// In skip-partial-aggregation optimization, when a decision has made to skip
/// In skip-partial-aggregation optimization, when a decision has been made to skip

/// In skip-partial-aggregation optimization, when a decision has made to skip
/// partial stage, build a typed hash table only for aggregation state conversion
/// row-by-row.
pub(in crate::aggregates) fn partial_skip_table(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could avoid some clones below if this consumed self rather than took it by reference

Maybe it doesn't matter

.building()
.accumulators
.iter()
.all(|acc| acc.supports_convert_to_state())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should try and remove this "supports_convert_to_state" API (as a follow on PR / project) to simplify the hash aggregate code and ensure all our groups accumulators have the high performance APIs.

I filed a ticket

))
}

fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedHashAggregateAccumulator> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about name it evaluate_acc_args like evaluate_group_by ?

.merge_batch(&values.arguments, group_indices, total_num_groups)
}

pub(super) fn evaluate_final(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And can just name it evaluate after renaming above.

}
}

/// Methods shared by all aggregate hash table modes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems move method impls near where it define may be clearer?

acc.update_batch(values, group_indices, total_num_groups)?;
}
}
drop(timer);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explicit timer drop here seems can be removed, but not really matter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants