Skip to content

Flink: SQL support for dynamic iceberg sink#15279

Merged
pvary merged 11 commits into
apache:mainfrom
swapna267:dynamic_sink_flink_sql
Feb 23, 2026
Merged

Flink: SQL support for dynamic iceberg sink#15279
pvary merged 11 commits into
apache:mainfrom
swapna267:dynamic_sink_flink_sql

Conversation

@swapna267

Copy link
Copy Markdown
Contributor

This PR introduces a SQL table connector for using the dynamic iceberg sink.

Two new configuration options have been added to FlinkCreateTableOptions:

  • use-dynamic-iceberg-sink (boolean): Enable/disable dynamic sink functionality
  • dynamic-record-generator-impl (string): Fully qualified class name of the DynamicTableRecordGenerator implementation

Example SQL,

  CREATE TABLE dynamic_sink_table (
      id BIGINT,
      data STRING,
      database_name STRING,
      table_name STRING
  ) WITH (
      'connector' = 'iceberg',
      'catalog-type' = 'hadoop',
      'catalog-name' = 'my_catalog',
      'warehouse' = 's3://my-warehouse/',
      'use-dynamic-iceberg-sink' = 'true',
      'dynamic-record-generator-impl' = 'com.example.MyDynamicRecordGenerator',
      'table.props.write.format.default' = 'parquet',
      'table.props.write.target-file-size-bytes' = '134217728'
  );

  -- Insert data that will be routed to different tables based on database_name and table_name
  INSERT INTO dynamic_sink_table VALUES
      (1, 'record1', 'sales', 'orders'),
      (2, 'record2', 'sales', 'customers'),
      (3, 'record3', 'inventory', 'products');

Planning to provide a CustomVariantToDynamicRecordGenerator that can handle Flink VARIANT type column to generate records of different schemas landing in tables of corresponding schema.
Will add that in a different PR.

@github-actions github-actions Bot added the flink label Feb 9, 2026

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @swapna267! This looks great.

}

@TestTemplate
public void testCreateDynamicIcebergSink() throws DatabaseAlreadyExistException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we verify this test works with both the old FlinkSink and the new IcebergSink?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test in particular is testing the DynamicIcebergSink only by setting use-dynamic-iceberg-sink to true.

But i also see, TestIcebergConnector is not testing the new IcebergSink code path. Partially it's covered in TestFlinkTableSink (where iceberg tables are created in Iceberg catalog).

If my understanding is right, i prefer to put that into separate PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. The test is fine as-is.

Comment on lines +145 to +150
String dynamicRecordGeneratorImpl =
flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
Preconditions.checkNotNull(
dynamicRecordGeneratorImpl,
"%s must be specified when use-dynamic-iceberg-sink is true",
FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test to verify these conditions?

@swapna267 swapna267 Feb 11, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure can add. Don't see such detailed one's in general . Also concerned about the time tests take today to complete.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have many such tests for Dynamic Sink. Not specifying the record generator will probably error when it's being created, but it would still be nice to check for the particular error message reported back to the user. I'll leave it up to you to add it or not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

@Guosmilesmile Guosmilesmile left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the Pr!Left some comments.

Comment thread flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java Outdated

private TableCreator createTableCreator() {
final Map<String, String> tableProperties =
org.apache.iceberg.util.PropertyUtil.propertiesWithPrefix(writeProps, "table.props.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I’m not mistaken, if we want to set the table property write.parquet.row-group-size-bytes, do we need to specify it here as table.props.write.parquet.row-group-size-bytes? I think this should be documented and we should add a corresponding test case.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes right. When doing CREATE TABLE in flink catalog , we pass in catalog configuration here.
table.props prefix is used to separate out the physical Iceberg table properties.

Basic documentation about the connector is here,
https://iceberg.apache.org/docs/nightly/flink-connector/
Once we have all functionality (dynamic record generator impl is coming in next PR), will add details there.

I combined this in existing test case , https://github.com/swapna267/iceberg/blob/bd2d500f07fb24d05111b6dabc9a8e77637a922c/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java#L393
I can pull it out into another one if we think it's required.

@swapna267

Copy link
Copy Markdown
Contributor Author

Thanks @mxm and @Guosmilesmile for the review. Replied on some comments.

.getCatalogLoader()
.loadCatalog()
.loadTable(TableIdentifier.of(databaseName(), tableName()));
assertThat(table.properties()).containsEntry("key1", "val1");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also verify the records written to the table?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Comment on lines +145 to +150
String dynamicRecordGeneratorImpl =
flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
Preconditions.checkNotNull(
dynamicRecordGeneratorImpl,
"%s must be specified when use-dynamic-iceberg-sink is true",
FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have many such tests for Dynamic Sink. Not specifying the record generator will probably error when it's being created, but it would still be nice to check for the particular error message reported back to the user. I'll leave it up to you to add it or not.

@mxm

mxm commented Feb 12, 2026

Copy link
Copy Markdown
Contributor

LGTM, just some minor comments.


private static FlinkCatalog createCatalogLoader(
Map<String, String> tableProps, String catalogName) {
Preconditions.checkNotNull(

@pvary pvary Feb 17, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use checkArgument and "standard error message"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this is only a move for this check. Do you think it would cause any issues if we change this to the new standard?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't see any tests also covering that. No impact on users, unless they are relying specifically on NPE here to do something specific.
checkNotNull is being used at multiple places in this class though. To keep it consistent, we need to apply at all places though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this to:

Preconditions.checkArgument(catalogName, "Invalid catalog name: null. Set %s table property.");

@swapna267 swapna267 Feb 19, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Cleaned up other nonnull checks and removed following 2 as they will never happen.

Preconditions.checkNotNull(catalog, "Flink catalog cannot be null"); -> createtableloader is only called when catalog != null.

Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null"); -> Table name can never be null as create table statement will not compile without that.

@pvary

pvary commented Feb 17, 2026

Copy link
Copy Markdown
Contributor

Nice stuff @swapna267!

Could you please update the documentation too?

@swapna267

Copy link
Copy Markdown
Contributor Author

Thanks @pvary . Working on adding some usable dynamic record generators which can use VARIANT column type in Flink SQL to generate the dynamic records.Should have the PR by end of this week. Would like to add the documentation post that PR review.

.orElseGet(ImmutableList::of);
return (DataStreamSinkProvider)
(providerContext, dataStream) -> {
if (catalogLoader != null && dynamicRecordGeneratorImpl != null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a check somewhere that these things are set together?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check by using useDynamicSink field to determine.

Comment thread flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java Outdated
(providerContext, dataStream) -> {
if (Boolean.TRUE.equals(
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) {
return IcebergSink.forRowData(dataStream)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just create a method of all of the sink creation, like

return createDynamicIcebergSink(dataStream);

return legacySink(dataStream, resolvedSchema, equalityColumns);

return icebergSink(dataStream, resolvedSchema, equalityColumns);

@mxm mxm left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Great work @swapna267! Thanks for reviewing, @pvary!

@pvary pvary merged commit 86defce into apache:main Feb 23, 2026
14 checks passed
@pvary

pvary commented Feb 23, 2026

Copy link
Copy Markdown
Contributor

Merged to main.
Thanks @swapna267 for the PR, and @mxm and @Guosmilesmile for the review!

@pvary

pvary commented Feb 23, 2026

Copy link
Copy Markdown
Contributor

@swapna267: Please backport with something like this:

g d HEAD^ HEAD spark/v4.1 |sed "s/v4.1/v4.0/g">/tmp/patch;g apply -3 -p1 /tmp/patch

@swapna267

Copy link
Copy Markdown
Contributor Author

Thanks @pvary for review and merging. Will do the backport.

pvary pushed a commit that referenced this pull request Feb 27, 2026
RjLi13 pushed a commit to RjLi13/iceberg that referenced this pull request Mar 12, 2026
RjLi13 pushed a commit to RjLi13/iceberg that referenced this pull request Mar 12, 2026
talatuyarer pushed a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026
talatuyarer pushed a commit to talatuyarer/iceberg that referenced this pull request Apr 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants