Flink: Prevent setting endTag/endSnapshotId for streaming source#10207

Merged: pvary merged 5 commits into apache:main from pvary:prevent_end_on_stream on Apr 26, 2024

pvary commented Apr 23, 2024

Contributor

I tried to work around issues with setting endSnapshotId for a streaming job. It turns out the code silently overwrites the values set in the context.

Since it doesn't make sense to set an end for the streaming case, I propose adding validation to prevent these cases.
The Preconditions are copied from StreamingMonitorFunction used by the old FlinkSource implementation.

github-actions bot added the flink label Apr 23, 2024
snapshotId == null, "Cannot set snapshot-id option for streaming reader");
Preconditions.checkArgument(
asOfTimestamp == null, "Cannot set as-of-timestamp option for streaming reader");
Preconditions.checkArgument(

Contributor

Where was the end snapshot (ID or tag) silently overridden? I thought it might be a valid scenario. For example, during a backfill, the source could run in streaming mode and discover splits snapshot by snapshot up to an end snapshot.

Contributor Author

Here is the code:

      Snapshot toSnapshotInclusive =
          toSnapshotInclusive(
              lastConsumedSnapshotId, currentSnapshot, scanContext.maxPlanningSnapshotCount());
      IcebergEnumeratorPosition newPosition =
          IcebergEnumeratorPosition.of(
              toSnapshotInclusive.snapshotId(), toSnapshotInclusive.timestampMillis());
      ScanContext incrementalScan =
          scanContext.copyWithAppendsBetween(
              lastPosition.snapshotId(), toSnapshotInclusive.snapshotId());

toSnapshotInclusive reads up to the current snapshot. Its only role is to prevent reading more snapshots than maxPlanningSnapshotCount. We then set it as the last snapshot for the scan.
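
To illustrate the point, here is a minimal, self-contained sketch of that capping behavior. It is not the Iceberg implementation: `SnapshotCapSketch`, `capSnapshot`, and the plain ordered list of snapshot IDs are assumptions made up for illustration. Note that nothing in it consults a user-supplied end snapshot, which is why such a setting would have no effect.

```java
import java.util.List;

public class SnapshotCapSketch {

  // Hypothetical helper (not an Iceberg API). `snapshotIds` is the table history
  // ordered oldest to newest. Returns the newest snapshot ID that one discovery
  // cycle may plan: at most `maxPlanningSnapshotCount` snapshots after
  // `lastConsumedSnapshotId`, and never past the current (last) snapshot.
  public static long capSnapshot(
      List<Long> snapshotIds, long lastConsumedSnapshotId, int maxPlanningSnapshotCount) {
    if (maxPlanningSnapshotCount < 1) {
      throw new IllegalArgumentException("maxPlanningSnapshotCount must be positive");
    }
    int lastConsumedIndex = snapshotIds.indexOf(lastConsumedSnapshotId);
    if (lastConsumedIndex < 0) {
      throw new IllegalArgumentException("Unknown snapshot: " + lastConsumedSnapshotId);
    }
    int currentIndex = snapshotIds.size() - 1;
    int cappedIndex = Math.min(currentIndex, lastConsumedIndex + maxPlanningSnapshotCount);
    return snapshotIds.get(cappedIndex);
  }

  public static void main(String[] args) {
    List<Long> history = List.of(10L, 11L, 12L, 13L, 14L);
    // Two snapshots past 10 is 12, so the cycle stops there.
    System.out.println(capSnapshot(history, 10L, 2)); // prints 12
    // Only one snapshot remains after 13, so the cycle stops at the current one.
    System.out.println(capSnapshot(history, 13L, 2)); // prints 14
  }
}
```

The end of each incremental scan is derived only from the last consumed position, the current snapshot, and the cap.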

Contributor

This sets the end snapshot for each incremental scan/discovery cycle. The source doesn't check or support an overall end snapshot (in the way the Kafka source supports a bounded end position).

Contributor Author

Yeah, I got it. This is why we need the new validations: to prevent incorrect parameterization of the source.

pvary force-pushed the prevent_end_on_stream branch from 76cdc62 to c2815f6 April 24, 2024 11:14
…d add timeout to the previously failing test
this.watermarkColumnTimeUnit = watermarkColumnTimeUnit;

validate();
if (!skipValidate) {

Contributor

Is this only for the endSnapshotId? If yes, should we remove the validation on endSnapshotId?

Contributor Author

TestStreamScanSql was stuck because we set snapshotId too.
So this is for both snapshotId and endSnapshotId.

On the philosophical level, the question is:

  • Do we want to make sure that the created ScanContext is always valid?

If yes, then we change copyWithAppendsBetween and copyWithSnapshotId to remove the streaming flag, as those are not streaming scans anymore.

If no, then we accept that programmatically created ScanContext objects don't need validation.

I opted for the first solution as I'm not sure where we depend on the streaming settings, and it was the least disruptive.

stevenzwu commented Apr 25, 2024

Contributor

> If yes, then we change the copyWithAppendsBetween and the copyWithSnapshotId to remove the streaming flag, as those are not a streaming scans anymore.

That is also not correct, because copy should mean copy. The main problem is that ScanContext is used for both user intention (via the source builder) and the internal incremental scan. I agree that the internal incremental scan shouldn't have the streaming setting.

Note that ScanContext is not a public class. Users can't construct this object directly. Maybe the validate() method shouldn't be called by the constructor and should only be called by the ScanContext#Builder#build() method? Or we could move some of the more intrinsic validation to the IcebergSource builder?

Contributor Author

Went with your suggestion: removed the generic validation and now call the validation explicitly from the source.
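
A rough, self-contained sketch of that shape, for readers following along. Everything here is a simplified stand-in and not the actual Iceberg code: `ScanContextSketch`, `Context`, `buildSource`, and the local `checkArgument` helper are hypothetical, and the end-snapshot and end-tag messages are assumptions modeled on the existing "Cannot set ... option for streaming reader" checks.

```java
public class ScanContextSketch {

  // Hypothetical, simplified stand-in for a scan context (not Iceberg's ScanContext).
  public static final class Context {
    final boolean streaming;
    final Long startSnapshotId;
    final Long endSnapshotId;
    final String endTag;

    // The constructor performs no validation, so internal copies are never rejected.
    public Context(boolean streaming, Long startSnapshotId, Long endSnapshotId, String endTag) {
      this.streaming = streaming;
      this.startSnapshotId = startSnapshotId;
      this.endSnapshotId = endSnapshotId;
      this.endTag = endTag;
    }

    // Internal incremental scan: a plain copy that keeps the streaming flag
    // and sets the range for one discovery cycle.
    public Context copyWithAppendsBetween(long fromSnapshotId, long toSnapshotId) {
      return new Context(streaming, fromSnapshotId, toSnapshotId, null);
    }

    // User-facing checks, meant to be called explicitly rather than from the constructor.
    public void validate() {
      if (streaming) {
        checkArgument(
            endSnapshotId == null, "Cannot set end-snapshot-id option for streaming reader");
        checkArgument(endTag == null, "Cannot set end-tag option for streaming reader");
      }
    }
  }

  // Local replacement for a Preconditions-style check, to keep the sketch dependency-free.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // Hypothetical source-builder entry point: the only place that validates user input.
  public static Context buildSource(Context userContext) {
    userContext.validate();
    return userContext;
  }

  public static void main(String[] args) {
    try {
      buildSource(new Context(true, null, 42L, null));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    // The internal copy carries an end snapshot and is not validated.
    Context internal = new Context(true, 1L, null, null).copyWithAppendsBetween(1L, 5L);
    System.out.println("internal end snapshot: " + internal.endSnapshotId);
  }
}
```

With validation triggered only from the source builder, a user-supplied end on a streaming source is rejected up front, while the enumerator remains free to create per-cycle contexts that have both the streaming flag and an end snapshot.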

pvary force-pushed the prevent_end_on_stream branch from cef9372 to 789191a April 26, 2024 11:16
pvary merged commit 646440a into apache:main Apr 26, 2024
pvary deleted the prevent_end_on_stream branch April 26, 2024 17:19

pvary commented Apr 26, 2024

Contributor Author

Merged to main.
Thanks for the review @stevenzwu!

pvary pushed a commit to pvary/iceberg that referenced this pull request Apr 27, 2024
stevenzwu pushed a commit that referenced this pull request Apr 27, 2024
Co-authored-by: Peter Vary <peter_vary4@apple.com>
sasankpagolu pushed a commit to sasankpagolu/iceberg that referenced this pull request Oct 27, 2024
sasankpagolu pushed a commit to sasankpagolu/iceberg that referenced this pull request Oct 27, 2024
Co-authored-by: Peter Vary <peter_vary4@apple.com>
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
Co-authored-by: Peter Vary <peter_vary4@apple.com>