Spark 3.4: Support fanout writers in SparkPositionDeltaWrite #7703
}

public SparkWriteRequirements positionDeltaRequirements(Command command) {
  if (ignoreTableDistributionAndOrdering()) {
Matches CoW and insert requirements now (defined above).
@@ -130,15 +130,16 @@ private static Distribution copyOnWriteDeleteUpdateDistribution(

/** Builds requirements for merge-on-read DELETE, UPDATE, MERGE operations. */
public static SparkWriteRequirements positionDeltaRequirements(
There is a note at the top that this class is for internal purposes and has no API guarantees.
long targetFileSize = context.targetDataFileSize();

if (fanoutEnabled && !inputOrdered) {
  return new FanoutDataWriter<>(writers, files, io, targetFileSize);
No need to use the fanout writer if the input is ordered. The local sort always includes partition columns.
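The condition in the snippet above can be illustrated with a small, self-contained sketch. It does not use the Iceberg classes: `ToyWriter`, `ToyClusteredWriter`, `ToyFanoutWriter`, and `newWriter` are hypothetical stand-ins, and the fallback to a clustered writer when the condition is false is an assumption, since the diff only shows the fanout branch. The toy clustered writer rejects a partition that reappears after its file was closed, which is why ordered input does not need fanout.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: toy writers that mimic the behavior discussed in the review.
final class WriterChoiceSketch {

  /** Hypothetical stand-in for a partitioned writer; not the Iceberg interface. */
  interface ToyWriter {
    void write(String partition, String row);

    Map<String, List<String>> result();
  }

  /** Keeps one open "file" at a time and rejects a partition seen again after it was closed. */
  static class ToyClusteredWriter implements ToyWriter {
    private final Map<String, List<String>> files = new LinkedHashMap<>();
    private final Set<String> closed = new HashSet<>();
    private String current = null;

    @Override
    public void write(String partition, String row) {
      if (!partition.equals(current)) {
        if (closed.contains(partition)) {
          throw new IllegalStateException("Already closed files for partition: " + partition);
        }
        if (current != null) {
          closed.add(current);
        }
        current = partition;
      }
      files.computeIfAbsent(partition, p -> new ArrayList<>()).add(row);
    }

    @Override
    public Map<String, List<String>> result() {
      return files;
    }
  }

  /** Keeps one open "file" per partition, so rows may arrive in any order. */
  static class ToyFanoutWriter implements ToyWriter {
    private final Map<String, List<String>> files = new LinkedHashMap<>();

    @Override
    public void write(String partition, String row) {
      files.computeIfAbsent(partition, p -> new ArrayList<>()).add(row);
    }

    @Override
    public Map<String, List<String>> result() {
      return files;
    }
  }

  /** Same condition as in the diff; the clustered fallback is an assumption. */
  static ToyWriter newWriter(boolean fanoutEnabled, boolean inputOrdered) {
    if (fanoutEnabled && !inputOrdered) {
      return new ToyFanoutWriter();
    }
    return new ToyClusteredWriter();
  }

  public static void main(String[] args) {
    // Unordered input with fanout enabled: every row lands in its partition.
    ToyWriter fanout = newWriter(true, false);
    fanout.write("a", "r1");
    fanout.write("b", "r2");
    fanout.write("a", "r3");
    System.out.println(fanout.result());

    // Input that is already sorted by partition works with the clustered writer.
    ToyWriter clustered = newWriter(true, true);
    clustered.write("a", "r1");
    clustered.write("a", "r3");
    clustered.write("b", "r2");
    System.out.println(clustered.result());
  }
}
```

In this sketch the fanout writer trades memory (one open file per partition) for not requiring a sort, which is the trade-off the review comment alludes to.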
// the spec requires position deletes to be ordered by file and position
// use a fanout writer if the input is unordered no matter whether fanout writers are enabled
// clustered writers only validate records for the same spec/partition are co-located,
Optional: "clustered writers only validate records for the same spec/partition are co-located, which is not enough for position deletes" => "clustered writers assume that the position deletes are already ordered by file and position"?
(The validation seems a bit orthogonal to the previous sentence. I don't mind putting it in somewhere as well, but it doesn't seem to be in the other comment in newDataWriter, so I think it's better to omit it.)
I agree, let me update.
// -------------------------------------------------------------------------
// delete mode is NOT SET -> CLUSTER BY _spec_id, _partition, _file +
//                           LOCALLY ORDERED BY _spec_id, _partition, _file, _pos
// delete mode is NOT SET (fanout) -> CLUSTER BY _spec_id, _partition, _file + empty ordering
Probably annoying to add another example with ORDERED BY, but is it still correct with "ORDERED" above? Is empty_ordering not correct?
If there is a table sort order, we still respect it by default and fanout writer config has no effect.
Yeah, I meant: in this comment, as it is ORDERED & UNORDERED, fanout should not be 'empty ordering', but rather whatever the order column was?
Not sure if I'm missing something.
Oh, this is DELETE. We only have positions, no data, so the order does not matter.
Oh, I see, OK. Well, I guess it matters in the end, but the fanout writer here will sort it.
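The point of this exchange, that position deletes can arrive with an empty ordering because the writer sorts them before they are persisted, can be sketched as follows. This is a minimal illustration under stated assumptions, not the Iceberg implementation: `SortingDeleteBufferSketch`, `delete`, and `close` are hypothetical names, and buffering all positions in memory until close is a simplification.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: buffers (file, position) pairs in arrival order and
// emits them ordered by file and then by position when closed.
final class SortingDeleteBufferSketch {

  // TreeMap keeps data file paths sorted; positions are sorted on close.
  private final Map<String, List<Long>> positionsByFile = new TreeMap<>();

  /** Records that the row at the given position of the given data file is deleted. */
  void delete(String file, long pos) {
    positionsByFile.computeIfAbsent(file, f -> new ArrayList<>()).add(pos);
  }

  /** Returns the buffered deletes as "file:pos" strings, ordered by file and position. */
  List<String> close() {
    List<String> ordered = new ArrayList<>();
    for (Map.Entry<String, List<Long>> entry : positionsByFile.entrySet()) {
      List<Long> positions = entry.getValue();
      Collections.sort(positions);
      for (long pos : positions) {
        ordered.add(entry.getKey() + ":" + pos);
      }
    }
    return ordered;
  }

  public static void main(String[] args) {
    SortingDeleteBufferSketch buffer = new SortingDeleteBufferSketch();
    // Deletes arrive unordered, as they could with an empty local ordering.
    buffer.delete("b.parquet", 7L);
    buffer.delete("a.parquet", 3L);
    buffer.delete("b.parquet", 2L);
    buffer.delete("a.parquet", 0L);
    System.out.println(buffer.close());
  }
}
```

With a local sort on `_file` and `_pos` the buffering step would be unnecessary, which matches the non-fanout row of the code comment under discussion.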
amogh-jahagirdar left a comment
Looks good to me! Thanks @aokolnychyi
Thanks for reviewing, @szehon-ho @amogh-jahagirdar!
This PR adds support for fanout writers in SparkPositionDeltaWrite.