Spark: Add Variant read support for Spark Iceberg tables #13219
aa48ff0 to 51ee01d
@aokolnychyi, @szehon-ho Can you help check whether this is the right direction? Thanks.
amogh-jahagirdar
left a comment
I was chatting with @danielcweeks and he brought up a good point: one implication of releasing only writing without shredding is that, once shredding is added in the future, older readers wouldn't be able to read those datasets.
It may be worth trying to add the support upfront so we avoid situations like "Spark4-iceberg 1.10 has the limitation that it is not able to read shredded datasets", for example. That would be a considerable amount of work, though, and we need to think through how to do it properly: the whole write path for shredded columns is a bit unclear and probably requires some buffering/resetting based on the records just to figure out what schema to write with.
@aihuaxu I think it's important that we have a path forward for writing shredded columns before we introduce this. If we release a version that doesn't support reading shredded columns, it will be incompatible with future writers that produce shredded data. I also think we want to produce shredded values initially so that all readers accommodate shredding to begin with. I don't think it's safe to do this in isolation.
Thanks @danielcweeks and @amogh-jahagirdar for the suggestion. That makes sense. Initially I thought I could split up the changes to get feedback earlier. Let me incorporate shredding as well.
51ee01d to 9efae80
@aihuaxu, I caught up with @danielcweeks about this yesterday and I think his concern was that we need to support reading shredded values. It would be nice to be able to write them as well, but I think as long as this can read them (and we have a test to validate it) then we should be able to move forward with this. Thanks for your patience on this while I was out at conferences!
9efae80 to 9993c35
9993c35 to e8e6736
I have added a test case that reads from a shredded variant. I didn't add multiple test cases as in TestVariantReaders, just one. We use ParquetVariantReaders to read and convert to VariantVal, so the logic should already be covered by the existing TestVariantReaders. The shredded writer is not included yet.
Types.VariantType icebergVariantType = Types.VariantType.get();
DataType sparkVariantType = SparkSchemaUtil.convert(icebergVariantType);

assertThat(sparkVariantType).isEqualTo(VariantType$.MODULE$);
I think this should be an instanceof VariantType check, right? Instances are equivalent, so when we accept an object we typically use the instanceof check.
I think this check goes even further: it makes sure the singleton VariantType is returned. Would this be better?
If there can be instances of VariantType other than VariantType$.MODULE$ then I think it is better not to be overly restrictive.
From the implementation in TypeToSparkType.java, it can only be VariantType$.MODULE$, so no other instance will be returned and it should be fine to check against that instance. Let me know if I misunderstand here.
@Override
public DataType variant(Types.VariantType variant) {
  return VariantType$.MODULE$;
}
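To make the difference between the two checks concrete, here is a minimal standalone sketch. DemoVariantType and SingletonCheckDemo are hypothetical stand-ins, not Spark or Iceberg classes; the sketch only assumes a type whose single instance is exposed through a static field, similar to how VariantType$.MODULE$ is used above.

```java
// Hypothetical stand-in for a singleton type; NOT Spark's VariantType.
final class DemoVariantType {
  static final DemoVariantType INSTANCE = new DemoVariantType();

  // Private constructor: no instance other than INSTANCE can be created.
  private DemoVariantType() {}
}

class SingletonCheckDemo {
  // Hypothetical converter mirroring the variant(...) override quoted above.
  static Object convert() {
    return DemoVariantType.INSTANCE;
  }

  // Looser check: passes for any instance of the type.
  static boolean passesInstanceOfCheck(Object type) {
    return type instanceof DemoVariantType;
  }

  // Stricter check: passes only for the one shared instance.
  static boolean passesSingletonCheck(Object type) {
    return type == DemoVariantType.INSTANCE;
  }

  public static void main(String[] args) {
    Object converted = convert();
    System.out.println("instanceof check: " + passesInstanceOfCheck(converted));
    System.out.println("singleton check:  " + passesSingletonCheck(converted));
  }
}
```

When the constructor is private and only one instance exists, both checks accept exactly the same values, so the choice is mostly about how tightly the test should be coupled to the implementation.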
551e941 to 58dfa24
@aihuaxu, it looks like the test failures are only checkstyle. Can you update this to fix them?
This is missing an annotation and tests are failing, but this should be ready when those are fixed. Thanks, @aihuaxu!
Sorry about that. Let me fix that.
amogh-jahagirdar
left a comment
Stepped through this locally; all the changes look good to me, @aihuaxu!
// verify that the dataframe matches
assertThat(rows).hasSameSizeAs(records);
Iterator<GenericData.Record> recordIter = records.iterator();
assertThat(rows.size()).isEqualTo(records.size());
It is generally better to use assertThat(rows).hasSameSizeAs(records); as that will show you the content of rows/records when the assertion fails. @aihuaxu was there a particular reason why this check was changed?
@nastra I checked the usage, and assertThat(rows).hasSameSizeAs(records); does show the content when the number of records doesn't match, unlike isEqualTo(), which helps with debugging. Let me change that.
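The debugging difference can be illustrated with a small stdlib-only sketch. The helper names below (assertSizesEqual, assertSameSizeAs, failureMessage) are hypothetical and only imitate the idea; AssertJ's real implementation and message format are different.

```java
import java.util.List;

class SizeAssertionDemo {
  // Receives only two ints, so a failure message cannot mention the elements.
  static void assertSizesEqual(int actual, int expected) {
    if (actual != expected) {
      throw new AssertionError("expected size " + expected + " but was " + actual);
    }
  }

  // Receives the collections themselves, so a failure message can include their contents.
  static void assertSameSizeAs(List<?> actual, List<?> expected) {
    if (actual.size() != expected.size()) {
      throw new AssertionError(
          "expected size " + expected.size() + " but was " + actual.size()
              + "; actual: " + actual + ", expected: " + expected);
    }
  }

  // Runs a check and returns the failure message, or null if the check passed.
  static String failureMessage(Runnable check) {
    try {
      check.run();
      return null;
    } catch (AssertionError e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    List<String> rows = List.of("row-1");
    List<String> records = List.of("rec-1", "rec-2");
    System.out.println(failureMessage(() -> assertSizesEqual(rows.size(), records.size())));
    System.out.println(failureMessage(() -> assertSameSizeAs(rows, records)));
  }
}
```

Once rows.size() is evaluated before the assertion is called, the assertion only ever sees an integer, which is why the size-only form cannot report which rows were present.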
This PR adds support for Spark to read Variant data from Iceberg tables. When reading Variant data (unshredded or shredded), the Spark VariantReader reads an Iceberg Variant and converts it to a Spark VariantVal. The Iceberg VariantReader handles reading both shredded and unshredded Iceberg Variant values. Currently, VariantWriter handles writing unshredded Iceberg Variant values only.
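As a rough illustration of what reading "shredded or unshredded" means for a single value, here is a simplified sketch. It assumes, based on my understanding of the Parquet Variant shredding layout, that a column carries an optional serialized value and an optional typed_value. DemoShreddedValue and VariantReadDemo are hypothetical and are not the Iceberg or Spark readers; the sketch also treats the serialized bytes as UTF-8 text instead of decoding the real Variant binary encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for one Variant column value; not an Iceberg or Parquet class.
final class DemoShreddedValue {
  final byte[] value;      // serialized (unshredded) bytes, or null
  final Object typedValue; // strongly typed (shredded) value, or null

  DemoShreddedValue(byte[] value, Object typedValue) {
    this.value = value;
    this.typedValue = typedValue;
  }
}

class VariantReadDemo {
  // Returns a string describing which representation the result came from.
  static String read(DemoShreddedValue col) {
    if (col.typedValue != null && col.value == null) {
      return "shredded:" + col.typedValue;
    }
    if (col.value != null && col.typedValue == null) {
      // A real reader would decode the Variant binary here (assumption: text stand-in).
      return "unshredded:" + new String(col.value, StandardCharsets.UTF_8);
    }
    if (col.value == null) {
      return "missing"; // neither representation is present
    }
    // Both present: partially shredded objects are out of scope for this sketch.
    throw new UnsupportedOperationException("partially shredded value");
  }

  public static void main(String[] args) {
    byte[] raw = "abc".getBytes(StandardCharsets.UTF_8);
    System.out.println(read(new DemoShreddedValue(null, 34)));
    System.out.println(read(new DemoShreddedValue(raw, null)));
    System.out.println(read(new DemoShreddedValue(null, null)));
  }
}
```

A reader that only understands the serialized form would fail on the first case, which is the compatibility concern raised earlier in this thread.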