Skip to content

Flink: Fix HashKeyGenerator SelectorKey cache ignoring writeParallelism and distributionMode#15740

Merged
pvary merged 1 commit into
apache:mainfrom
Below0:fix-selector-key-cache-missing-fields
Mar 25, 2026
Merged

Flink: Fix HashKeyGenerator SelectorKey cache ignoring writeParallelism and distributionMode#15740
pvary merged 1 commit into
apache:mainfrom
Below0:fix-selector-key-cache-missing-fields

Conversation

@Below0

@Below0 Below0 commented Mar 23, 2026

Copy link
Copy Markdown
Contributor

Problem

HashKeyGenerator.SelectorKey was missing writeParallelism and distributionMode from its equals() and hashCode() methods. As a result, computeIfAbsent always hit the cache after the first record for a given table, silently reusing a stale KeySelector even when these values changed.

This contradicts the class-level Javadoc which states:

"Caching ensures that a new key selector is also created when … the user-provided metadata changes (e.g. distribution mode, write parallelism)."

Fix

Add writeParallelism and distributionMode to SelectorKey's fields, equals(), hashCode(), and toString(). The effective values passed to the cache key match those used in the computeIfAbsent lambda — distributionMode normalized via firstNonNull(..., NONE) and writeParallelism capped at maxWriteParallelism.

Note

writeParallelism and distributionMode should remain stable per table during a streaming job. Changing these values mid-stream — especially when equality fields are set — can cause routing changes that break equality delete co-location, as the subtask assignment is not monotonic across different writeParallelism values (i.e., the subtask set for parallelism N is not guaranteed to be a subset of the set for parallelism N+1).

Making the subtask assignment monotonic (e.g., via a consistent ordering based on maxWriteParallelism) could address this limitation in a follow-up.

Testing

Added two regression tests to TestHashKeyGenerator:

  • testCacheMissOnWriteParallelismChange
  • testCacheMissOnDistributionModeChange

Closes #15731

@github-actions github-actions Bot added the flink label Mar 23, 2026
@Below0 Below0 force-pushed the fix-selector-key-cache-missing-fields branch 3 times, most recently from 2a44c8b to 68dc0d9 Compare March 23, 2026 17:08
@Below0 Below0 marked this pull request as draft March 23, 2026 17:12
@Below0 Below0 marked this pull request as ready for review March 23, 2026 17:34

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the PR @Below0! Changes look good. Could you keep only the Flink 2.1 changes for now? We backport to the other Flink versions in a separate step.

@Below0

Below0 commented Mar 25, 2026

Copy link
Copy Markdown
Contributor Author

@mxm
Sure! I'll remove the changes for the other Flink versions and keep only the Flink 2.1 changes.

@Below0 Below0 force-pushed the fix-selector-key-cache-missing-fields branch 2 times, most recently from c148ead to 9fea422 Compare March 25, 2026 02:10
@Below0 Below0 force-pushed the fix-selector-key-cache-missing-fields branch from 9fea422 to d1c0d5b Compare March 25, 2026 02:11
@Below0 Below0 requested a review from mxm March 25, 2026 05:08

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +91 to +92
MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you added the same logic as for creating the actual KeySelector below. Should we consolidate the SelectorKey and the getKeySelector (below) parameters?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that in a follow-up.

@pvary pvary merged commit 027f088 into apache:main Mar 25, 2026
16 checks passed
@pvary

pvary commented Mar 25, 2026

Copy link
Copy Markdown
Contributor

Merged to main.
Thanks @Below0 for the fix and @mxm for the review!

@mxm

mxm commented Mar 25, 2026

Copy link
Copy Markdown
Contributor

Thanks @Below0! Could you create a PR for backporting to 1.20 and 2.0?

@Below0

Below0 commented Mar 25, 2026

Copy link
Copy Markdown
Contributor Author

@mxm
Sure! I'll create backport PRs for v1.20 and v2.0.

manuzhang pushed a commit to manuzhang/iceberg that referenced this pull request Mar 30, 2026
ldudas-marx pushed a commit to ldudas-marx/iceberg that referenced this pull request Mar 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Flink] DynamicIcebergSink: HashKeyGenerator SelectorKey cache ignores writeParallelism and distributionMode changes

3 participants