refactor(hash-aggr): Migrate ordered partial/final aggregation #23181
    assert!(collected_running.len() > 2);
    // Running should produce more chunk than the usual AggregateExec.
    // Otherwise it means that we cannot generate result in running mode.
    assert!(collected_running.len() > collected_usual.len());
This asserts that when the same query runs on OrderedAggregateStream and on AggregateStream, the former returns more batches.
That is implementation-dependent, and the test later compares the full results row by row, so the assertion is safe to delete.
There was a problem hiding this comment.
I think this is actually because the output is not being clamped to the specified record_batch size
I made a PR that targets your branch with a proposed fix here:
There was a problem hiding this comment.
This makes sense with the current implementation.
However, in the future, we might want to accumulate batch_size groups before emitting, so downstream operators can be better vectorized. In that case, this assertion would no longer hold.
So we would need a different assertion to check whether the ordered aggregation variant is used, likely via EXPLAIN. However, let's delay that change until this idea is actually implemented.
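To make the point in this thread concrete, here is a minimal sketch of why a batch-count assertion is fragile while a row-by-row comparison is not. It uses plain vectors instead of Arrow record batches, and `clamp_to_batch_size` is a hypothetical helper written for illustration, not a function from this PR.

```rust
/// Split `rows` into chunks of at most `batch_size` rows each.
/// Hypothetical helper: the real operators emit Arrow `RecordBatch`es,
/// here a "batch" is simply a `Vec<T>`.
fn clamp_to_batch_size<T: Clone>(rows: &[T], batch_size: usize) -> Vec<Vec<T>> {
    assert!(batch_size > 0, "batch_size must be positive");
    rows.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

/// Flatten batches back into rows, which is what a row-by-row
/// comparison of two result sets effectively looks at.
fn flatten<T: Clone>(batches: &[Vec<T>]) -> Vec<T> {
    batches.iter().flat_map(|batch| batch.iter().cloned()).collect()
}
```

Two operators that chunk the same rows with different batch sizes produce a different number of batches but identical flattened rows, so only the latter is a stable thing to assert on.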
    /// `k = 100`, it is safe to emit all groups with keys less than 100 because the
    /// input is ordered.
    ///
    /// ## Implementation Note
Many optimizations are clearly applicable to this path; this note explains why the PR keeps it simple.
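The early-emit rule quoted in the doc comment above can be sketched roughly as follows. This is a simplified illustration under stated assumptions: a single `i64` group key, a single SUM aggregate, and fully ordered input. `OrderedSum` and its methods are hypothetical names, not the types added in this PR.

```rust
/// Running SUM over input that is sorted by key.
/// Hypothetical, simplified stand-in for an ordered aggregate table.
struct OrderedSum {
    /// (key, running sum) of the only group that may still receive rows.
    current: Option<(i64, i64)>,
}

impl OrderedSum {
    fn new() -> Self {
        Self { current: None }
    }

    /// Consume one sorted batch of (key, value) rows and return the groups
    /// that are now complete. A group is complete once a different key is
    /// seen, because sorted input cannot revisit an earlier key.
    fn push_batch(&mut self, batch: &[(i64, i64)]) -> Vec<(i64, i64)> {
        let mut emitted = Vec::new();
        for &(key, value) in batch {
            if let Some((k, sum)) = self.current.as_mut() {
                if *k == key {
                    *sum += value;
                    continue;
                }
            }
            // Key changed (or first row): close the previous group, open a new one.
            if let Some(done) = self.current.take() {
                emitted.push(done);
            }
            self.current = Some((key, value));
        }
        emitted
    }

    /// Flush the last open group at end of input.
    fn finish(&mut self) -> Option<(i64, i64)> {
        self.current.take()
    }
}
```

In this sketch, only one group is ever held in memory, which is why the fully ordered case can run with bounded state.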
    }

    #[tokio::test]
    async fn ordered_partial_aggregate_partially_sorted_no_emit_panic() -> Result<()> {
This test case is migrated from an existing unit test in row_hash.rs. I left a comment on the original test so it is easier to cross-check when the old implementation is eventually deleted.
|
run benchmarks |
|
run benchmark clickbench_sorted |
|
🤖 Benchmark running (GKE): comparing split-aggr-ordered (59864f2) to 8511e18 (merge-base) using clickbench_partitioned
|
🤖 Benchmark running (GKE): comparing split-aggr-ordered (59864f2) to 8511e18 (merge-base) using tpch
|
🤖 Benchmark running (GKE): comparing split-aggr-ordered (59864f2) to 8511e18 (merge-base) using tpcds
|
🤖 Benchmark running (GKE): comparing split-aggr-ordered (59864f2) to 8511e18 (merge-base) using clickbench_sorted
alamb
left a comment
Thank you @2010YOUY01
Before merging this I think we should:
- revert the change in the fuzz test (I left a proposal about how to do so)
- make sure we don't see any regression in performance benchmarks (I will kick them off)
Longer term, it would be nice to have some way to reduce the duplication (I realize some is inevitable, but I think a lot of it is unnecessary).
    /// Buffer for the ordered aggregate table's group keys and accumulator states.
    ///
    /// It accumulates input during aggregation and emits output rows as soon as the
These are really nice, easy-to-understand comments.
    /// Methods shared by all aggregate modes
    impl<AggrMode> OrderedAggregateTable<AggrMode> {
        pub(super) fn new_for_mode(
One thought I had (for a future PR) is to potentially avoid some of this duplication by using a builder style here instead -- so something like
    OrderedAggregateTableBuilder::new()
        .with_agg(..)
        ...
        .build::<AggrMode>()

    fn should_use_ordered_final_aggregate_stream(&self) -> bool {
        matches!(
            self.mode,
            AggregateMode::Final | AggregateMode::FinalPartitioned
Codex also notes that the existing code doesn't use the partial group by operator when memory is limited, and the new tables don't seem to handle the OOM path (though maybe I missed that).
This is implemented, somewhat non-obviously, by the OutOfMemoryMode choice here:
datafusion/datafusion/physical-plan/src/aggregates/row_hash.rs
Lines 492 to 511 in 59864f2
And a proposed PR to fix
There was a problem hiding this comment.
Yes. The existing behavior is that, if there is a memory limit, the ordered aggregation case falls back to the regular hash aggregation path (no ordering optimization) so that it can spill.
This is intended for the partially ordered case, since it might OOM and we haven't implemented spilling for it.
For the fully ordered case this fallback is not necessary, since that path always uses bounded memory, but there is no existing test coverage for it. In a follow-up PR we can probably enable the ordered fast path even when there is a memory limit.
Patch applied in c2ae7dc
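The fallback rule described in this reply can be summarized with a small decision function. This is only a sketch of the behavior as stated in the thread; `InputOrder`, `AggregatePath`, and `choose_path` are hypothetical names and do not mirror the actual `OutOfMemoryMode` logic in row_hash.rs.

```rust
/// How the input is ordered with respect to the group keys (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InputOrder {
    Unordered,
    PartiallyOrdered,
    FullyOrdered,
}

/// Which aggregation path to run (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggregatePath {
    /// Regular hash aggregation, which supports spilling.
    Hash,
    /// Ordered fast path, which has no spilling support.
    Ordered,
}

/// Pick a path following the behavior described above: any memory limit
/// forces the regular hash path, even for fully ordered input.
fn choose_path(order: InputOrder, has_memory_limit: bool) -> AggregatePath {
    match (order, has_memory_limit) {
        (InputOrder::Unordered, _) => AggregatePath::Hash,
        // Fall back so spilling remains available under a memory limit.
        (_, true) => AggregatePath::Hash,
        (_, false) => AggregatePath::Ordered,
    }
}
```

The follow-up mentioned above would amount to letting `(InputOrder::FullyOrdered, true)` select the ordered path once test coverage exists.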
|
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
|
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
|
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
|
🤖 Benchmark completed (GKE): clickbench_sorted, base (merge-base) vs. branch
fix: respect batch size in ordered aggregates
|
Thank you for the review, @alamb. The suggestions make sense to me, and I’ll try to address them later (likely tomorrow).
I’m also quite tempted to reuse code here. For example, the partial-reduce variant could probably be implemented by adding a few flags to the existing partial aggregation path. My main concern is that these paths may evolve into very different shapes in the future, due to optimizations specialized for each case. I want to keep that future evolution easy. One example is the ordered aggregation variant. For the fully ordered case, I realized we may not need a hash table at all. We could likely make it faster by finding the group split points first and then aggregating each group directly. So as a middle ground, I think we should:
|
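As a rough illustration of the split-point idea mentioned in the comment above (no hash table for fully ordered input), here is a minimal sketch. It assumes a single `i64` key column and a SUM aggregate over slices; `split_points` and `sum_by_sorted_key` are hypothetical names, not code from this PR.

```rust
/// Return the start index of every group in sorted `keys`,
/// followed by `keys.len()` as a closing sentinel.
fn split_points(keys: &[i64]) -> Vec<usize> {
    let mut points = Vec::new();
    for i in 0..keys.len() {
        if i == 0 || keys[i] != keys[i - 1] {
            points.push(i);
        }
    }
    points.push(keys.len());
    points
}

/// SUM(values) grouped by key, assuming `keys` is sorted.
/// Each group is a contiguous range, so it is aggregated directly.
fn sum_by_sorted_key(keys: &[i64], values: &[i64]) -> Vec<(i64, i64)> {
    assert_eq!(keys.len(), values.len(), "columns must have equal length");
    split_points(keys)
        .windows(2)
        .map(|w| (keys[w[0]], values[w[0]..w[1]].iter().sum::<i64>()))
        .collect()
}
```

Because each group is a contiguous slice, the per-group aggregation is a tight loop over one range, which is where the potential speedup over hashing would come from.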
Sounds like a great plan to me
fix: fall back for ordered aggregates with memory limits
|
Shall we merge? |
Which issue does this PR close?
Rationale for this change
Part of #22710
This PR implements the cases where the input is ordered by the group keys.
The comments in datafusion/physical-plan/src/aggregates/ordered_partial_stream.rs explain the high-level idea.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?