Skip to content

Flink 2.1: Support UUID type in Avro and Parquet readers and writers#16097

Merged
pvary merged 9 commits into
apache:mainfrom
joyhaldar:flink-uuid-support
May 14, 2026
Merged

Flink 2.1: Support UUID type in Avro and Parquet readers and writers#16097
pvary merged 9 commits into
apache:mainfrom
joyhaldar:flink-uuid-support

Conversation

@joyhaldar

@joyhaldar joyhaldar commented Apr 24, 2026

Copy link
Copy Markdown
Contributor

Summary

Flink's RowData stores UUID as bytes, but a few Flink readers/writers weren't aligned with this, causing the crashes in #14330.

Fixes

  • FlinkAvroWriter: was sending UUID to ValueWriters.uuids(), causing ClassCastException
  • FlinkParquetWriters: no UUID logic
  • FlinkParquetReaders: no UUID logic

Adds TestFlinkUuidType covering UUID read/write across Avro, Parquet, ORC, and the SQL insert behavior.

Addresses #14330

…iters

Co-authored-by: Joy Haldar <joy.haldar@target.com>
Co-authored-by: Joy Haldar <joy.haldar@target.com>
@joyhaldar joyhaldar changed the title Core, Flink 2.1: Support UUID type in Avro and Parquet readers and writers Core, Flink: Support UUID type in Avro and Parquet readers and writers Apr 24, 2026
@pvary

pvary commented Apr 24, 2026

Copy link
Copy Markdown
Contributor

Could we please check that the UUID written out by Flink is the same than the UUID written out by the Generic/Spark writer?

IIRC there were some issues that the Parquet UUID reader and the Avro UUID reader were different in some cases, and I'm not sure that this was fixed

@pvary

pvary commented Apr 24, 2026

Copy link
Copy Markdown
Contributor

CC: @mxm, @Guosmilesmile

@Guosmilesmile Guosmilesmile left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Could we add an e2e UT to prove that this type is usable? Since there is no UUID type in Flink, I looked at the type conversion and it treats UUID as BinaryType(16). From what it looks like, Flink should be able to read it, but writing would probably fail . May be we should do more than this.

case UUID:
// UUID length is 16
return new BinaryType(16);

required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
required(117, "time", Types.TimeType.get()));
required(117, "time", Types.TimeType.get()),
required(118, "uuid", Types.UUIDType.get()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E2E UT may be something like this. This is just for my own testing purposes, and I hope it can provide some inspiration.

class TestFlinkUuidType extends CatalogTestBase {

  private static final String TABLE_NAME = "test_table";
  private Table icebergTable;
  @TempDir private Path warehouseDir;

  @Parameters(name = "catalogName={0}, baseNamespace={1}")
  protected static List<Object[]> parameters() {
    return Arrays.asList(
        // For now hive metadata is not supported variant, so we only test hadoop catalog
        new Object[] {"testhadoop", Namespace.empty()},
        new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1")});
  }

  @Override
  @BeforeEach
  public void before() {
    super.before();
    sql("CREATE DATABASE %s", flinkDatabase);
    sql("USE CATALOG %s", catalogName);
    sql("USE %s", DATABASE);

    Schema schema =
            new Schema(
                    Types.NestedField.required(1, "id", Types.IntegerType.get()),
                    Types.NestedField.optional(2, "data", Types.UUIDType.get()));
    icebergTable =
            validationCatalog.createTable(
                    TableIdentifier.of(icebergNamespace, TABLE_NAME),
                    schema,
                    PartitionSpec.unpartitioned(),
                    ImmutableMap.of(
                            "format-version",
                            "3",
                            "write.format.default",
                            FileFormat.AVRO.name()));
  }

  @TestTemplate
  public void testInsertUuidFromFlink() throws Exception {
    UUID expectedUuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
    sql(
            "INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))",
            TABLE_NAME,
            "123e4567e89b12d3a456426614174000");

    List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
    assertThat(records).hasSize(1);
    assertThat(records.get(0).getField("data")).isInstanceOf(UUID.class).isEqualTo(expectedUuid);
  }

  @TestTemplate
  public void testReadUuidFromFlink() throws Exception {
    UUID expectedUuid = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
    ImmutableList.Builder<Record> builder = ImmutableList.builder();
    builder.add(GenericRecord.create(icebergTable.schema()).copy("id", 1, "data", expectedUuid));
    new GenericAppenderHelper(icebergTable, FileFormat.PARQUET, warehouseDir)
            .appendToTable(builder.build());
    icebergTable.refresh();

    List<GenericRowData> genericRowData = Lists.newArrayList();
    try (CloseableIterable<CombinedScanTask> combinedScanTasks =
                 icebergTable.newScan().planTasks()) {
      for (CombinedScanTask combinedScanTask : combinedScanTasks) {
        try (DataIterator<RowData> dataIterator =
                     ReaderUtil.createDataIterator(
                             combinedScanTask, icebergTable.schema(), icebergTable.schema())) {
          while (dataIterator.hasNext()) {
            GenericRowData rowData = (GenericRowData) dataIterator.next();
            genericRowData.add(rowData);
          }
        }
      }
    }

    assertThat(genericRowData).hasSize(1);
    assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
    byte[] uuidBytes = (byte[]) genericRowData.get(0).getField(1);
    assertThat(uuidBytes).hasSize(16);

    ByteBuffer bb = ByteBuffer.wrap(uuidBytes);
    UUID actualUuid = new UUID(bb.getLong(), bb.getLong());
    assertThat(actualUuid).isEqualTo(expectedUuid);
  }
}
Cannot write incompatible dataset to table with schema:
table {
  1: id: required int
  2: data: optional uuid
}
Provided schema:
table {
  1: id: required int
  2: data: optional fixed[16]
}
Problems:
* data: fixed[16] cannot be promoted to uuid
java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: id: required int
  2: data: optional uuid
}
Provided schema:
table {
  1: id: required int
  2: data: optional fixed[16]
}
Problems:
* data: fixed[16] cannot be promoted to uuid
	at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:531)
	at org.apache.iceberg.types.TypeUtil.validateWriteSchema(TypeUtil.java:480)
	at org.apache.iceberg.flink.sink.FlinkSink.toFlinkRowType(FlinkSink.java:755)
	at org.apache.iceberg.flink.sink.FlinkSink$Builder.chainIcebergOperators(FlinkSink.java:457)
	at org.apache.iceberg.flink.sink.FlinkSink$Builder.append(FlinkSink.java:488)
	at org.apache.iceberg.flink.IcebergTableSink.createLegacySink(IcebergTableSink.java:217)
	at org.apache.iceberg.flink.IcebergTableSink.lambda$getSinkRuntimeProvider$0(IcebergTableSink.java:164)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:409)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or perhaps we should first clarify what the proper way to handle the UUID type on the Flink write side should be?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review @Guosmilesmile.

Based on your feedback, I have added TestFlinkUuidType covering:

  • UUID round-trip read across Avro, Parquet, ORC (Generic writer -> Flink read)
  • UUID write through Flink writer across all three formats (Flink write -> Generic read)
  • SQL INSERT failing with fixed[16] cannot be promoted to uuid across all three formats

The SQL failure documents what you ran into in your comment, it's at Iceberg's schema check before reaching the writers this PR fixes IIUC.

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks good. +1 on an e2e test to validate the wiring works as intended.

Co-authored-by: Joy Haldar <joy.haldar@target.com>
@joyhaldar

Copy link
Copy Markdown
Contributor Author

I think this looks good. +1 on an e2e test to validate the wiring works as intended.

Thank you for your review @mxm.

I have added TestFlinkUuidType covering:

  • UUID round-trip read across Avro, Parquet, ORC (Generic writer -> Flink read)
  • UUID write through Flink writer across all three formats (Flink write -> Generic read)
  • SQL INSERT failing with fixed[16] cannot be promoted to uuid across all three formats

The SQL failure documents what @Guosmilesmile ran into in his comment, it's at Iceberg's schema check before reaching the writers this PR fixes IIUC.

cc: @pvary

Comment on lines +127 to +140
@TestTemplate
public void testUuidWrittenByGenericWriterParquet() throws Exception {
runReadRoundTripTest(FileFormat.PARQUET);
}

@TestTemplate
public void testUuidWrittenByGenericWriterAvro() throws Exception {
runReadRoundTripTest(FileFormat.AVRO);
}

@TestTemplate
public void testUuidWrittenByGenericWriterOrc() throws Exception {
runReadRoundTripTest(FileFormat.ORC);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make FileFormat part of the parameters, so that this method only needs to appear once?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

for (DataFile dataFile : writer.dataFiles()) {
append.appendFile(dataFile);
}
append.commit();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +182 to +195
@TestTemplate
public void testWriteUuidViaFlinkWriterParquet() throws Exception {
runWriteTest(FileFormat.PARQUET);
}

@TestTemplate
public void testWriteUuidViaFlinkWriterAvro() throws Exception {
runWriteTest(FileFormat.AVRO);
}

@TestTemplate
public void testWriteUuidViaFlinkWriterOrc() throws Exception {
runWriteTest(FileFormat.ORC);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +211 to +213
assertThatThrownBy(
() -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
.hasMessageContaining("fixed[16] cannot be promoted to uuid");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, mapping the UUID type in Flink throws an error. However, I'd prefer to make this type actually usable rather than simply catching the error. Otherwise, this type wouldn't be usable for writes in Flink — if it can only be used for reads, its value would be significantly diminished.

@pvary @mxm Flink doesn't have a UUID type. If the underlying table uses UUID and Flink SQL uses BINARY(16) as the corresponding type, would that be appropriate?

@joyhaldar joyhaldar Apr 26, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @Guosmilesmile. IIUC, Flink writes to UUID columns do work via the Java API and the testWriteUuidViaFlinkWriter test in this PR shows this. It's the Flink SQL path that fails since Flink has no UUID type. Please correct me if I am wrong.

Fully agree the SQL path would be much more useful. Let me know how you all want to proceed and I can work on it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to make that work. It shouldn't be too hard.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied here.

Co-authored-by: Joy Haldar <joy.haldar@target.com>

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joyhaldar Thanks for the update.

The usual workflow is to merge the changes for the newest Flink version first, then backport to the older versions. It's starting to get hard to review the changes. Could you remove all changes apart from the Flink 2.1 changes?


@TestTemplate
public void testWriteUuidViaFlinkWriter() throws Exception {
runWriteTest();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we inline all the separate methods into the test methods? We don't reuse them across the tests.

@joyhaldar joyhaldar Apr 27, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Co-authored-by: Joy Haldar <joy.haldar@target.com>
@joyhaldar joyhaldar changed the title Core, Flink: Support UUID type in Avro and Parquet readers and writers Flink 2.1: Support UUID type in Avro and Parquet readers and writers Apr 27, 2026
@joyhaldar

Copy link
Copy Markdown
Contributor Author

@joyhaldar Thanks for the update.

The usual workflow is to merge the changes for the newest Flink version first, then backport to the older versions. It's starting to get hard to review the changes. Could you remove all changes apart from the Flink 2.1 changes?

Done, thank you for the guidance. Scoped to Flink 2.1 only.

Also reverted the change to DataTestBase.SUPPORTED_PRIMITIVES for now, since adding UUID there would regress v2.0 and v1.20 until they receive the same fixes. If this PR is accepted, I will follow up with backport PRs for v2.0 and v1.20, and then a separate PR to add UUID to SUPPORTED_PRIMITIVES.

Sorry about the noise on the earlier scope, I should have started narrower.

Co-authored-by: Joy Haldar <joy.haldar@target.com>
TableIdentifier.of(icebergNamespace, TABLE_NAME),
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("format-version", "3", "write.format.default", fileFormat.name()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UUID is not a V3 feature, so we can drop this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Comment on lines +211 to +213
assertThatThrownBy(
() -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
.hasMessageContaining("fixed[16] cannot be promoted to uuid");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to make that work. It shouldn't be too hard.

Comment on lines +193 to +195
assertThatThrownBy(
() -> sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex))
.hasMessageContaining("fixed[16] cannot be promoted to uuid");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good if we could fix this.

@joyhaldar joyhaldar May 13, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

FlinkSink.toFlinkRowType now calls the two-arg FlinkSchemaUtil.convert(schema, requestedSchema), which runs FlinkFixupTypes to substitute uuid where the table schema declares uuid (IIUC).

I have replaced the negative assertion test with testSqlInsertUuid.

Comment on lines +81 to +83
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.PARQUET},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.AVRO},
new Object[] {"testhadoop_basenamespace", Namespace.of("l0", "l1"), FileFormat.ORC});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these namespace tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the testhadoop_basenamespace rows.

joyhaldar and others added 2 commits May 13, 2026 09:32
Co-authored-by: Joy Haldar <joy.haldar@target.com>
Co-authored-by: Joy Haldar <joy.haldar@target.com>
TableIdentifier.of(icebergNamespace, TABLE_NAME),
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("write.format.default", fileFormat.name()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use constant DEFAULT_FILE_FORMAT

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

TableIdentifier.of(icebergNamespace, TABLE_NAME),
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("write.format.default", fileFormat.name()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use constant DEFAULT_FILE_FORMAT

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

public class TestFlinkUuidType extends CatalogTestBase {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to replace this test with one in the BaseFormatModelTests once all of the Flink versions are updated?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing it out. If this PR gets accepted, I'll add the following to my follow-up list:

  1. Backport to v2.0 and v1.20.
  2. Add UUID to DataTestBase.SUPPORTED_PRIMITIVES.
  3. Move this test into BaseFormatModelTests.

import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

public class TestFlinkUuidType extends CatalogTestBase {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package private is enough for the tests

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/** Writes UUID via Generic writer, reads via Flink. */
@TestTemplate
public void testUuidWrittenByGenericWriter() throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Package private for test methods, and lately we started to avoid adding test to the method name

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Made methods package private and dropped the test prefix.

Co-authored-by: Joy Haldar <joy.haldar@target.com>
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

class TestFlinkUuidType extends CatalogTestBase {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a separate class?
Will this be part of the BaseFormatModelTests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it as a standalone class earlier based on @mxm and @Guosmilesmile's suggestion for an e2e test for the UUID changes. I can add it to BaseFormatModelTests as a follow-up, see the list here. Open to your guidance.

@pvary

pvary commented May 13, 2026

Copy link
Copy Markdown
Contributor

Could we move this out from the Draft, and make it Ready for review?

@joyhaldar joyhaldar marked this pull request as ready for review May 13, 2026 15:51
@joyhaldar

Copy link
Copy Markdown
Contributor Author

Could we move this out from the Draft, and make it Ready for review?

Done.

@pvary

pvary commented May 13, 2026

Copy link
Copy Markdown
Contributor

@mxm, @Guosmilesmile any more questions?

@Guosmilesmile Guosmilesmile left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM ! Thanks .

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @joyhaldar! LGTM.

@pvary pvary merged commit 1919ae6 into apache:main May 14, 2026
18 checks passed
@pvary

pvary commented May 14, 2026

Copy link
Copy Markdown
Contributor

Merged to main.
Thanks @joyhaldar for the PR and @Guosmilesmile, @mxm for the reviews!

@joyhaldar: Please create the backport PR and tell us if you need to change anything manuallly, or the following command was doing everything without issue:

git diff 1919ae6^ 1919ae6 flink/v2.1 | sed "s/v2.1/v2.0/g">/tmp/patch;g apply -3 -p1 /tmp/patch
git diff 1919ae6^ 1919ae6 flink/v2.1 | sed "s/v2.1/v1.20/g">/tmp/patch;g apply -3 -p1 /tmp/patch

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants