Kafka Connect🔗
Kafka Connect is a popular framework for moving data in and out of Apache Kafka via connectors. There are many different connectors available, such as the S3 sink for writing data from Kafka to S3 and Debezium source connectors for writing change data capture records from relational databases to Kafka.
It has a straightforward, decentralized, distributed architecture. A cluster consists of a number of worker processes, and a connector runs tasks on these processes to perform the work. Connector deployment is configuration driven, so generally no code needs to be written to run a connector.
Apache Iceberg Sink Connector🔗
The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.
Features🔗
- Commit coordination for centralized Iceberg commits
- Exactly-once delivery semantics
- Multi-table fan-out
- Automatic table creation and schema evolution
- Field name mapping via Iceberg’s column mapping functionality
Installation🔗
The connector zip archive is created as part of the Iceberg build. You can run the build via:
The zip archive will be found under./kafka-connect/kafka-connect-runtime/build/distributions. There is
one distribution that bundles the Hive Metastore client and related dependencies, and one that does not.
Copy the distribution archive into the Kafka Connect plugins directory on all nodes.
Requirements🔗
The sink relies on KIP-447 for exactly-once semantics. This requires Kafka 2.5 or later.
Configuration🔗
| Property | Description |
|---|---|
| iceberg.tables | Comma-separated list of destination tables |
| iceberg.tables.dynamic-enabled | Set to true to route to a table specified in routeField instead of using routeRegex, default is false |
| iceberg.tables.route-field | For multi-table fan-out, the name of the field used to route records to tables |
| iceberg.tables.default-commit-branch | Default branch for commits, main is used if not specified |
| iceberg.tables.default-id-columns | Default comma-separated list of columns that identify a row in tables (primary key) |
| iceberg.tables.default-partition-by | Default comma-separated list of partition field names to use when creating tables |
| iceberg.tables.auto-create-enabled | Set to true to automatically create destination tables, default is false |
| iceberg.tables.evolve-schema-enabled | Set to true to add any missing record fields to the table schema, default is false |
| iceberg.tables.schema-force-optional | Set to true to set columns as optional during table create and evolution, default is false to respect schema |
| iceberg.tables.schema-case-insensitive | Set to true to look up table columns by case-insensitive name, default is false for case-sensitive |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name>.commit-branch | Table-specific branch for commits, use iceberg.tables.default-commit-branch if not specified |
| iceberg.table.\<table name>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name>.partition-by | Comma-separated list of partition fields to use when creating the table |
| iceberg.table.\<table name>.route-regex | The regex used to match a record's routeField to a table |
| iceberg.control.topic | Name of the control topic, default is control-iceberg |
| iceberg.control.group-id-prefix | Prefix for the control consumer group, default is cg-control |
| iceberg.control.commit.interval-ms | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commit.timeout-ms | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commit.threads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.coordinator.transactional.prefix | Prefix for the transactional id to use for the coordinator producer, default is to use no/empty prefix |
| iceberg.catalog | Name of the catalog, default is iceberg |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
If iceberg.tables.dynamic-enabled is false (the default) then you must specify iceberg.tables. If
iceberg.tables.dynamic-enabled is true then you must specify iceberg.tables.route-field which will
contain the name of the table.
Kafka configuration🔗
By default the connector will attempt to use Kafka client config from the worker properties for connecting to
the control topic. If that config cannot be read for some reason, Kafka client settings
can be set explicitly using iceberg.kafka.* properties.
Message format🔗
Messages should be converted to a struct or map using the appropriate Kafka Connect converter.
Catalog configuration🔗
The iceberg.catalog.* properties are required for connecting to the Iceberg catalog. The core catalog
types are included in the default distribution, including REST, Glue, DynamoDB, Hadoop, Nessie,
JDBC, Hive and BigQuery Metastore. JDBC drivers are not included in the default distribution, so you will need to include
those if needed. When using a Hive catalog, you can use the distribution that includes the Hive metastore client,
otherwise you will need to include that yourself.
To set the catalog type, you can set iceberg.catalog.type to rest, hive, or hadoop. For other
catalog types, you need to instead set iceberg.catalog.catalog-impl to the name of the catalog class.
REST example🔗
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>",
Hive example🔗
NOTE: Use the distribution that includes the HMS client (or include the HMS client yourself). Use S3FileIO when
using S3 for storage and GCSFileIO when using GCS (the default is HadoopFileIO with HiveCatalog).