Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ int generateKey(
tableSpec != null ? tableSpec.specId() : null,
dynamicRecord.schema(),
dynamicRecord.spec(),
dynamicRecord.equalityFields());
dynamicRecord.equalityFields(),
MoreObjects.firstNonNull(dynamicRecord.distributionMode(), DistributionMode.NONE),
Math.min(dynamicRecord.writeParallelism(), maxWriteParallelism));
Comment on lines +91 to +92

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you added the same logic as for creating the actual KeySelector below. Should we consolidate the SelectorKey and the getKeySelector (below) parameters?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that in a follow-up.

KeySelector<RowData, Integer> keySelector =
keySelectorCache.computeIfAbsent(
cacheKey,
Expand Down Expand Up @@ -322,6 +324,8 @@ static class SelectorKey {
private final Schema schema;
private final PartitionSpec spec;
private final Set<String> equalityFields;
private final DistributionMode distributionMode;
private final int writeParallelism;

SelectorKey(
String tableName,
Expand All @@ -330,14 +334,18 @@ static class SelectorKey {
@Nullable Integer tableSpecId,
Schema schema,
PartitionSpec spec,
Set<String> equalityFields) {
Set<String> equalityFields,
DistributionMode distributionMode,
int writeParallelism) {
this.tableName = tableName;
this.branch = branch;
this.schemaId = tableSchemaId;
this.specId = tableSpecId;
this.schema = tableSchemaId == null ? schema : null;
this.spec = tableSpecId == null ? spec : null;
this.equalityFields = equalityFields;
this.distributionMode = distributionMode;
this.writeParallelism = writeParallelism;
}

@Override
Expand All @@ -357,12 +365,23 @@ public boolean equals(Object other) {
&& Objects.equals(specId, that.specId)
&& Objects.equals(schema, that.schema)
&& Objects.equals(spec, that.spec)
&& Objects.equals(equalityFields, that.equalityFields);
&& Objects.equals(equalityFields, that.equalityFields)
&& distributionMode == that.distributionMode
&& writeParallelism == that.writeParallelism;
}

@Override
public int hashCode() {
return Objects.hash(tableName, branch, schemaId, specId, schema, spec, equalityFields);
return Objects.hash(
tableName,
branch,
schemaId,
specId,
schema,
spec,
equalityFields,
distributionMode,
writeParallelism);
}

@Override
Expand All @@ -375,6 +394,8 @@ public String toString() {
.add("schema", schema)
.add("spec", spec)
.add("equalityFields", equalityFields)
.add("distributionMode", distributionMode)
.add("writeParallelism", writeParallelism)
.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,74 @@ void testCaching() throws Exception {
assertThat(writeKey1).isEqualTo(writeKey3);
}

@Test
void testCacheMissOnWriteParallelismChange() throws Exception {
int maxWriteParallelism = 8;
HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
generator.getKeySelectorCache();

PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
DynamicRecord record1 =
new DynamicRecord(
TABLE_IDENTIFIER,
BRANCH,
SCHEMA,
GenericRowData.of(1, StringData.fromString("foo")),
unpartitioned,
DistributionMode.NONE,
2);
DynamicRecord record2 =
new DynamicRecord(
TABLE_IDENTIFIER,
BRANCH,
SCHEMA,
GenericRowData.of(1, StringData.fromString("foo")),
unpartitioned,
DistributionMode.NONE,
4);

generator.generateKey(record1);
assertThat(keySelectorCache).hasSize(1);

generator.generateKey(record2);
assertThat(keySelectorCache).hasSize(2);
}

@Test
void testCacheMissOnDistributionModeChange() throws Exception {
int maxWriteParallelism = 8;
HashKeyGenerator generator = new HashKeyGenerator(10, maxWriteParallelism);
Map<HashKeyGenerator.SelectorKey, KeySelector<RowData, Integer>> keySelectorCache =
generator.getKeySelectorCache();

PartitionSpec partitioned = PartitionSpec.builderFor(SCHEMA).identity("id").build();
DynamicRecord record1 =
new DynamicRecord(
TABLE_IDENTIFIER,
BRANCH,
SCHEMA,
GenericRowData.of(1, StringData.fromString("foo")),
partitioned,
DistributionMode.NONE,
2);
DynamicRecord record2 =
new DynamicRecord(
TABLE_IDENTIFIER,
BRANCH,
SCHEMA,
GenericRowData.of(1, StringData.fromString("foo")),
partitioned,
DistributionMode.HASH,
2);

generator.generateKey(record1);
assertThat(keySelectorCache).hasSize(1);

generator.generateKey(record2);
assertThat(keySelectorCache).hasSize(2);
}

private static int getWriteKey(
HashKeyGenerator generator,
PartitionSpec spec,
Expand Down
Loading