Flink: SQL support for dynamic iceberg sink#15279
Conversation
mxm
left a comment
There was a problem hiding this comment.
Thanks @swapna267! This looks great.
| } | ||
|
|
||
| @TestTemplate | ||
| public void testCreateDynamicIcebergSink() throws DatabaseAlreadyExistException { |
There was a problem hiding this comment.
Could we verify this test works with both the old FlinkSink and the new IcebergSink?
There was a problem hiding this comment.
This test in particular is testing the DynamicIcebergSink only by setting use-dynamic-iceberg-sink to true.
But i also see, TestIcebergConnector is not testing the new IcebergSink code path. Partially it's covered in TestFlinkTableSink (where iceberg tables are created in Iceberg catalog).
If my understanding is right, i prefer to put that into separate PR.
There was a problem hiding this comment.
That makes sense. The test is fine as-is.
| String dynamicRecordGeneratorImpl = | ||
| flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL); | ||
| Preconditions.checkNotNull( | ||
| dynamicRecordGeneratorImpl, | ||
| "%s must be specified when use-dynamic-iceberg-sink is true", | ||
| FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key()); |
There was a problem hiding this comment.
Should we add a test to verify these conditions?
There was a problem hiding this comment.
Sure can add. Don't see such detailed one's in general . Also concerned about the time tests take today to complete.
There was a problem hiding this comment.
We have many such tests for Dynamic Sink. Not specifying the record generator will probably error when it's being created, but it would still be nice to check for the particular error message reported back to the user. I'll leave it up to you to add it or not.
Guosmilesmile
left a comment
There was a problem hiding this comment.
Thanks for the Pr!Left some comments.
|
|
||
| private TableCreator createTableCreator() { | ||
| final Map<String, String> tableProperties = | ||
| org.apache.iceberg.util.PropertyUtil.propertiesWithPrefix(writeProps, "table.props."); |
There was a problem hiding this comment.
If I’m not mistaken, if we want to set the table property write.parquet.row-group-size-bytes, do we need to specify it here as table.props.write.parquet.row-group-size-bytes? I think this should be documented and we should add a corresponding test case.
There was a problem hiding this comment.
Yes right. When doing CREATE TABLE in flink catalog , we pass in catalog configuration here.
table.props prefix is used to separate out the physical Iceberg table properties.
Basic documentation about the connector is here,
https://iceberg.apache.org/docs/nightly/flink-connector/
Once we have all functionality (dynamic record generator impl is coming in next PR), will add details there.
I combined this in existing test case , https://github.com/swapna267/iceberg/blob/bd2d500f07fb24d05111b6dabc9a8e77637a922c/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java#L393
I can pull it out into another one if we think it's required.
|
Thanks @mxm and @Guosmilesmile for the review. Replied on some comments. |
| .getCatalogLoader() | ||
| .loadCatalog() | ||
| .loadTable(TableIdentifier.of(databaseName(), tableName())); | ||
| assertThat(table.properties()).containsEntry("key1", "val1"); |
There was a problem hiding this comment.
Could we also verify the records written to the table?
| String dynamicRecordGeneratorImpl = | ||
| flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL); | ||
| Preconditions.checkNotNull( | ||
| dynamicRecordGeneratorImpl, | ||
| "%s must be specified when use-dynamic-iceberg-sink is true", | ||
| FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key()); |
There was a problem hiding this comment.
We have many such tests for Dynamic Sink. Not specifying the record generator will probably error when it's being created, but it would still be nice to check for the particular error message reported back to the user. I'll leave it up to you to add it or not.
|
LGTM, just some minor comments. |
|
|
||
| private static FlinkCatalog createCatalogLoader( | ||
| Map<String, String> tableProps, String catalogName) { | ||
| Preconditions.checkNotNull( |
There was a problem hiding this comment.
use checkArgument and "standard error message"
There was a problem hiding this comment.
I see that this is only a move for this check. Do you think it would cause any issues if we change this to the new standard?
There was a problem hiding this comment.
Don't see any tests also covering that. No impact on users, unless they are relying specifically on NPE here to do something specific.
checkNotNull is being used at multiple places in this class though. To keep it consistent, we need to apply at all places though.
There was a problem hiding this comment.
Please change this to:
Preconditions.checkArgument(catalogName, "Invalid catalog name: null. Set %s table property.");
There was a problem hiding this comment.
Sure. Cleaned up other nonnull checks and removed following 2 as they will never happen.
Preconditions.checkNotNull(catalog, "Flink catalog cannot be null"); -> createtableloader is only called when catalog != null.
Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null"); -> Table name can never be null as create table statement will not compile without that.
|
Nice stuff @swapna267! Could you please update the documentation too? |
|
Thanks @pvary . Working on adding some usable dynamic record generators which can use VARIANT column type in Flink SQL to generate the dynamic records.Should have the PR by end of this week. Would like to add the documentation post that PR review. |
| .orElseGet(ImmutableList::of); | ||
| return (DataStreamSinkProvider) | ||
| (providerContext, dataStream) -> { | ||
| if (catalogLoader != null && dynamicRecordGeneratorImpl != null) { |
There was a problem hiding this comment.
Do we need a check somewhere that these things are set together?
There was a problem hiding this comment.
Added a check by using useDynamicSink field to determine.
| (providerContext, dataStream) -> { | ||
| if (Boolean.TRUE.equals( | ||
| readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK))) { | ||
| return IcebergSink.forRowData(dataStream) |
There was a problem hiding this comment.
Could we just create a method of all of the sink creation, like
return createDynamicIcebergSink(dataStream);
return legacySink(dataStream, resolvedSchema, equalityColumns);
return icebergSink(dataStream, resolvedSchema, equalityColumns);
mxm
left a comment
There was a problem hiding this comment.
LGTM. Great work @swapna267! Thanks for reviewing, @pvary!
|
Merged to main. |
|
@swapna267: Please backport with something like this: |
|
Thanks @pvary for review and merging. Will do the backport. |
This PR introduces a SQL table connector for using the dynamic iceberg sink.
Two new configuration options have been added to FlinkCreateTableOptions:
Example SQL,
Planning to provide a CustomVariantToDynamicRecordGenerator that can handle Flink VARIANT type column to generate records of different schemas landing in tables of corresponding schema.
Will add that in a different PR.