Flink 2.1: Support UUID type in Avro and Parquet readers and writers#16097
Conversation
…iters Co-authored-by: Joy Haldar <joy.haldar@target.com>
Co-authored-by: Joy Haldar <joy.haldar@target.com>
|
Could we please check that the UUID written out by Flink is the same than the UUID written out by the Generic/Spark writer? IIRC there were some issues that the Parquet UUID reader and the Avro UUID reader were different in some cases, and I'm not sure that this was fixed |
|
CC: @mxm, @Guosmilesmile |
There was a problem hiding this comment.
Thanks for the PR! Could we add an e2e UT to prove that this type is usable? Since there is no UUID type in Flink, I looked at the type conversion and it treats UUID as BinaryType(16). From what it looks like, Flink should be able to read it, but writing would probably fail . May be we should do more than this.
| required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision | ||
| required(117, "time", Types.TimeType.get())); | ||
| required(117, "time", Types.TimeType.get()), | ||
| required(118, "uuid", Types.UUIDType.get())); |
There was a problem hiding this comment.
E2E UT may be something like this. This is just for my own testing purposes, and I hope it can provide some inspiration.
class TestFlinkUuidType extends CatalogTestBase {
private static final String TABLE_NAME = "test_table";
private Table icebergTable;
@TempDir private Path warehouseDir;
@Parameters(name = "catalogName={0}, baseNamespace={1}")
protected static List<Object[]> parameters() {
return Arrays.asList(
// For now hive metadata is not supported variant, so we only test hadoop catalog
new Object[] {"testhadoop", Namespace.empty()},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
}
@Override
@BeforeEach
public void before() {
super.before();
sql("CREATE DATABASE %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
Schema schema =
new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.UUIDType.get()));
icebergTable =
validationCatalog.createTable(
TableIdentifier.of(icebergNamespace, TABLE_NAME),
schema,
PartitionSpec.unpartitioned(),
ImmutableMap.of(
"format-version",
"3",
"write.format.default",
FileFormat.AVRO.name()));
}
@TestTemplate
public void testInsertUuidFromFlink() throws Exception {
UUID expectedUuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
sql(
"INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))",
TABLE_NAME,
"123e4567e89b12d3a456426614174000");
List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
assertThat(records).hasSize(1);
assertThat(records.get(0).getField("data")).isInstanceOf(UUID.class).isEqualTo(expectedUuid);
}
@TestTemplate
public void testReadUuidFromFlink() throws Exception {
UUID expectedUuid = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
ImmutableList.Builder<Record> builder = ImmutableList.builder();
builder.add(GenericRecord.create(icebergTable.schema()).copy("id", 1, "data", expectedUuid));
new GenericAppenderHelper(icebergTable, FileFormat.PARQUET, warehouseDir)
.appendToTable(builder.build());
icebergTable.refresh();
List<GenericRowData> genericRowData = Lists.newArrayList();
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
icebergTable.newScan().planTasks()) {
for (CombinedScanTask combinedScanTask : combinedScanTasks) {
try (DataIterator<RowData> dataIterator =
ReaderUtil.createDataIterator(
combinedScanTask, icebergTable.schema(), icebergTable.schema())) {
while (dataIterator.hasNext()) {
GenericRowData rowData = (GenericRowData) dataIterator.next();
genericRowData.add(rowData);
}
}
}
}
assertThat(genericRowData).hasSize(1);
assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
byte[] uuidBytes = (byte[]) genericRowData.get(0).getField(1);
assertThat(uuidBytes).hasSize(16);
ByteBuffer bb = ByteBuffer.wrap(uuidBytes);
UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
assertThat(actualUuid).isEqualTo(expectedUuid);
}
}Cannot write incompatible dataset to table with schema:
table {
1: id: required int
2: data: optional uuid
}
Provided schema:
table {
1: id: required int
2: data: optional fixed[16]
}
Problems:
* data: fixed[16] cannot be promoted to uuid
java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
1: id: required int
2: data: optional uuid
}
Provided schema:
table {
1: id: required int
2: data: optional fixed[16]
}
Problems:
* data: fixed[16] cannot be promoted to uuid
at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:531)
at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:480)
at org.apache.iceberg.flink.sink.FlinkSink.toFlinkRowType(FlinkSink.java:755)
at org.apache.iceberg.flink.sink.FlinkSink$Builder.chainIcebergOperators(FlinkSink.java:457)
at org.apache.iceberg.flink.sink.FlinkSink$Builder.append(FlinkSink.java:488)
at org.apache.iceberg.flink.IcebergTableSink.createLegacySink(IcebergTableSink.java:217)
at org.apache.iceberg.flink.IcebergTableSink.lambda$getSinkRuntimeProvider$0(IcebergTableSink.java:164)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:409)There was a problem hiding this comment.
Or perhaps we should first clarify what the proper way to handle the UUID type on the Flink write side should be?
There was a problem hiding this comment.
Thank you for your review @Guosmilesmile.
Based on your feedback, I have added TestFlinkUuidType covering:
UUIDround-trip read acrossAvro,Parquet,ORC(Generic writer -> Flink read)UUIDwrite through Flink writer across all three formats (Flink write -> Generic read)- SQL INSERT failing with
fixed[16] cannot be promoted to uuidacross all three formats
The SQL failure documents what you ran into in your comment, it's at Iceberg's schema check before reaching the writers this PR fixes IIUC.
mxm
left a comment
There was a problem hiding this comment.
I think this looks good. +1 on an e2e test to validate the wiring works as intended.
Co-authored-by: Joy Haldar <joy.haldar@target.com>
Thank you for your review @mxm. I have added
The SQL failure documents what @Guosmilesmile ran into in his comment, it's at Iceberg's schema check before reaching the writers this PR fixes IIUC. cc: @pvary |
| @TestTemplate | ||
| public void testUuidWrittenByGenericWriterParquet() throws Exception { | ||
| runReadRoundTripTest(FileFormat.PARQUET); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testUuidWrittenByGenericWriterAvro() throws Exception { | ||
| runReadRoundTripTest(FileFormat.AVRO); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testUuidWrittenByGenericWriterOrc() throws Exception { | ||
| runReadRoundTripTest(FileFormat.ORC); | ||
| } |
There was a problem hiding this comment.
Could we make FileFormat part of the parameters, so that this method only needs to appear once?
| for (DataFile dataFile : writer.dataFiles()) { | ||
| append.appendFile(dataFile); | ||
| } | ||
| append.commit(); |
| @TestTemplate | ||
| public void testWriteUuidViaFlinkWriterParquet() throws Exception { | ||
| runWriteTest(FileFormat.PARQUET); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testWriteUuidViaFlinkWriterAvro() throws Exception { | ||
| runWriteTest(FileFormat.AVRO); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void testWriteUuidViaFlinkWriterOrc() throws Exception { | ||
| runWriteTest(FileFormat.ORC); | ||
| } |
| assertThatThrownBy( | ||
| () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex)) | ||
| .hasMessageContaining("fixed[16] cannot be promoted to uuid"); |
There was a problem hiding this comment.
Currently, mapping the UUID type in Flink throws an error. However, I'd prefer to make this type actually usable rather than simply catching the error. Otherwise, this type wouldn't be usable for writes in Flink — if it can only be used for reads, its value would be significantly diminished.
@pvary @mxm Flink doesn't have a UUID type. If the underlying table uses UUID and Flink SQL uses BINARY(16) as the corresponding type, would that be appropriate?
There was a problem hiding this comment.
Thank you @Guosmilesmile. IIUC, Flink writes to UUID columns do work via the Java API and the testWriteUuidViaFlinkWriter test in this PR shows this. It's the Flink SQL path that fails since Flink has no UUID type. Please correct me if I am wrong.
Fully agree the SQL path would be much more useful. Let me know how you all want to proceed and I can work on it.
There was a problem hiding this comment.
I think it would be good to make that work. It shouldn't be too hard.
Co-authored-by: Joy Haldar <joy.haldar@target.com>
mxm
left a comment
There was a problem hiding this comment.
@joyhaldar Thanks for the update.
The usual workflow is to merge the changes for the newest Flink version first, then backport to the older versions. It's starting to get hard to review the changes. Could you remove all changes apart from the Flink 2.1 changes?
|
|
||
| @TestTemplate | ||
| public void testWriteUuidViaFlinkWriter() throws Exception { | ||
| runWriteTest(); |
There was a problem hiding this comment.
Can we inline all the separate methods into the test methods? We don't reuse them across the tests.
Co-authored-by: Joy Haldar <joy.haldar@target.com>
Done, thank you for the guidance. Scoped to Flink 2.1 only. Also reverted the change to Sorry about the noise on the earlier scope, I should have started narrower. |
Co-authored-by: Joy Haldar <joy.haldar@target.com>
| TableIdentifier.of(icebergNamespace, TABLE_NAME), | ||
| SCHEMA, | ||
| PartitionSpec.unpartitioned(), | ||
| ImmutableMap.of("format-version", "3", "write.format.default", fileFormat.name())); |
There was a problem hiding this comment.
UUID is not a V3 feature, so we can drop this.
| assertThatThrownBy( | ||
| () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex)) | ||
| .hasMessageContaining("fixed[16] cannot be promoted to uuid"); |
There was a problem hiding this comment.
I think it would be good to make that work. It shouldn't be too hard.
| assertThatThrownBy( | ||
| () -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex)) | ||
| .hasMessageContaining("fixed[16] cannot be promoted to uuid"); |
There was a problem hiding this comment.
It would be good if we could fix this.
There was a problem hiding this comment.
Done.
FlinkSink.toFlinkRowType now calls the two-arg FlinkSchemaUtil.convert(schema, requestedSchema), which runs FlinkFixupTypes to substitute uuid where the table schema declares uuid (IIUC).
I have replaced the negative assertion test with testSqlInsertUuid.
| new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.PARQUET}, | ||
| new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.AVRO}, | ||
| new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.ORC}); |
There was a problem hiding this comment.
Do we need these namespace tests?
There was a problem hiding this comment.
Removed the testhadoop_basenamespace rows.
Co-authored-by: Joy Haldar <joy.haldar@target.com>
Co-authored-by: Joy Haldar <joy.haldar@target.com>
| TableIdentifier.of(icebergNamespace, TABLE_NAME), | ||
| SCHEMA, | ||
| PartitionSpec.unpartitioned(), | ||
| ImmutableMap.of("write.format.default", fileFormat.name())); |
There was a problem hiding this comment.
Use constant DEFAULT_FILE_FORMAT
| TableIdentifier.of(icebergNamespace, TABLE_NAME), | ||
| SCHEMA, | ||
| PartitionSpec.unpartitioned(), | ||
| ImmutableMap.of("write.format.default", fileFormat.name())); |
There was a problem hiding this comment.
Use constant DEFAULT_FILE_FORMAT
| import org.junit.jupiter.api.TestTemplate; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| public class TestFlinkUuidType extends CatalogTestBase { |
There was a problem hiding this comment.
Do we plan to replace this test with one in the BaseFormatModelTests once all of the Flink versions are updated?
There was a problem hiding this comment.
Thank you for pointing it out. If this PR gets accepted, I'll add the following to my follow-up list:
- Backport to
v2.0andv1.20. - Add
UUIDto DataTestBase.SUPPORTED_PRIMITIVES. - Move this test into
BaseFormatModelTests.
| import org.junit.jupiter.api.TestTemplate; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| public class TestFlinkUuidType extends CatalogTestBase { |
There was a problem hiding this comment.
package private is enough for the tests
|
|
||
| /** Writes UUID via Generic writer, reads via Flink. */ | ||
| @TestTemplate | ||
| public void testUuidWrittenByGenericWriter() throws Exception { |
There was a problem hiding this comment.
Package private for test methods, and lately we started to avoid adding test to the method name
There was a problem hiding this comment.
Done. Made methods package private and dropped the test prefix.
Co-authored-by: Joy Haldar <joy.haldar@target.com>
| import org.junit.jupiter.api.TestTemplate; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| class TestFlinkUuidType extends CatalogTestBase { |
There was a problem hiding this comment.
Why is this a separate class?
Will this be part of the BaseFormatModelTests?
There was a problem hiding this comment.
Added it as a standalone class earlier based on @mxm and @Guosmilesmile's suggestion for an e2e test for the UUID changes. I can add it to BaseFormatModelTests as a follow-up, see the list here. Open to your guidance.
|
Could we move this out from the Draft, and make it Ready for review? |
Done. |
|
@mxm, @Guosmilesmile any more questions? |
|
Merged to main. @joyhaldar: Please create the backport PR and tell us if you need to change anything manuallly, or the following command was doing everything without issue: |
Summary
Flink's
RowDatastoresUUIDas bytes, but a few Flink readers/writers weren't aligned with this, causing the crashes in #14330.Fixes
FlinkAvroWriter: was sending UUID to ValueWriters.uuids(), causingClassCastExceptionFlinkParquetWriters: no UUID logicFlinkParquetReaders: no UUID logicAdds
TestFlinkUuidTypecoveringUUIDread/write acrossAvro,Parquet,ORC, and the SQL insert behavior.Addresses #14330