Skip to content

Flink: Support Variant to Flink 2.1#15265

Merged
pvary merged 5 commits into
apache:mainfrom
Guosmilesmile:variant
Feb 26, 2026
Merged

Flink: Support Variant to Flink 2.1#15265
pvary merged 5 commits into
apache:mainfrom
Guosmilesmile:variant

Conversation

@Guosmilesmile

@Guosmilesmile Guosmilesmile commented Feb 8, 2026

Copy link
Copy Markdown
Contributor

Starting from Flink 2.1, variant is supported. This PR adds variant support based on that.

Overall, the logic in this PR relies on the existing Parquet variant reader/writer to handle most of the work.

For test cases, this PR adds validations for type conversions and an end-to-end verification from the Flink SQL side.

This PR is developed based on #14259.

Here’s a summary of the changes:

  1. Type Mapping Additions:

    • Added support for converting between Flink’s VariantType and Iceberg’s VariantType in both directions (FlinkTypeToType and TypeToFlinkType).
  2. Parquet Read/Write Integration:

    • Implemented Parquet readers and writers for the Variant type. This enables seamless reading and writing of Variant data as BinaryVariant objects for Flink and as Variant objects for Iceberg internal representation.
  3. Schema Visitor Enhancements:

    • Updated Parquet schema visitors (ParquetWithFlinkSchemaVisitor) to recognize and properly handle Variant logical types and annotations.
  4. Enhanced Testing:

    • Introduced new and improved tests for Variant support in Flink, including:
      • End-to-end tests for reading and writing Variant columns via Flink SQL (TestFlinkVariantType).
      • Comprehensive Parquet round-trip and type-conversion tests (TestFlinkVariants), parameterized for multiple Variant subtypes.
      • Created a shared VariantTestHelper for reusable Variant test cases across both Flink and Spark integration tests.
    • Refactored Spark tests to use the new shared Variant test data.

Co-authored-by: talatuyarer talat@apache.org

@Guosmilesmile Guosmilesmile marked this pull request as draft February 8, 2026 10:25
@Guosmilesmile Guosmilesmile marked this pull request as ready for review February 8, 2026 13:36
@pvary

pvary commented Feb 8, 2026

Copy link
Copy Markdown
Contributor

CC: @mxm, @gyfora

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @Guosmilesmile! I'll take a look. Just checking, did we talk with @talatuyarer that we can continue his work?

@Guosmilesmile

Copy link
Copy Markdown
Contributor Author

I noticed it has been closed for a while. So I opened a new PR to continue this work. I will ask him whether it’s appropriate for me to keep pushing this forward.

@talatuyarer

Copy link
Copy Markdown
Contributor

It just closed due to inactivity I was waiting someone review like @mxm :) You can use my code build on top of it. Having variant type support is important for me. I am happy to give a review also whatever works for you @Guosmilesmile

@Guosmilesmile

Copy link
Copy Markdown
Contributor Author

@talatuyarer Thank you for your reply. I noticed that the issues raised by Steven and Aihuaxu's reviews in the previous PR were not resolved, and it was closed due to prolonged inactivity, so I created a new PR to continue this feature. This PR fixes those issues and some bug,also adds some end-to-end test cases.

Which PR to continue this feature in is up to you.

@mxm

mxm commented Feb 12, 2026

Copy link
Copy Markdown
Contributor

Thank you two for your replies! Looks like @talatuyarer is fine with continuing here. I'll continue reviewing.

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks mostly good to me @Guosmilesmile / @talatuyarer! Nice work.

Comment on lines +62 to +67
public static final VariantPrimitive<?>[] UNSUPPORTED_PRIMITIVES =
new VariantPrimitive[] {
Variants.ofIsoTime("12:33:54.123456"),
Variants.ofIsoTimestamptzNanos("2024-11-07T12:33:54.123456789+00:00"),
Variants.ofIsoTimestampntzNanos("2024-11-07T12:33:54.123456789"),
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which sense are those unsupported? Only in Spark?

@Guosmilesmile Guosmilesmile Feb 12, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Flink is also unsupported.

https://github.com/apache/flink/blob/1779d20a28f5d43410138314fe3292d0e64f3148/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java#L315-L376

The assertions in this test case weren’t written well, so I adjusted them now.

icebergVariant.metadata().writeTo(metadataBuffer, 0);

byte[] valueBytes = new byte[icebergVariant.value().sizeInBytes()];
ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the encoding.

@manuzhang

Copy link
Copy Markdown
Member

@Guosmilesmile Please add @talatuyarer as a co-author if you are using his code.

@Guosmilesmile

Copy link
Copy Markdown
Contributor Author

@manuzhang Thank you very much for your reminder. Let me check how to add co-authors.

Comment thread flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java Outdated
@pvary

pvary commented Feb 25, 2026

Copy link
Copy Markdown
Contributor

What are our plans for Shredded variants? There are cases when some part of the Variant is stored in its own column. See: #13219

Guosmilesmile and others added 3 commits February 25, 2026 22:38
Comment thread flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java Outdated
@Guosmilesmile

Guosmilesmile commented Feb 25, 2026

Copy link
Copy Markdown
Contributor Author

What are our plans for Shredded variants? There are cases when some part of the Variant is stored in its own column. See: #13219

In this PR, for the read path, the shredding-related work is implemented in the Iceberg Parquet layer. This logic is aligned with the Spark side, so it should be supported.

For the write path, we haven’t implemented shredding yet. On the Flink side, Variant type support is currently limited to BinaryVariant, so what Flink writes is only unshredded data. If we want to support shredding, we may be need to add a new config option, such as shreddingSchema, to predefine which fields should be shredded, and possibly introduce something like “auto” as well. We’ll need to align this with Spark’s logic later.

This is just my own preliminary understanding.

@pvary pvary merged commit 4543e03 into apache:main Feb 26, 2026
38 of 67 checks passed
@pvary

pvary commented Feb 26, 2026

Copy link
Copy Markdown
Contributor

Merged to main.
Thanks @talatuyarer and @Guosmilesmile for the PR and @mxm for the review!

@Guosmilesmile Guosmilesmile deleted the variant branch February 26, 2026 13:50
RjLi13 pushed a commit to RjLi13/iceberg that referenced this pull request Mar 12, 2026
Co-authored-by: Talat UYARER <talat@apache.org>
talatuyarer added a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026
Co-authored-by: Talat UYARER <talat@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants