Flink: Add possibility of ordering the splits based on the file sequence number #7661
Conversation
/** Simple assigner only tracks unassigned splits */
@Override
public synchronized Collection<IcebergSourceSplitState> state() {
If we go with the new approach to watermark alignment, IcebergSourceSplitState is probably not needed. This could become Collection<IcebergSourceSplit> pendingSplits(). That can be a separate PR, so it does not distract from the purpose of this one.
Wouldn't we still need the state to store the current read positions at savepoints/checkpoints?
The IcebergSourceSplitState class just wraps IcebergSourceSplit with a status flag (such as UNASSIGNED or ASSIGNED). The flag was introduced for the watermark alignment assigner, which needs to track the ASSIGNED (and not yet completed) splits in addition to the pending UNASSIGNED splits. With the new approach of leveraging Flink for watermark tracking and aggregation, we only need to track the pending UNASSIGNED splits.
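To make the suggestion concrete, here is a minimal sketch of an assigner that keeps only pending (unassigned) splits and exposes them through a pendingSplits() method. It is not the code from this PR: the class name PendingOnlyAssignerSketch is hypothetical, splits are represented as plain strings instead of IcebergSourceSplit, and the method names only loosely mirror the ones discussed above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: tracks only pending (unassigned) splits, no status flag.
class PendingOnlyAssignerSketch {
  // Assumption: a split is a plain String here; the real type is IcebergSourceSplit.
  private final Deque<String> pending = new ArrayDeque<>();

  // Newly planned splits become pending.
  synchronized void onDiscoveredSplits(Collection<String> splits) {
    pending.addAll(splits);
  }

  // Hands out the next pending split, or null if none is left.
  // Once handed out, the split is no longer tracked by the assigner.
  synchronized String getNext() {
    return pending.poll();
  }

  // Splits returned by a failed reader simply become pending again.
  synchronized void onUnassignedSplits(Collection<String> splits) {
    pending.addAll(splits);
  }

  // Snapshot of the pending splits, which is all a checkpoint would need to store.
  synchronized Collection<String> pendingSplits() {
    return new ArrayList<>(pending);
  }

  public static void main(String[] args) {
    PendingOnlyAssignerSketch assigner = new PendingOnlyAssignerSketch();
    assigner.onDiscoveredSplits(List.of("a", "b", "c"));
    assigner.getNext(); // "a" is assigned and dropped from the tracked state
    System.out.println(assigner.pendingSplits()); // prints [b, c]
  }
}
```

In this sketch the read position inside an assigned split would be owned by the reader's own checkpointed state, which is why the assigner does not need an ASSIGNED flag.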
…itAssigner, checks/messages
Thanks @pvary for the contribution.
Thanks @stevenzwu for the review!
…ts based on the file sequence number (apache#7661)
…ts based on the file sequence number (apache#7661) (apache#7889)
Flink users often depend on the fact that the IcebergSource reads the Iceberg snapshots in the same order as they were committed. This assumption could be broken on job start or job restart when multiple snapshots are read in one planning cycle.
To ensure that the splits are ordered as expected, the PR adds FileSequenceNumberBasedSplitAssignerFactory, which orders the splits by ContentFile.fileSequenceNumber().
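The ordering idea can be illustrated with a small, self-contained sketch. This is not the implementation from the PR: SequenceOrderedAssignerSketch and its nested Split class are hypothetical stand-ins, and the sequence number is stored directly on the split rather than read from ContentFile.fileSequenceNumber(). The sketch only shows that a priority queue keyed on the sequence number hands out splits in commit order, even when several snapshots are planned in one cycle and arrive shuffled.

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: pending splits are handed out in file-sequence-number order.
class SequenceOrderedAssignerSketch {

  // Assumption: a simplified split that carries its file sequence number directly.
  static final class Split {
    private final String id;
    private final long fileSequenceNumber;

    Split(String id, long fileSequenceNumber) {
      this.id = id;
      this.fileSequenceNumber = fileSequenceNumber;
    }

    String id() {
      return id;
    }

    long fileSequenceNumber() {
      return fileSequenceNumber;
    }
  }

  // Lowest sequence number first; the id is only a deterministic tie-breaker.
  private final PriorityQueue<Split> pending =
      new PriorityQueue<>(
          Comparator.comparingLong(Split::fileSequenceNumber).thenComparing(Split::id));

  synchronized void onDiscoveredSplits(List<Split> splits) {
    pending.addAll(splits);
  }

  // Returns the pending split with the smallest sequence number, or null if empty.
  synchronized Split getNext() {
    return pending.poll();
  }

  public static void main(String[] args) {
    SequenceOrderedAssignerSketch assigner = new SequenceOrderedAssignerSketch();
    // Splits from three snapshots discovered in one planning cycle, out of order.
    assigner.onDiscoveredSplits(
        List.of(new Split("s3", 3L), new Split("s1", 1L), new Split("s2", 2L)));
    for (Split s = assigner.getNext(); s != null; s = assigner.getNext()) {
      System.out.println(s.id() + " seq=" + s.fileSequenceNumber());
    }
    // prints s1 seq=1, then s2 seq=2, then s3 seq=3
  }
}
```

A priority queue is used here because splits can be discovered in several batches; sorting each batch on its own would not keep the global order across batches.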