Flink: Support Variant to Flink 2.1#15265
Conversation
f2a9efa to
b6fd571
Compare
mxm
left a comment
There was a problem hiding this comment.
Thanks for the PR @Guosmilesmile! I'll take a look. Just checking, did we talk with @talatuyarer that we can continue his work?
|
I noticed it has been closed for a while. So I opened a new PR to continue this work. I will ask him whether it’s appropriate for me to keep pushing this forward. |
|
It just closed due to inactivity I was waiting someone review like @mxm :) You can use my code build on top of it. Having variant type support is important for me. I am happy to give a review also whatever works for you @Guosmilesmile |
|
@talatuyarer Thank you for your reply. I noticed that the issues raised by Steven and Aihuaxu's reviews in the previous PR were not resolved, and it was closed due to prolonged inactivity, so I created a new PR to continue this feature. This PR fixes those issues and some bug,also adds some end-to-end test cases. Which PR to continue this feature in is up to you. |
|
Thank you two for your replies! Looks like @talatuyarer is fine with continuing here. I'll continue reviewing. |
mxm
left a comment
There was a problem hiding this comment.
Looks mostly good to me @Guosmilesmile / @talatuyarer! Nice work.
| public static final VariantPrimitive<?>[] UNSUPPORTED_PRIMITIVES = | ||
| new VariantPrimitive[] { | ||
| Variants.ofIsoTime("12:33:54.123456"), | ||
| Variants.ofIsoTimestamptzNanos("2024-11-07T12:33:54.123456789+00:00"), | ||
| Variants.ofIsoTimestampntzNanos("2024-11-07T12:33:54.123456789"), | ||
| }; |
There was a problem hiding this comment.
In which sense are those unsupported? Only in Spark?
There was a problem hiding this comment.
In Flink is also unsupported.
The assertions in this test case weren’t written well, so I adjusted them now.
| icebergVariant.metadata().writeTo(metadataBuffer, 0); | ||
|
|
||
| byte[] valueBytes = new byte[icebergVariant.value().sizeInBytes()]; | ||
| ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN); |
|
@Guosmilesmile Please add @talatuyarer as a co-author if you are using his code. |
|
@manuzhang Thank you very much for your reminder. Let me check how to add co-authors. |
a2bb54c to
92a4043
Compare
|
What are our plans for Shredded variants? There are cases when some part of the Variant is stored in its own column. See: #13219 |
Co-authored-by: Talat UYARER <talat@apache.org>
92a4043 to
fcc115a
Compare
In this PR, for the read path, the shredding-related work is implemented in the Iceberg Parquet layer. This logic is aligned with the Spark side, so it should be supported. For the write path, we haven’t implemented shredding yet. On the Flink side, Variant type support is currently limited to This is just my own preliminary understanding. |
|
Merged to main. |
Co-authored-by: Talat UYARER <talat@apache.org>
Co-authored-by: Talat UYARER <talat@apache.org>
Starting from Flink 2.1, variant is supported. This PR adds variant support based on that.
Overall, the logic in this PR relies on the existing Parquet variant reader/writer to handle most of the work.
For test cases, this PR adds validations for type conversions and an end-to-end verification from the Flink SQL side.
This PR is developed based on #14259.
Here’s a summary of the changes:
Type Mapping Additions:
VariantTypeand Iceberg’sVariantTypein both directions (FlinkTypeToTypeandTypeToFlinkType).Parquet Read/Write Integration:
BinaryVariantobjects for Flink and as Variant objects for Iceberg internal representation.Schema Visitor Enhancements:
ParquetWithFlinkSchemaVisitor) to recognize and properly handle Variant logical types and annotations.Enhanced Testing:
TestFlinkVariantType).TestFlinkVariants), parameterized for multiple Variant subtypes.VariantTestHelperfor reusable Variant test cases across both Flink and Spark integration tests.Co-authored-by: talatuyarer talat@apache.org