refactor: make join projection pushdown schema-aware via ColumnIndex/…#23185
refactor: make join projection pushdown schema-aware via ColumnIndex/…#23185Phoenix500526 wants to merge 3 commits into
Conversation
…JoinSide try_pushdown_through_join decided whether each projected join-output column belonged to the left or right child by comparing its output index against the left child's field count, assuming a `left ++ right` output schema. That is wrong for joins whose output is not a plain concatenation -- mark joins append a synthetic `mark` column (JoinSide::None) belonging to neither child -- which is why apache#22902 had to bypass projection pushdown for LeftMark / RightMark. Group the projected columns by the join's output-origin metadata (ColumnIndex.side) instead of output position: Left/Right columns push to the matching child via the child-relative ColumnIndex.index, and a synthetic (JoinSide::None) column makes the pushdown decline so the projection is embedded into the join instead -- no panic, no mis-routing. This lets the LeftMark / RightMark bypass be removed from HashJoinExec and NestedLoopJoinExec. The shared helpers used by cross / symmetric / sort-merge joins (new_join_children, join_table_borders, update_join_on/filter) keep their signatures; the side-grouped path uses a new new_join_children_from_groups and reuses update_join_on/filter with a zero column-index offset. Mark joins still fall back to embedding (pushing their child columns down is a follow-up). Closes apache#23010 Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
09a1d17 to
dca264a
Compare
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
It is a crate-internal helper, called only by HashJoinExec and NestedLoopJoinExec within datafusion-physical-plan, is not re-exported, and has no external consumers. Reducing its visibility keeps it off the public API surface so future signature changes no longer trip cargo-semver-checks. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
There was a problem hiding this comment.
@Phoenix500526
Thanks for working on this!
I think the projection pushdown approach makes sense overall, but I have a couple of API surface concerns that I'd like to address before merging.
| } | ||
|
|
||
| /// column indices | ||
| pub fn column_indices(&self) -> &Vec<ColumnIndex> { |
There was a problem hiding this comment.
I don't think we should add a new public HashJoinExec::column_indices(&self) -> &Vec<ColumnIndex> API just so projection pushdown can pass this internal join metadata to a crate-local helper.
The follow up commit makes try_pushdown_through_join pub(crate) specifically to avoid expanding the public API surface, so this accessor feels a bit at odds with that goal. It also exposes the internal Vec representation, which becomes part of the semver contract.
Since the call sites are within the same impl, could we just pass &self.column_indices or self.column_indices.as_slice() directly? If an accessor is still preferred, I think it should be pub(crate) and return &[ColumnIndex] instead.
| } | ||
|
|
||
| /// column indices | ||
| pub fn column_indices(&self) -> &Vec<ColumnIndex> { |
There was a problem hiding this comment.
Same concern here. This NestedLoopJoinExec::column_indices(&self) -> &Vec<ColumnIndex> accessor is only needed by the internal projection pushdown code, so I'd rather not expose it as a public &Vec API.
Could we either access the field directly from the impl or make the accessor pub(crate) and have it return &[ColumnIndex] instead?
Remove public column_indices accessors from hash and nested-loop join execution nodes. Projection pushdown now passes the internal column_indices slice from within each impl, avoiding new public API surface and keeping the Vec representation private. Signed-off-by: Jiawei Zhao <Phoenix500526@163.com>
There was a problem hiding this comment.
@Phoenix500526
Thanks for the follow-up. The accessor comments look addressed now. I think there is still one blocking semver issue to fix.
| } | ||
|
|
||
| pub fn try_pushdown_through_join( | ||
| pub(crate) fn try_pushdown_through_join( |
There was a problem hiding this comment.
I think we still need to preserve the existing public try_pushdown_through_join API here. cargo-semver-checks is flagging this as function_missing, and downstream users that import datafusion_physical_plan::projection::try_pushdown_through_join would break without a major version bump.
Could we keep the old public function path and signature as a compatibility wrapper, then move the new ColumnIndex aware implementation into a crate-private helper for the join call sites? For example, HashJoinExec and NestedLoopJoinExec could call a renamed pub(crate) helper that accepts &[ColumnIndex], while the existing public API remains available.
Which issue does this PR close?
ColumnIndex/JoinSide#23010Rationale for this change
try_pushdown_through_join(datafusion/physical-plan/src/projection.rs)decided whether each projected join-output column belonged to the left or right
child by comparing its output index against the left child's field count
(
join_table_borders) — i.e. it assumed the join output is a plainleft ++ rightconcatenation.That assumption does not hold for all join types. A
LeftMark/RightMarkjoin appends a synthetic
markboolean column that hasJoinSide::Noneandoriginates from neither child, so reasoning from output position can route the
markcolumn to the wrong child. This is the panic that #22902 worked around bydisabling projection pushdown for
LeftMark/RightMarkinHashJoinExecand
NestedLoopJoinExec— correct, but it left the helper reasoning from outputposition rather than from the join's actual output-origin contract.
What changes are included in this PR?
&[ColumnIndex], already computed bybuild_join_schema) intotry_pushdown_through_join.join_table_borders+index < left_field_count) with grouping byColumnIndex.side:Left/Rightcolumns are collected per child using the child-relativeColumnIndex.index;JoinSide::None(synthetic, e.g.mark) column makes the pushdown decline(
Ok(None)), so the projection is embedded into the join instead — no panic,no mis-routing.
LeftMark | RightMarkbypass fromHashJoinExec::try_swapping_with_projectionandNestedLoopJoinExec::try_swapping_with_projection; they now calltry_pushdown_through_joinuniformly.(
new_join_children,join_table_borders,join_allows_pushdown,update_join_on,update_join_filter) keep their signatures. The side-groupedpath uses a new private
new_join_children_from_groupsand reusesupdate_join_on/update_join_filterwith a zero column-index offset (thegroups already carry child-relative indices).
Are these changes tested?
Yes.
subquery.sltcases place a subset/reordering projection over a mark join— hash
LeftMark(INunderOR), negated mark (NOT EXISTSunderOR),and nested-loop mark (non-equi correlated) — asserting both the query result
and the
EXPLAINplan shape.cargo test -p datafusion-physical-plan projection(incl. the filter-pushdown andjoin_table_bordersunit tests),the join
*.sltsuite, andsubquery.sltall pass.cargo clippy -p datafusion-physical-plan -- -D warningsandcargo fmt --allare clean.
Are there any user-facing changes?
No behavior change: the produced physical plans are unchanged for the supported
cases (mark joins keep falling back to the same embedded-projection plan as
before, regular joins push down as before).
One signature change to a
pubhelper:try_pushdown_through_joingains acolumn_indices: &[ColumnIndex]parameter (all in-tree callers are updated). Ifthe project treats this helper as part of the public API surface, please add the
api changelabel.