Flink: Add the possibility to use Coordinator Lock when using Flink SQL #15459

Merged: pvary merged 6 commits into apache:main from Guosmilesmile:table-maintance-sink-coordinator on Mar 9, 2026

@Guosmilesmile

We already support the Coordinator Lock, but it hasn’t been introduced into Sink Table Maintenance yet. This PR adds support for configuring the Coordinator Lock in Sink Table Maintenance.

Comment on lines +255 to +258
-Preconditions.checkArgument(lockTime != null, "Lock time is null, Can't release lock");
+if (lockTime == null) {
+  LOG.warn("Lock time is null, Can't release lock");
+  return;
+}

Contributor Author

The precondition fails in the max-watermark case, so I opened a separate PR to handle it: #15458

Contributor

I've left a comment on that PR.

Contributor Author

Thanks for pointing it out. I have changed the approach.

@Guosmilesmile force-pushed the table-maintance-sink-coordinator branch from 9e40643 to b1262dd on February 27, 2026 09:08

@mxm left a comment

Contributor

Thanks @Guosmilesmile!

Comment thread docs/docs/flink-maintenance.md Outdated
env.execute("Table Maintenance Job");

Use Coordinator Lock

Contributor

Suggested change
-Use Coordinator Lock
+#### Managing table locking via Flink

Comment thread docs/docs/flink-maintenance.md Outdated
Comment on lines +177 to +201
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table")
);

TableMaintenance.forTable(env, tableLoader)
    .uidSuffix("my-maintenance-job")
    .rateLimit(Duration.ofMinutes(10))
    .lockCheckDelay(Duration.ofSeconds(10))
    .add(ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .maxSnapshotAge(Duration.ofMinutes(10))
        .retainLast(5)
        .deleteBatchSize(5)
        .parallelism(8))
    .add(RewriteDataFiles.builder()
        .scheduleOnDataFileCount(10)
        .targetFileSizeBytes(128 * 1024 * 1024)
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10))
    .append();

env.execute("Table Maintenance Job");

Contributor

Everything except line 184 is identical (no lock parameter). Maybe consolidate the two sections and just explain that the builder can either be

TableMaintenance.forTable(env, tableLoader, lockFactory)

or

TableMaintenance.forTable(env, tableLoader)

Contributor Author

Right, I have combined the two code sections.

| Key | Description | Default |
|-----|----------------------|---------|
| `flink-maintenance.lock.type` | Set to `` or not set | |

Contributor

Would it make sense to have this default to flink? (which will be using the coordinator)

Contributor Author

If the coordinator lock will become the default later and the other lock options are removed, I’d prefer not to require users to set this parameter here, so it’s easier and more consistent with their future usage habits.
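
The behaviour discussed in this thread, where leaving `flink-maintenance.lock.type` unset or empty selects the coordinator-based lock, can be sketched roughly as follows. This is a minimal illustration and not the Iceberg implementation: `LockTypeResolver`, `LockKind`, and the `jdbc` value used in `main` are assumptions made for the example; only the property key comes from the docs table in this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class LockTypeResolver {
  // Property key taken from the docs table in this PR.
  static final String LOCK_TYPE_KEY = "flink-maintenance.lock.type";

  // Hypothetical classification used only for this sketch.
  enum LockKind { COORDINATOR, EXTERNAL }

  // Unset or empty means "use the Flink coordinator lock"; any other value
  // is treated as a request for an external lock implementation.
  static LockKind resolve(Map<String, String> props) {
    String type = props.get(LOCK_TYPE_KEY);
    if (type == null || type.trim().isEmpty()) {
      return LockKind.COORDINATOR;
    }
    return LockKind.EXTERNAL;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    System.out.println(resolve(props)); // nothing configured

    props.put(LOCK_TYPE_KEY, "jdbc"); // "jdbc" is an assumed example value
    System.out.println(resolve(props));
  }
}
```

With this shape, users who never set the property get the coordinator lock without any extra configuration, which matches the preference stated above.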

@Guosmilesmile force-pushed the table-maintance-sink-coordinator branch 2 times, most recently from e15c09d to 3ae3a07 on March 5, 2026 05:26

Comment on lines +108 to +111
if (!Watermark.MAX_WATERMARK.equals(mark)) {
  operatorEventGateway.sendEventToCoordinator(
      new LockReleaseEvent(tableName, mark.getTimestamp()));
}

Contributor

Let's compare the timestamp, not the object.

Contributor Author

OK, I changed it to compare the timestamp.
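
To illustrate the suggestion, here is a self-contained sketch of guarding the release event by timestamp rather than by object equality. `SimpleWatermark` is a stand-in written for this example (it is not Flink's `Watermark` class) and `shouldSendRelease` is a hypothetical helper; the only thing carried over from Flink is that the max watermark uses `Long.MAX_VALUE` as its timestamp.

```java
public class WatermarkGuard {
  // Stand-in for a watermark type; deliberately does not override equals().
  static final class SimpleWatermark {
    static final SimpleWatermark MAX = new SimpleWatermark(Long.MAX_VALUE);
    private final long timestamp;

    SimpleWatermark(long timestamp) {
      this.timestamp = timestamp;
    }

    long getTimestamp() {
      return timestamp;
    }
  }

  // Compare the primitive timestamp so the check does not depend on how
  // (or whether) equals() is implemented by the watermark class.
  static boolean shouldSendRelease(SimpleWatermark mark) {
    return mark.getTimestamp() != SimpleWatermark.MAX.getTimestamp();
  }

  public static void main(String[] args) {
    // An ordinary watermark: the release event would be sent.
    System.out.println(shouldSendRelease(new SimpleWatermark(1_000L)));
    // A distinct instance carrying the max timestamp: the event is skipped.
    System.out.println(shouldSendRelease(new SimpleWatermark(Long.MAX_VALUE)));
  }
}
```

Comparing timestamps keeps the guard correct even when the incoming watermark is a different instance, or a subclass, of the max watermark.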

}

-@Test
+@TestTemplate

Contributor

In one of my PRs it was suggested not to use TestTemplate as it was only introduced for backward compatibility reasons.

I was advised to use @ParameterizedTest for every test instead:

  @ParameterizedTest
  @FieldSource("FILE_FORMATS")

Contributor Author

Got it. I changed TestIcebergSinkCompact to use @ParameterizedTest.

Comment thread docs/docs/flink-maintenance.md Outdated
jdbcProps // JDBC connection properties
);

// Option 1: With external lock factory

Contributor

Shall we mention that we plan to deprecate Option 1?

Do we plan to do it?

Contributor Author

Yes. As noted in the TableMaintenance documentation, it is deprecated since 1.12.0. I have added that note to the doc.

@pvary merged commit b310cba into apache:main on Mar 9, 2026 (18 checks passed)
@pvary commented Mar 9, 2026

Merged to main.
Thanks @Guosmilesmile for the PR and @mxm for the review

@pvary changed the title from "Flink: Compact in sink v2 support Coordinator Lock" to "Flink: Add the possibility to use Coordinator Lock when using Flink SQL" on Mar 9, 2026
@Guosmilesmile deleted the table-maintance-sink-coordinator branch on March 9, 2026 12:08
pvary pushed a commit that referenced this pull request Mar 9, 2026
RjLi13 pushed a commit to RjLi13/iceberg that referenced this pull request Mar 12, 2026
talatuyarer pushed a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026