Flink: Add the possibility to use Coordinator Lock when using Flink SQL#15459
Conversation
| Preconditions.checkArgument(lockTime != null, "Lock time is null, Can't release lock"); | ||
| if (lockTime == null) { | ||
| LOG.warn("Lock time is null, Can't release lock"); | ||
| return; | ||
| } |
There was a problem hiding this comment.
When we meet max watermark case, this will error, so open a new pr to deal with it .
#15458
There was a problem hiding this comment.
I've left a comment on that PR.
There was a problem hiding this comment.
Thanks for pointing it out, I have change the way
9e40643 to
b1262dd
Compare
| env.execute("Table Maintenance Job"); | ||
| ``` | ||
|
|
||
| Use Coordinator Lock |
There was a problem hiding this comment.
| Use Coordinator Lock | |
| #### Managing table locking via Flink |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||
|
|
||
| TableLoader tableLoader = TableLoader.fromCatalog( | ||
| CatalogLoader.hive("my_catalog", configuration, properties), | ||
| TableIdentifier.of("database", "table") | ||
| ); | ||
|
|
||
| TableMaintenance.forTable(env, tableLoader) | ||
| .uidSuffix("my-maintenance-job") | ||
| .rateLimit(Duration.ofMinutes(10)) | ||
| .lockCheckDelay(Duration.ofSeconds(10)) | ||
| .add(ExpireSnapshots.builder() | ||
| .scheduleOnCommitCount(10) | ||
| .maxSnapshotAge(Duration.ofMinutes(10)) | ||
| .retainLast(5) | ||
| .deleteBatchSize(5) | ||
| .parallelism(8)) | ||
| .add(RewriteDataFiles.builder() | ||
| .scheduleOnDataFileCount(10) | ||
| .targetFileSizeBytes(128 * 1024 * 1024) | ||
| .partialProgressEnabled(true) | ||
| .partialProgressMaxCommits(10)) | ||
| .append(); | ||
|
|
||
| env.execute("Table Maintenance Job"); |
There was a problem hiding this comment.
Everything except line 184 is identical (no lock parameter). Maybe consolidate the two sections and just explain that the builder can either be
TableMaintenance.forTable(env, tableLoader, lockFactory)
or
TableMaintenance.forTable(env, tableLoader)
There was a problem hiding this comment.
Right, I combine two part code
| Preconditions.checkArgument(lockTime != null, "Lock time is null, Can't release lock"); | ||
| if (lockTime == null) { | ||
| LOG.warn("Lock time is null, Can't release lock"); | ||
| return; | ||
| } |
There was a problem hiding this comment.
I've left a comment on that PR.
|
|
||
| | Key | Description | Default | | ||
| |-----|----------------------|---------| | ||
| | `flink-maintenance.lock.type` | Set to `` or not set | | |
There was a problem hiding this comment.
Would it make sense to have this default to flink? (which will be using the coordinator)
There was a problem hiding this comment.
If the coordinator lock will become the default later and the other lock options are removed, I’d prefer not to require users to set this parameter here, so it’s easier and more consistent with their future usage habits.
e15c09d to
3ae3a07
Compare
3ae3a07 to
7193f0b
Compare
| if (!Watermark.MAX_WATERMARK.equals(mark)) { | ||
| operatorEventGateway.sendEventToCoordinator( | ||
| new LockReleaseEvent(tableName, mark.getTimestamp())); | ||
| } |
There was a problem hiding this comment.
Let's compare the timestamp, not the object.
There was a problem hiding this comment.
Ok,make it compare with timestamp.
| } | ||
|
|
||
| @Test | ||
| @TestTemplate |
There was a problem hiding this comment.
In one of my PRs it was suggested not to use TestTemplate as it was only introduced for backward compatibility reasons.
I was pointed to use @ParameterizedTest for every test instead:
@ParameterizedTest
@FieldSource("FILE_FORMATS")
There was a problem hiding this comment.
Get it. Change TestIcebergSinkCompact use ParameterizedTest
| jdbcProps // JDBC connection properties | ||
| ); | ||
|
|
||
| // Option 1: With external lock factory |
There was a problem hiding this comment.
Shall we mention that we plan to deprecate Option 1?
Do we plan to do it?
There was a problem hiding this comment.
Yes, as we add doc in TableMaintenance, deprecated since 1.12.0. Add the comment in the doc .
|
Merged to main. |
…g Flink SQL to 2.0 and 1.20 (apache#15562) Backports apache#15459
…g Flink SQL to 2.0 and 1.20 (apache#15562) Backports apache#15459
We already support the Coordinator Lock, but it hasn’t been introduced into Sink Table Maintenance yet. This PR adds support for configuring the Coordinator Lock in Sink Table Maintenance.