Flink: Fix HashKeyGenerator SelectorKey cache ignoring writeParallelism and distributionMode #15740
Merged
mxm approved these changes on Mar 25, 2026
Comment on lines +91 to +92:
MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
Contributor
I see you added the same logic as for creating the actual KeySelector below. Should we consolidate the SelectorKey and the getKeySelector (below) parameters?
Contributor
We can do that in a follow-up.
pvary approved these changes on Mar 25, 2026
Contributor
Thanks @Below0! Could you create a PR for backporting to 1.20 and 2.0?
manuzhang pushed a commit to manuzhang/iceberg that referenced this pull request on Mar 30, 2026:
…sm and distributionMode (apache#15740)
ldudas-marx pushed a commit to ldudas-marx/iceberg that referenced this pull request on Mar 31, 2026:
…sm and distributionMode (apache#15740)
Problem
HashKeyGenerator.SelectorKey was missing writeParallelism and distributionMode from its equals() and hashCode() methods. As a result, computeIfAbsent always hit the cache after the first record for a given table, silently reusing a stale KeySelector even when these values changed. This contradicts the class-level Javadoc.
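The failure mode can be reproduced in isolation. The following is a minimal sketch, not the Iceberg code: IncompleteKey, lookup, and the string-valued "selector" are hypothetical stand-ins, and the real SelectorKey carries more fields than a table name. It only shows how a key that omits writeParallelism and distributionMode makes computeIfAbsent return the first entry forever.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified stand-in for the pre-fix behaviour; not the actual Iceberg classes. */
final class StaleSelectorCacheSketch {

  /** Hypothetical cache key that identifies an entry by table name only. */
  static final class IncompleteKey {
    final String tableName;

    IncompleteKey(String tableName) {
      this.tableName = tableName;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof IncompleteKey)) {
        return false;
      }
      return tableName.equals(((IncompleteKey) o).tableName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(tableName);
    }
  }

  /** Returns whatever the cache holds for the table; the lambda only runs on a cache miss. */
  static String lookup(
      Map<IncompleteKey, String> cache, String table, int writeParallelism, String mode) {
    return cache.computeIfAbsent(
        new IncompleteKey(table),
        key -> "selector(" + mode + ", parallelism=" + writeParallelism + ")");
  }

  public static void main(String[] args) {
    Map<IncompleteKey, String> cache = new HashMap<>();
    // First record for the table: the selector is built for HASH with parallelism 2.
    System.out.println(lookup(cache, "db.tbl", 2, "HASH"));
    // Settings changed, but the key is equal, so the stale entry is returned.
    System.out.println(lookup(cache, "db.tbl", 4, "NONE"));
  }
}
```

Both calls print selector(HASH, parallelism=2): the second lookup never reaches the lambda because the key compares equal to the first one.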
Fix
Add writeParallelism and distributionMode to SelectorKey's fields, equals(), hashCode(), and toString(). The effective values passed to the cache key match those used in the computeIfAbsent lambda: distributionMode normalized via firstNonNull(..., NONE) and writeParallelism capped at maxWriteParallelism.
Note
writeParallelism and distributionMode should remain stable per table during a streaming job. Changing these values mid-stream, especially when equality fields are set, can cause routing changes that break equality delete co-location, because the subtask assignment is not monotonic across different writeParallelism values (i.e., the subtask set for parallelism N is not guaranteed to be a subset of the set for parallelism N+1).
Making the subtask assignment monotonic (e.g., via a consistent ordering based on maxWriteParallelism) could address this limitation in a follow-up.
Testing
Added two regression tests to TestHashKeyGenerator:
testCacheMissOnWriteParallelismChange
testCacheMissOnDistributionModeChange
Closes #15731
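The shape of the fix and of the two regression tests can be sketched in the same simplified form. This is an illustration under assumptions, not the merged code: CompleteKey, keyFor, and the Mode enum are hypothetical names, and the null check stands in for Guava's MoreObjects.firstNonNull so that the example has no dependencies. What it mirrors from the description is that the key is built from the effective values, with a null mode normalized to NONE and the parallelism capped at maxWriteParallelism.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified stand-in for the post-fix cache key; not the actual Iceberg classes. */
final class FixedSelectorCacheSketch {

  /** Hypothetical subset of distribution modes, for illustration only. */
  enum Mode { NONE, HASH, RANGE }

  /** Hypothetical key that includes the effective mode and parallelism. */
  static final class CompleteKey {
    final String tableName;
    final Mode distributionMode;
    final int writeParallelism;

    CompleteKey(String tableName, Mode distributionMode, int writeParallelism) {
      this.tableName = tableName;
      this.distributionMode = distributionMode;
      this.writeParallelism = writeParallelism;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof CompleteKey)) {
        return false;
      }
      CompleteKey other = (CompleteKey) o;
      return tableName.equals(other.tableName)
          && distributionMode == other.distributionMode
          && writeParallelism == other.writeParallelism;
    }

    @Override
    public int hashCode() {
      return Objects.hash(tableName, distributionMode, writeParallelism);
    }

    @Override
    public String toString() {
      return tableName + "/" + distributionMode + "/" + writeParallelism;
    }
  }

  /** Builds the key from effective values: a null mode becomes NONE, parallelism is capped. */
  static CompleteKey keyFor(
      String table, Mode requestedMode, int requestedParallelism, int maxWriteParallelism) {
    Mode effectiveMode = requestedMode != null ? requestedMode : Mode.NONE;
    int effectiveParallelism = Math.min(requestedParallelism, maxWriteParallelism);
    return new CompleteKey(table, effectiveMode, effectiveParallelism);
  }

  public static void main(String[] args) {
    int max = 4;
    Map<CompleteKey, String> cache = new HashMap<>();
    cache.computeIfAbsent(keyFor("db.tbl", Mode.HASH, 2, max), CompleteKey::toString);
    // Parallelism changed: different key, so a new entry is created.
    cache.computeIfAbsent(keyFor("db.tbl", Mode.HASH, 4, max), CompleteKey::toString);
    // Mode changed (null is treated as NONE): again a new entry.
    cache.computeIfAbsent(keyFor("db.tbl", null, 4, max), CompleteKey::toString);
    // Requested 8 is capped to 4, so this matches the previous entry.
    cache.computeIfAbsent(keyFor("db.tbl", Mode.NONE, 8, max), CompleteKey::toString);
    System.out.println(cache.size()); // prints 3
  }
}
```

Keying on the effective values rather than the requested ones means that two requests which resolve to the same selector also share one cache entry, as the last lookup shows.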