Flink: Prevent setting endTag/endSnapshotId for streaming source#10207
Conversation
| snapshotId == null, "Cannot set snapshot-id option for streaming reader"); | ||
| Preconditions.checkArgument( | ||
| asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader"); | ||
| Preconditions.checkArgument( |
There was a problem hiding this comment.
where were end snapshot (id or tag) silently overridden? I thought it might be a valid scenario. E.g., during backfill, maybe it can be streaming mode and discover splits snapshot by snapshot with an end snapshot.
There was a problem hiding this comment.
There was a problem hiding this comment.
Here is the code:
Snapshot toSnapshotInclusive =
toSnapshotInclusive(
lastConsumedSnapshotId, currentSnapshot, scanContext.maxPlanningSnapshotCount());
IcebergEnumeratorPosition newPosition =
IcebergEnumeratorPosition.of(
toSnapshotInclusive.snapshotId(), toSnapshotInclusive.timestampMillis());
ScanContext incrementalScan =
scanContext.copyWithAppendsBetween(
lastPosition.snapshotId(), toSnapshotInclusive.snapshotId());
The toSnapshotInclusive reads until the current snapshot. Its only role is to prevent reading more snapshot than maxPlanningSnapshotCount. And we set this as a last snapshot for the scan.
There was a problem hiding this comment.
this is to set the end snapshot per incremental scan/discovery. The source doesn't check/support end snapshot (like Kafka source's bounded end position).
There was a problem hiding this comment.
Yeah, I got it. This is why we need the new validations to prevent the wrong parametrization of the source.
76cdc62 to
c2815f6
Compare
…d add timeout to the previously failing test
| this.watermarkColumnTimeUnit = watermarkColumnTimeUnit; | ||
|
|
||
| validate(); | ||
| if (!skipValidate) { |
There was a problem hiding this comment.
is this only for the endSnapshotId? if yes, should we remove the validation on endSnapshotId?
There was a problem hiding this comment.
The TestStreamScanSql was stuck because we set snapshotId too.
So this is for snapshotId and for endSnapshotId.
On the philosophical level, the question is:
- Do we want to make sure that the created
ScanContextis always valid?
If yes, then we change the copyWithAppendsBetween and the copyWithSnapshotId to remove the streaming flag, as those are not a streaming scans anymore.
If no, then we accept that the programmatically creates ScanContext objects don't need validation.
I opted for the first solution as I'm not sure where we depend on the streaming settings, and it was the least disruptive.
There was a problem hiding this comment.
If yes, then we change the copyWithAppendsBetween and the copyWithSnapshotId to remove the streaming flag, as those are not a streaming scans anymore.
that is also not correct, because copy should meant copy. The main problem is that ScanContext is used for both user intention (via source builder) and internal incremental scan. I agree that internal incremental scan shouldn't have the streaming setting.
note that ScanContext is not a public class. Users can't construct this object directly. Maybe the validate() method shouldn't be called by the constructor and only be called by the ScanContext#Builder#build() method? or move some more intrinsic validation to IcebergSource builder?
There was a problem hiding this comment.
Went with your suggestion, and removed the generic validation, and called the validation explicitly from the source.
cef9372 to
789191a
Compare
|
Merged to main. |
Co-authored-by: Peter Vary <peter_vary4@apple.com>
Co-authored-by: Peter Vary <peter_vary4@apple.com>
Co-authored-by: Peter Vary <peter_vary4@apple.com>
Tried to hack around issues with setting
endSnapshotIdfor a streaming job. It turns out the code is silently overwriting the values set in the context.Since it doesn't make sense to set
endfor the streaming case, I propose to add validation to prevent these cases.The
Preconditionsare copied fromStreamingMonitorFunctionused by the old FlinkSource implementation.