Skip to content

Spark: Add Variant read support for Spark Iceberg tables #13219

Merged
amogh-jahagirdar merged 10 commits into
apache:mainfrom
aihuaxu:aixu-spark-basic-variant
Jul 29, 2025
Merged

Spark: Add Variant read support for Spark Iceberg tables #13219
amogh-jahagirdar merged 10 commits into
apache:mainfrom
aihuaxu:aixu-spark-basic-variant

Conversation

@aihuaxu

@aihuaxu aihuaxu commented Jun 2, 2025

Copy link
Copy Markdown
Contributor

This PR is to add the support for Spark to read Variant data against Iceberg tables. Basically when reading the Variant data (unshredded or shredded), Spark VariantReader reads an Iceberg Variant and converts to Spark VariantVal. The Iceberg VariantReader handles reading shredded/unshredded Iceberg Variant. Currently VariantWriter handles writing unshredded Iceberg Variant only.

@aihuaxu

aihuaxu commented Jun 2, 2025

Copy link
Copy Markdown
Contributor Author

@aokolnychyi, @szehon-ho Can you help to check if it's the right direction? Thanks.

@amogh-jahagirdar amogh-jahagirdar self-requested a review June 2, 2025 19:20

@amogh-jahagirdar amogh-jahagirdar left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was chatting with @danielcweeks and he brought up a good point that one of the implications of only releasing writing without shredding is that in the future when that is added, older readers wouldn't be able to read those datasets.

It may be worth trying to add the support upfront so we avoid situations where it's like "Spark4-iceberg 1.10 has the limitation that it is not able to read shredded datasets" for example. Though, that would be a considerable amount of work we need to think through how to do properly (the whole write path for shredded columns is a bit unclear, probably requiring some buffering/resetting based on the records to even figure out what schema to write with)

Comment thread parquet/src/main/java/org/apache/iceberg/parquet/TripleIterator.java Outdated
@danielcweeks

Copy link
Copy Markdown
Contributor

@aihuaxu I think it's important that we have a path forward for writing shredded columns before we introduce this. If we release a version that doesn't support reading shredded columns, it will be incompatible with future writers that produce shredded data.

I also think we want to produce shredded values initially so that all readers accommodate shredding to being with. I don't think it's safe to do this in isolation.

Comment thread spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkTypeVisitor.java Outdated
@aihuaxu

aihuaxu commented Jun 4, 2025

Copy link
Copy Markdown
Contributor Author

@aihuaxu I think it's important that we have a path forward for writing shredded columns before we introduce this. If we release a version that doesn't support reading shredded columns, it will be incompatible with future writers that produce shredded data.

I also think we want to produce shredded values initially so that all readers accommodate shredding to being with. I don't think it's safe to do this in isolation.

Thanks @danielcweeks and @amogh-jahagirdar for the suggestion. That makes sense. Initially I thought I can break the changes to get feedback earlier. Let me incorporate shredding as well.

@aihuaxu aihuaxu force-pushed the aixu-spark-basic-variant branch from 51ee01d to 9efae80 Compare June 5, 2025 18:23
@rdblue

rdblue commented Jun 26, 2025

Copy link
Copy Markdown
Contributor

@aihuaxu, I caught up with @danielcweeks about this yesterday and I think his concern was that we need to support reading shredded values. It would be nice to be able to write them as well, but I think as long as this can read them (and we have a test to validate it) then we should be able to move forward with this. Thanks for your patience on this while I was out at conferences!

Comment thread core/src/test/java/org/apache/iceberg/RandomVariants.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java Outdated
@aihuaxu aihuaxu force-pushed the aixu-spark-basic-variant branch from 9efae80 to 9993c35 Compare July 13, 2025 06:17
@aihuaxu aihuaxu changed the title Spark: Add basic Variant read/write support for Spark Iceberg tables without shredding Spark: Add Variant read support for Spark Iceberg tables Jul 13, 2025
@aihuaxu aihuaxu force-pushed the aixu-spark-basic-variant branch from 9993c35 to e8e6736 Compare July 14, 2025 18:47
@aihuaxu

aihuaxu commented Jul 14, 2025

Copy link
Copy Markdown
Contributor Author

@aihuaxu, I caught up with @danielcweeks about this yesterday and I think his concern was that we need to support reading shredded values. It would be nice to be able to write them as well, but I think as long as this can read them (and we have a test to validate it) then we should be able to move forward with this. Thanks for your patience on this while I was out at conferences!

I have added a test case to read from shredded variant. I didn't add multiple test cases like TestVariantReaders but just added one. We will use ParquetVariantReaders to read and convert to VariantVal so the logic should have been covered by the existing TestVariantReaders.

Shredded writer is not included yet.

Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Comment thread spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java Outdated
Types.VariantType icebergVariantType = Types.VariantType.get();
DataType sparkVariantType = SparkSchemaUtil.convert(icebergVariantType);

assertThat(sparkVariantType).isEqualTo(VariantType$.MODULE$);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be instanceof VariantType right? Instances are equivalent so when we are accepting an object we typically use the instanceof check.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's even further to make sure it's returning the singleton of VariantType. Would this be better?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there can be instances of VariantType other than VariantType$.MODULE$ then I think it is better not to be overly restrictive.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the implementation, it can only be VariantType$.MODULE$ in TypeToSparkType.java. It should be fine to check against the instance. That means it will not be other instance. Let me know if I misunderstand here.

  @Override
  public DataType variant(Types.VariantType variant) {
    return VariantType$.MODULE$;
  }

@aihuaxu aihuaxu force-pushed the aixu-spark-basic-variant branch from 551e941 to 58dfa24 Compare July 26, 2025 06:14
@aihuaxu aihuaxu requested a review from rdblue July 26, 2025 06:16
@rdblue

rdblue commented Jul 28, 2025

Copy link
Copy Markdown
Contributor

@aihuaxu, it looks like the test failures are only checkstyle. Can you update this to fix them?

@rdblue

rdblue commented Jul 28, 2025

Copy link
Copy Markdown
Contributor

This is missing an annotation and tests are failing, but this should be ready when those are fixed. Thanks, @aihuaxu!

@aihuaxu

aihuaxu commented Jul 28, 2025

Copy link
Copy Markdown
Contributor Author

@aihuaxu, it looks like the test failures are only checkstyle. Can you update this to fix them?

Sorry about that. Let me fix that.

@amogh-jahagirdar amogh-jahagirdar left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stepped through this locally, all the changes look good to me @aihuaxu !

@amogh-jahagirdar amogh-jahagirdar merged commit 5710569 into apache:main Jul 29, 2025
42 checks passed
@amogh-jahagirdar

Copy link
Copy Markdown
Contributor

Thanks @aihuaxu @rdblue !

// verify that the dataframe matches
assertThat(rows).hasSameSizeAs(records);
Iterator<GenericData.Record> recordIter = records.iterator();
assertThat(rows.size()).isEqualTo(records.size());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is generally better to use assertThat(rows).hasSameSizeAs(records); as that will show you the content of rows/records when the assertion ever fails. @aihuaxu was there a particular reason why this check was changed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nastra I checked the usage and assertThat(rows).hasSameSizeAs(records); does show the content when the number of records doesn't match compare to isEqualTo(), helping for debug. Let me change that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants