Skip to content

[SPARK-57688][SQL] Add spark.sql.execution.bypassPartialAggregation to skip partial agg#56777

Open
xumingming wants to merge 1 commit into
apache:masterfrom
xumingming:bypass-partial-agg
Open

[SPARK-57688][SQL] Add spark.sql.execution.bypassPartialAggregation to skip partial agg#56777
xumingming wants to merge 1 commit into
apache:masterfrom
xumingming:bypass-partial-agg

Conversation

@xumingming

Copy link
Copy Markdown

What changes were proposed in this pull request?

Adds a new SQL config spark.sql.execution.bypassPartialAggregation (default false). When set to true, planAggregateWithoutDistinct skips the pre-shuffle Partial-mode aggregation and runs a single Complete-mode aggregation after the shuffle instead. This can improve performance when group cardinality is high and the pre-shuffle reduction ratio is low.

The bypass is suppressed when a session_window grouping key is present, since MergingSessionsExec must be inserted in the Partial+Merge+Final path to correctly merge overlapping sessions.

The config has no effect on queries containing DISTINCT aggregate functions, where the partial aggregation phases are required for correctness and are always applied.

Why are the changes needed?

The standard two-phase aggregation plan (Partial → shuffle → Final) assumes that pre-shuffle partial aggregation meaningfully reduces data volume. This assumption breaks down in two scenarios.

Scenario 1: High group cardinality. When group cardinality is high relative to partition size, every input row maps to a distinct key, so the partial aggregation produces one output row per input row and adds CPU and memory overhead with zero shuffle benefit.

SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
FROM orders
GROUP BY user_id   – high-cardinality key: millions of distinct users

On a table with 500M rows and 200M distinct user_id values, the pre-shuffle HashAggregateExec in Partial mode churns through the full dataset, spills when the hash map overflows, and still emits ~200M rows into the shuffle. The partial phase wastes wall-clock time and memory without reducing shuffle write volume.

Scenario 2: Skewed input data. Even when partial aggregation can reduce data volume on average, skewed input partitions can make it harmful. If one partition contains a disproportionate share of rows for a small number of keys, the partial HashAggregateExec on that partition must hold a large hash map in memory, triggering spills. The skewed partition becomes the bottleneck and dominates wall-clock time — worse than if the data had been shuffled first and aggregated on already-partitioned, evenly distributed data.

SELECT country_code, SUM(revenue)
FROM orders
GROUP BY country_code   – a few dominant countries hold 80% of rows

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added Unit Test.

Was this patch authored or co-authored using generative AI tooling?

No.

…tion to skip pre-shuffle partial agg

Adds a new SQL config spark.sql.execution.bypassPartialAggregation
(default false). When set to true, planAggregateWithoutDistinct skips
the pre-shuffle Partial-mode aggregation and runs a single Complete-mode
aggregation after the shuffle instead. This can improve performance when
group cardinality is high and the pre-shuffle reduction ratio is low.

The bypass is suppressed when a session_window grouping key is present,
since MergingSessionsExec must be inserted in the Partial+Merge+Final
path to correctly merge overlapping sessions.

The config has no effect on queries containing DISTINCT aggregate
functions, where the partial aggregation phases are required for
correctness and are always applied.
"When false (default), uses a two-phase Partial+Final aggregation across a shuffle. " +
"This setting has no effect on queries containing DISTINCT aggregate functions, where " +
"the partial aggregation phases are required for correctness and are always applied.")
.version("3.3.1")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.version("3.3.1")
.version("4.3.0")

.booleanConf
.createWithDefault(true)

val BYPASS_PARTIAL_AGGREGATION = buildConf("spark.sql.execution.bypassPartialAggregation")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkConfigBindingPolicySuite requires every new config to declare a policy, please make sure to add withBindingPolicy.

"the partial aggregation phases are required for correctness and are always applied.")
.version("3.3.1")
.booleanConf
.createWithDefault(false)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be internal?

s"Expected:\n${expected.mkString("\n")}\nActual:\n${actual.mkString("\n")}")
}
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test gap: no test with AQE enabled.

Also, no TypedImperativeAggregate bypass test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @cloud-fan @viirya @ueshin for AggUtils/AQE interaction

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants