diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java index 5eb31a9f705f..fca45bf882e0 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -87,7 +87,9 @@ int generateKey( tableSpec != null ? tableSpec.specId() : null, dynamicRecord.schema(), dynamicRecord.spec(), - dynamicRecord.equalityFields()); + dynamicRecord.equalityFields(), + MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE), + Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism)); KeySelector keySelector = keySelectorCache.computeIfAbsent( cacheKey, @@ -322,6 +324,8 @@ static class SelectorKey { private final Schema schema; private final PartitionSpec spec; private final Set equalityFields; + private final DistributionMode distributionMode; + private final int writeParallelism; SelectorKey( String tableName, @@ -330,7 +334,9 @@ static class SelectorKey { @Nullable Integer tableSpecId, Schema schema, PartitionSpec spec, - Set equalityFields) { + Set equalityFields, + DistributionMode distributionMode, + int writeParallelism) { this.tableName = tableName; this.branch = branch; this.schemaId = tableSchemaId; @@ -338,6 +344,8 @@ static class SelectorKey { this.schema = tableSchemaId == null ? schema : null; this.spec = tableSpecId == null ? spec : null; this.equalityFields = equalityFields; + this.distributionMode = distributionMode; + this.writeParallelism = writeParallelism; } @Override @@ -357,12 +365,23 @@ public boolean equals(Object other) { && Objects.equals(specId, that.specId) && Objects.equals(schema, that.schema) && Objects.equals(spec, that.spec) - && Objects.equals(equalityFields, that.equalityFields); + && Objects.equals(equalityFields, that.equalityFields) + && distributionMode == that.distributionMode + && writeParallelism == that.writeParallelism; } @Override public int hashCode() { - return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields); + return Objects.hash( + tableName, + branch, + schemaId, + specId, + schema, + spec, + equalityFields, + distributionMode, + writeParallelism); } @Override @@ -375,6 +394,8 @@ public String toString() { .add("schema", schema) .add("spec", spec) .add("equalityFields", equalityFields) + .add("distributionMode", distributionMode) + .add("writeParallelism", writeParallelism) .toString(); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java index 7c1d3c3d0aeb..c65f96b12cbb 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java @@ -401,6 +401,74 @@ void testCaching() throws Exception { assertThat(writeKey1).isEqualTo(writeKey3); } + @Test + void testCacheMissOnWriteParallelismChange() throws Exception { + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism); + Map> keySelectorCache = + generator.getKeySelectorCache(); + + PartitionSpec unpartitioned = PartitionSpec.unpartitioned(); + DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + SCHEMA, + GenericRowData.of(1, StringData.fromString("foo")), + unpartitioned, + DistributionMode.NONE, + 2); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + SCHEMA, + GenericRowData.of(1, StringData.fromString("foo")), + unpartitioned, + DistributionMode.NONE, + 4); + + generator.generateKey(record1); + assertThat(keySelectorCache).hasSize(1); + + generator.generateKey(record2); + assertThat(keySelectorCache).hasSize(2); + } + + @Test + void testCacheMissOnDistributionModeChange() throws Exception { + int maxWriteParallelism = 8; + HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism); + Map> keySelectorCache = + generator.getKeySelectorCache(); + + PartitionSpec partitioned = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + DynamicRecord record1 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + SCHEMA, + GenericRowData.of(1, StringData.fromString("foo")), + partitioned, + DistributionMode.NONE, + 2); + DynamicRecord record2 = + new DynamicRecord( + TABLE_IDENTIFIER, + BRANCH, + SCHEMA, + GenericRowData.of(1, StringData.fromString("foo")), + partitioned, + DistributionMode.HASH, + 2); + + generator.generateKey(record1); + assertThat(keySelectorCache).hasSize(1); + + generator.generateKey(record2); + assertThat(keySelectorCache).hasSize(2); + } + private static int getWriteKey( HashKeyGenerator generator, PartitionSpec spec,