Core, Spark: Fix equality deletes non-deterministic schema ordering (#13873)#15514
Conversation
…hema ordering (apache#13873) Equality delete schemas constructed in DeleteFilter.applyEqDeletes relied on the field order of requiredSchema, which varies depending on the query's projection. When the SparkExecutorCache returned delete records read with one field ordering to a reader expecting another, StructProjection silently misinterpreted the positional data, causing deletes to be skipped. We fix this by Canonicalize the deleteSchema by sorting fields by field ID. Now every reader produces the same schema for deletes regardless of projection, ensuring cache hits return correctly ordered records. Coded with the help of Cursor and claude-4.6.opus-high
mxm
left a comment
There was a problem hiding this comment.
LGTM. Looks like the cache key is the delete file itself, which makes sense, as long as we use a deterministic approach for the schema projection.
| eqTestTable.newRowDelta().addDeletes(eqFile).commit(); | ||
|
|
||
| String tableRef = TableIdentifier.of("default", EQ_CACHE_TABLE).toString(); | ||
| int expectedRows = 7; |
There was a problem hiding this comment.
nit: maybe a little bit easier to understand?
| int expectedRows = 7; | |
| int expectedRows = data.size() - delete.size(); |
| Types.NestedField.optional(1, "id", Types.IntegerType.get()), | ||
| Types.NestedField.optional(2, "a", Types.IntegerType.get()), | ||
| Types.NestedField.optional(3, "b", Types.IntegerType.get())); | ||
| PartitionSpec spec = PartitionSpec.builderFor(eqDeleteTestSchema).bucket("id", 1).build(); |
There was a problem hiding this comment.
Why do we not use an unpartitioned table here?
There was a problem hiding this comment.
No reason, I originally had this in a different file where all the tables were partitioned and I didn't want to feel left out. Now that it's here we can remove it
pvary
left a comment
There was a problem hiding this comment.
Marked a few places where the code could made cleaner
stevenzwu
left a comment
There was a problem hiding this comment.
LGTM. thanks for adding the unit test. just a couple of nit comments
| * order than the table schema, which can cause different deleteSchema orderings, poisoning the | ||
| * cache. | ||
| */ | ||
| @TestTemplate |
There was a problem hiding this comment.
The actual test makes sense, but I don't see why it is necessary to run this test of a core feature (DeleteFilter) in Spark. This should be done in the tests for that class.
There was a problem hiding this comment.
Main reason is I couldn't find a place to fit it in any of those places. I did try but it basically involved writing a custom test cache implementation as well. So while it's possible it basically involves re-implementing everything to get the same behavior. We could add a unit test for sorting though, that's pretty easy
There was a problem hiding this comment.
Yeah it looks like we currently don't have a great place for testing out just the logic within DeleteFilter. I think in the current structure, we'd have to have our own implementation of DeleteFilter for the test and as @RussellSpitzer said by the time we do all that it's basically just re-implementing what we have in the engine integrations like Spark. I think I agree with the current split of tests, but maybe the fact that we want to test DeleteFilter in isolation means we need to do some refactoring.
There was a problem hiding this comment.
https://gist.github.com/sfc-gh-rspitzer/e53841222262010847a68aa2ed59dd39 < Here is what Claude thinks that should look like. I haven't really gone through it in depth I just figured if I was writing an implementation of DeleteFilter and DeleteLoader we are going way off track.
Basically we mock just about everything around the load, just to test that the schema is always canonicalized.
There was a problem hiding this comment.
I didn't realize that the caching part was tied to Spark. This should be okay.
|
@amogh-jahagirdar FYI. Here's another one for the next 1.10.x release. |
amogh-jahagirdar
left a comment
There was a problem hiding this comment.
This looks right to me, thank you @RussellSpitzer !
| * order than the table schema, which can cause different deleteSchema orderings, poisoning the | ||
| * cache. | ||
| */ | ||
| @TestTemplate |
There was a problem hiding this comment.
Yeah it looks like we currently don't have a great place for testing out just the logic within DeleteFilter. I think in the current structure, we'd have to have our own implementation of DeleteFilter for the test and as @RussellSpitzer said by the time we do all that it's basically just re-implementing what we have in the engine integrations like Spark. I think I agree with the current split of tests, but maybe the fact that we want to test DeleteFilter in isolation means we need to do some refactoring.
|
@rdblue Any final thoughts? Are you onboard with the test even though I agree it is quite suboptimal. We may just want to raise another issue to refactor this whole bit of code into something more testable ... |
|
I'll go ahead and merge, thanks @RussellSpitzer and all for reviewing! |
… schema ordering Equality delete schemas constructed in DeleteFilter.applyEqDeletes relied on the field order of requiredSchema, which varies depending on the query's projection. When the SparkExecutorCache returned delete records read with one field ordering to a reader expecting another, StructProjection silently misinterpreted the positional data, causing deletes to be skipped. We fix this by Canonicalize the deleteSchema by sorting fields by field ID. Now every reader produces the same schema for deletes regardless of projection, ensuring cache hits return correctly ordered records. Coded with the help of Cursor and claude-4.6.opus-high
Equality delete schemas constructed in DeleteFilter.applyEqDeletes relied on the field order of requiredSchema, which varies depending on the query's projection. When the SparkExecutorCache returned delete records read with one field ordering to a reader expecting another, StructProjection silently misinterpreted the positional data, causing deletes to be skipped.
We fix this by Canonicalize the deleteSchema by sorting fields by field ID. Now every reader produces the same schema for deletes regardless of projection, ensuring cache hits return correctly ordered records.
Coded with the help of Cursor and claude-4.6.opus-high
--
TLDR;
First call read with schema [a,b] for deletes:
read+project(file, [a,b]) → cache_projected_deletes(file) → applyDeletes with [a,b]
Second call reads with Schema [b, a] for deletes:
load_cache_projected_deletes(file) → applyDeletes with [b,a] → mismatch
Fixes #13873