[opt](local shuffle) support bucket shuffle for set operation#65129
[opt](local shuffle) support bucket shuffle for set operation#65129924060929 wants to merge 1 commit into
Conversation
|
run buildall |
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
TPC-H: Total hot run time: 29959 ms |
TPC-DS: Total hot run time: 174275 ms |
ClickBench: Total hot run time: 25.48 s |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
morrySnow
left a comment
There was a problem hiding this comment.
Code Review Summary
This PR re-enables bucket shuffle for set operations (UNION/INTERSECT/EXCEPT) by uncommenting previously-disabled planner logic, gated on enableLocalShufflePlanner. The approach is sound — when the FE local shuffle planner is active, the largest storage-bucketed child keeps its distribution while other children are bucket-shuffled onto it. Below are the findings, ranked by severity.
All findings verified against the PR branch (fe_local_shuffle) and master.
| for (Plan child : setOperation.children()) { | ||
| PhysicalPlan childPhysicalPlan = (PhysicalPlan) child; | ||
| if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) { | ||
| setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); |
There was a problem hiding this comment.
Colocate and BUCKET_SHUFFLE can both be set on the same SetOperationNode.
The existing setColocate(true) at line 2496 and the new setDistributionMode(BUCKET_SHUFFLE) here can both fire on the same node. The basic bucket-shuffle child's OlapScan may be directly visible (no exchange wrapper), so findOlapScanNodesByPassExchangeNode finds it and sets colocate=true. Then this new check also finds storage-bucketed children and sets BUCKET_SHUFFLE.
For hash joins, colocate and BUCKET_SHUFFLE are mutually exclusive via an explicit if-else chain (lines 1718 vs 1728). For set operations, no such exclusivity exists. The interaction between colocate scheduling (hasColocatePlanNode) and bucket shuffle scheduling (isBucketShuffle()) on the same set operation node is unclear and untested.
Suggestion: Follow the hash join pattern — make colocate and BUCKET_SHUFFLE mutually exclusive for set operations too, or document the expected interaction.
There was a problem hiding this comment.
For a set operation the two flags are complementary rather than alternative execution modes, unlike hash join: isColocate describes the bucket-aligned scheduling of the fragment (the basic bucket-shuffle child scans buckets directly, so the pre-existing direct-scan rule fires — and it must fire, the fragment needs colocate scheduling for the bucket alignment to hold), while BUCKET_SHUFFLE describes how the other children arrive (bucket-shuffle exchanges). Both routes converge to the same bucket-hash local exchange requirement in SetOperationNode.enforceAndDeriveLocalExchange (isColocated(this) || isBucketShuffle()), so their coexistence is intended. Documented this at the site. Also verified on a 4-BE cluster that this combination produces correct results, including the shape where the basic child is a join output (covered by the new regression case bucket_shuffle_join_as_basic_child).
| // bucket instead of execution-bucketed hash. | ||
| for (Plan child : setOperation.children()) { | ||
| PhysicalPlan childPhysicalPlan = (PhysicalPlan) child; | ||
| if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) { |
There was a problem hiding this comment.
Missing enableLocalShufflePlanner gate.
The translator unconditionally sets BUCKET_SHUFFLE when storage-bucketed children are detected. ChildrenPropertiesRegulator (line 664) and RequestPropertyDeriver (line 348) both explicitly gate their new behavior on enableLocalShufflePlanner, but the translator does not.
While the invariant holds today — storage-bucketed children only appear when the planner gate is active — the dependency is implicit. A defensive guard (or at minimum a comment) would make this explicit and prevent silent breakage if a future planner change produces STORAGE_BUCKETED children without the gate.
// Suggestion: add guard
if (enableLocalShufflePlanner) {
for (Plan child : setOperation.children()) { ... }
}There was a problem hiding this comment.
Added the isEnableLocalShufflePlanner() guard here, so the three sites (RequestPropertyDeriver / ChildrenPropertiesRegulator / translator) are explicitly consistent now.
| if (childDistribution instanceof DistributionSpecHash | ||
| && supportBucketShuffleTypes.contains( | ||
| ((DistributionSpecHash) childDistribution).getShuffleType()) | ||
| && !(isBucketShuffleDownGrade(setOperation.child(i)))) { |
There was a problem hiding this comment.
isEnableBucketShuffleJoin controls set operation bucket shuffle.
The isBucketShuffleDownGrade method (line 294) checks ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin(), a join-specific session variable. A user who disables bucket shuffle for joins (enable_bucket_shuffle_join=false) also silently loses the set operation bucket shuffle optimization, with no independent control.
This coupling existed in the original commented-out PR #59006 code and is now activated. Consider whether set operations warrant a separate session variable, or at minimum document this coupling in the session variable description.
There was a problem hiding this comment.
This coupling is intentional: bucket shuffle for set operation belongs to the same optimization family as bucket shuffle join (same downgrade trade-off: bucket count vs instance parallelism), so the join switch and bucket_shuffle_downgrade_ratio govern both instead of introducing one more session variable. Documented the coupling at the call site. If you prefer an independent switch I can split it out in a follow-up.
| // shuffle: with the BE-side local-shuffle planner the backend cannot infer the | ||
| // correct local shuffle type for the set sink/probe and computes wrong results, so | ||
| // fall back to EXECUTION_BUCKETED there. | ||
| ConnectContext setOperationContext = ConnectContext.get(); |
There was a problem hiding this comment.
Redundant ConnectContext.get() — use this.connectContext instead.
The class already holds a private final ConnectContext connectContext field (line 102), set in both constructors (lines 107, 112) and used throughout the class (lines 150, 160, 171, 182). The new code calls ConnectContext.get() (thread-local lookup) unnecessarily. Using the existing field would be both simpler and consistent with the rest of the class.
There was a problem hiding this comment.
Good catch, changed to use the existing connectContext field.
9359594 to
8dfeddd
Compare
|
run buildall |
1 similar comment
|
run buildall |
Re-enable bucket shuffle for set operation / union: the largest natural or storage-bucketed child keeps its bucket distribution and every other child is bucket-shuffled to it, avoiding a full reshuffle of the largest input (the same idea as bucket-shuffle join applied to set operations). This is only valid under the FE local-shuffle planner (enable_local_shuffle_planner): only then can the frontend plan the correct local shuffle type for the set sink/probe. With the BE-side local-shuffle planner the backend cannot infer the type and computes wrong results, so the plan falls back to execution-bucketed shuffle there and behavior is unchanged.
8dfeddd to
b847f60
Compare
|
run buildall |
TPC-H: Total hot run time: 30126 ms |
TPC-DS: Total hot run time: 173676 ms |
ClickBench: Total hot run time: 29.09 s |
What problem does this PR solve?
Related PR: #59006, #60823
Problem Summary:
Re-enable bucket shuffle for set operation (union / intersect / except).
This optimization was introduced by #59006 and later disabled by #60823, because at that time the backend could not plan the correct local shuffle type for set operation, which produced wrong results. Now that planning local shuffle in the frontend is supported, the frontend plans the correct local shuffle type, so it is safe to re-enable this feature.
When
enable_local_shuffle_planneris enabled (the default), the optimizer keeps the distribution of the largest natural / storage-bucketed child of a set operation and bucket-shuffles the other children onto it (the same idea as bucket shuffle join), which avoids a full reshuffle of the largest side. When it is disabled, the previous full-shuffle behavior is kept, so this change is gated and only takes effect with the frontend local shuffle planner.Besides re-enabling the planning, two local-exchange alignment problems that the re-enabled shapes expose are fixed (both verified on a 4-BE cluster with a stable wrong-result reproduction before the fix and correct results after):
SetOperationNode.enforceAndDeriveLocalExchangeonly handled the colocate mode. A bucket-shuffle intersect / except whose basic child is a join output (instead of a direct scan) fell into the partitioned branch, so the basic side was locally re-partitioned by execution hash while the other side stayed bucket-distributed, and the set operation lost rows. Now the bucket-shuffle mode takes the samerequireBucketHashpath as colocate, mirroringHashJoinNode.RequireSpecific.autoRequireHash()degradedLOCAL_EXECUTION_HASH_SHUFFLEto the generic hash requirement. Pass-through operators (union / streaming agg / sort) forward this requirement to children while claiming the specific type to their parent, so under a bucket join upgraded to local hash, a bucket-distributed child could satisfy the generic requirement and keep its bucket placement while the parent skipped its re-align local exchange, and the mixed placements computed wrong results. A specific hash requirement is now forwarded as-is.The regression suite disables the bucket shuffle downgrade so the chosen shapes do not depend on the backend count / parallelism of the environment, and adds a case where the basic child of a bucket-shuffle intersect is a join output.
Release note
None
Check List (For Author)
Test
enable_local_shuffle_planneron and off, including the shapes above with a stable wrong-result reproduction before the fixes.Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)