Core, Spark: Fix equality deletes non-deterministic schema ordering (#13873) #15514

Merged
amogh-jahagirdar merged 4 commits into apache:main from RussellSpitzer:FixEqualityCacheIssue
Mar 12, 2026

@RussellSpitzer RussellSpitzer commented Mar 4, 2026

Member

Equality delete schemas constructed in DeleteFilter.applyEqDeletes relied on the field order of requiredSchema, which varies depending on the query's projection. When the SparkExecutorCache returned delete records read with one field ordering to a reader expecting another, StructProjection silently misinterpreted the positional data, causing deletes to be skipped.

We fix this by canonicalizing the deleteSchema: its fields are sorted by field ID. Now every reader produces the same schema for deletes regardless of projection, ensuring cache hits return correctly ordered records.

Coded with the help of Cursor and claude-4.6.opus-high

--
TL;DR

The first call reads with schema [a, b] for deletes:
read+project(file, [a, b]) → cache_projected_deletes(file) → applyDeletes with [a, b]

The second call reads with schema [b, a] for deletes:
load_cache_projected_deletes(file) → applyDeletes with [b, a] → mismatch

Fixes #13873
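
To make the mismatch above concrete, here is a minimal plain-Java sketch. It does not use Iceberg's classes: `Field`, `canonicalize`, and `valueOf` are hypothetical stand-ins for a schema field, the sort-by-field-ID step, and a positional read. The field IDs and values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DeleteSchemaOrderSketch {
  // Hypothetical stand-in for a schema field: only an ID and a name.
  static final class Field {
    final int id;
    final String name;

    Field(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Canonical order: sort by field ID so every projection yields the same layout.
  static List<Field> canonicalize(List<Field> fields) {
    List<Field> sorted = new ArrayList<>(fields);
    sorted.sort(Comparator.comparingInt(f -> f.id));
    return sorted;
  }

  // Reads a value by position, trusting that the row matches readerSchema's order.
  static Object valueOf(String name, List<Field> readerSchema, Object[] row) {
    for (int pos = 0; pos < readerSchema.size(); pos++) {
      if (readerSchema.get(pos).name.equals(name)) {
        return row[pos];
      }
    }
    throw new IllegalArgumentException("No such field: " + name);
  }

  public static void main(String[] args) {
    Field a = new Field(2, "a");
    Field b = new Field(3, "b");

    // Delete row cached by the first reader, laid out as [a, b]: a=10, b=20.
    Object[] cachedRow = {10, 20};

    // The second reader's projection happens to be [b, a].
    List<Field> second = List.of(b, a);

    // Without canonicalization the second reader looks for "a" in b's slot.
    System.out.println("uncanonicalized a = " + valueOf("a", second, cachedRow));

    // With canonicalization both readers agree on [a, b], so "a" is read correctly.
    System.out.println("canonicalized a = " + valueOf("a", canonicalize(second), cachedRow));
  }
}
```

In this sketch the uncanonicalized read returns b's value for a, which is the kind of silent positional misread described above; sorting by ID removes the dependence on projection order.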

Comment thread data/src/main/java/org/apache/iceberg/data/DeleteFilter.java

@mxm mxm left a comment

Contributor

LGTM. Looks like the cache key is the delete file itself, which makes sense, as long as we use a deterministic approach for the schema projection.
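
As a rough illustration of why a file-only cache key needs a deterministic projection, here is a small plain-Java sketch. It is not the SparkExecutorCache API: the class, the `load` method, the `RECORD` contents, and the file name are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FileKeyedDeleteCacheSketch {
  // Hypothetical cache keyed by the delete file path only.
  private final Map<String, List<Integer>> cache = new HashMap<>();

  // Hypothetical contents of one delete record: field ID -> value.
  private static final Map<Integer, Integer> RECORD = Map.of(2, 10, 3, 20);

  // Returns the record's values laid out in the requested field-ID order.
  // On a cache hit the requested order is ignored: the first reader's layout wins.
  List<Integer> load(String file, List<Integer> fieldIdOrder) {
    return cache.computeIfAbsent(file, f -> {
      List<Integer> values = new ArrayList<>();
      for (int id : fieldIdOrder) {
        values.add(RECORD.get(id));
      }
      return values;
    });
  }

  // Deterministic projection order: ascending field ID.
  static List<Integer> sortedIds(List<Integer> ids) {
    List<Integer> sorted = new ArrayList<>(ids);
    sorted.sort(Comparator.naturalOrder());
    return sorted;
  }

  public static void main(String[] args) {
    String file = "deletes.parquet"; // made-up file name

    // Projection-dependent order: the second reader asks for [3, 2]
    // but receives the layout cached by the first reader.
    FileKeyedDeleteCacheSketch poisoned = new FileKeyedDeleteCacheSketch();
    poisoned.load(file, List.of(2, 3));
    System.out.println("second reader received " + poisoned.load(file, List.of(3, 2)));

    // Deterministic order: both readers request the same layout, so a hit is safe.
    FileKeyedDeleteCacheSketch fixed = new FileKeyedDeleteCacheSketch();
    fixed.load(file, sortedIds(List.of(2, 3)));
    System.out.println("second reader received " + fixed.load(file, sortedIds(List.of(3, 2))));
  }
}
```

The cache returns the same list in both runs; the difference is that in the second run the list matches the layout the second reader actually asked for.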

eqTestTable.newRowDelta().addDeletes(eqFile).commit();

String tableRef = TableIdentifier.of("default", EQ_CACHE_TABLE).toString();
int expectedRows = 7;

Contributor

nit: maybe a little bit easier to understand?

Suggested change:
- int expectedRows = 7;
+ int expectedRows = data.size() - delete.size();

Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "a", Types.IntegerType.get()),
Types.NestedField.optional(3, "b", Types.IntegerType.get()));
PartitionSpec spec = PartitionSpec.builderFor(eqDeleteTestSchema).bucket("id", 1).build();

@pvary pvary Mar 6, 2026

Contributor

Why do we not use an unpartitioned table here?

Member Author

No reason. I originally had this in a different file where all the tables were partitioned, and I didn't want to feel left out. Now that it's here, we can remove it.

@pvary pvary left a comment

Contributor

Marked a few places where the code could be made cleaner.

@stevenzwu stevenzwu left a comment

Contributor

LGTM. Thanks for adding the unit test. Just a couple of nit comments.

Comment thread data/src/main/java/org/apache/iceberg/data/DeleteFilter.java Outdated
* order than the table schema, which can cause different deleteSchema orderings, poisoning the
* cache.
*/
@TestTemplate

Contributor

The actual test makes sense, but I don't see why it is necessary to run this test of a core feature (DeleteFilter) in Spark. This should be done in the tests for that class.

@RussellSpitzer RussellSpitzer Mar 6, 2026

Member Author

The main reason is that I couldn't find a place to fit it in the tests for that class. I did try, but it basically involved writing a custom test cache implementation as well. So while it's possible, it basically involves re-implementing everything to get the same behavior. We could add a unit test for the sorting though; that's pretty easy.
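
A standalone check of the sorting could look roughly like the sketch below. It is plain Java with no test framework and no Iceberg types; `sortByFieldId` and `orderIndependent` are hypothetical helpers operating on bare field IDs, not the method added in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

public class SortByFieldIdCheck {
  // Hypothetical helper: returns field IDs in canonical (ascending) order.
  static List<Integer> sortByFieldId(List<Integer> fieldIds) {
    List<Integer> sorted = new ArrayList<>(fieldIds);
    sorted.sort(Comparator.naturalOrder());
    return sorted;
  }

  // Returns true if every shuffled ordering canonicalizes to the same list.
  static boolean orderIndependent(List<Integer> fieldIds, int trials, long seed) {
    Random random = new Random(seed); // fixed seed keeps the check reproducible
    List<Integer> expected = sortByFieldId(fieldIds);
    List<Integer> shuffled = new ArrayList<>(fieldIds);
    for (int i = 0; i < trials; i++) {
      Collections.shuffle(shuffled, random);
      if (!sortByFieldId(shuffled).equals(expected)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(orderIndependent(List.of(3, 1, 2), 100, 42L));
  }
}
```

This only covers the ordering property; it says nothing about the cache interaction that the Spark test exercises.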

Contributor

Yeah, it looks like we currently don't have a great place for testing just the logic within DeleteFilter. I think in the current structure we'd have to have our own implementation of DeleteFilter for the test, and as @RussellSpitzer said, by the time we do all that it's basically just re-implementing what we have in the engine integrations like Spark. I think I agree with the current split of tests, but maybe the fact that we want to test DeleteFilter in isolation means we need to do some refactoring.

@RussellSpitzer RussellSpitzer Mar 6, 2026

Member Author

https://gist.github.com/sfc-gh-rspitzer/e53841222262010847a68aa2ed59dd39 is what Claude thinks that should look like. I haven't really gone through it in depth; I just figured that if I was writing an implementation of DeleteFilter and DeleteLoader, we were going way off track.

Basically we mock just about everything around the load, just to test that the schema is always canonicalized.

Contributor

I didn't realize that the caching part was tied to Spark. This should be okay.

@rdblue rdblue added this to the Iceberg 1.11.0 milestone Mar 6, 2026

rdblue commented Mar 6, 2026

Contributor

@amogh-jahagirdar FYI. Here's another one for the next 1.10.x release.

@github-actions github-actions Bot added the API label Mar 6, 2026

@amogh-jahagirdar amogh-jahagirdar left a comment

Contributor

This looks right to me, thank you @RussellSpitzer !

Comment thread api/src/main/java/org/apache/iceberg/types/TypeUtil.java Outdated
@RussellSpitzer

Member Author

@rdblue Any final thoughts? Are you on board with the test, even though I agree it is quite suboptimal? We may just want to raise another issue to refactor this whole bit of code into something more testable ...

@amogh-jahagirdar

Contributor

I'll go ahead and merge, thanks @RussellSpitzer and all for reviewing!

@amogh-jahagirdar amogh-jahagirdar merged commit f865bac into apache:main Mar 12, 2026
35 checks passed
amogh-jahagirdar pushed a commit that referenced this pull request Mar 12, 2026
… schema ordering

@RussellSpitzer RussellSpitzer deleted the FixEqualityCacheIssue branch April 23, 2026 14:38
Labels

API, bug, data, spark

Development

Successfully merging this pull request may close these issues.

Undetermined behavior when fetching from iceberg table

7 participants