Spark Writes
To use Iceberg in Spark, first configure Spark catalogs.
Some plans are only available when using Iceberg SQL extensions in Spark 3.
Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions:
| Feature support | Spark 3 | Notes |
|---|---|---|
| SQL insert into | ✔️ | ⚠ Requires spark.sql.storeAssignmentPolicy=ANSI (default since Spark 3.0) |
| SQL merge into | ✔️ | ⚠ Requires Iceberg Spark extensions |
| SQL insert overwrite | ✔️ | ⚠ Requires spark.sql.storeAssignmentPolicy=ANSI (default since Spark 3.0) |
| SQL delete from | ✔️ | ⚠ Row-level delete requires Iceberg Spark extensions |
| SQL update | ✔️ | ⚠ Requires Iceberg Spark extensions |
| DataFrame append | ✔️ | |
| DataFrame overwrite | ✔️ | |
| DataFrame CTAS and RTAS | ✔️ | ⚠ Requires DSv2 API |
Writing with SQL
Spark 3 supports SQL INSERT INTO, MERGE INTO, and INSERT OVERWRITE, as well as the new DataFrameWriterV2 API.
INSERT INTO
To append new data to a table, use INSERT INTO with either a query or a VALUES list, for example `INSERT INTO prod.db.table SELECT ...`.
MERGE INTO
Spark 3 added support for MERGE INTO queries that can express row-level updates.
Iceberg supports MERGE INTO by rewriting the data files that contain rows needing updates and committing the result as an overwrite.
MERGE INTO is recommended instead of INSERT OVERWRITE because Iceberg can replace only the affected data files, and because the data overwritten by a dynamic overwrite may change if the table's partitioning changes.
MERGE INTO syntax
MERGE INTO updates a table, called the target table, using a set of updates from another query, called the source. The ON clause, which works like a join condition, determines which source row updates each target row.
```sql
MERGE INTO prod.db.target t   -- a target table
USING (SELECT ...) s          -- the source updates
ON t.id = s.id                -- condition to find updates for target rows
WHEN ...                      -- updates
```
Updates to matched rows in the target table are listed using WHEN MATCHED ... THEN ... clauses. Multiple MATCHED clauses can be added, each with a condition that determines when it applies; for each target row, the first clause whose condition holds is used.
```sql
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
```
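To make the "first matching clause wins" rule concrete, here is a toy model in plain Python. It is not how Spark or Iceberg evaluate MERGE INTO; the names `MATCHED_CLAUSES` and `apply_matched` are hypothetical, and rows are assumed to be dicts carrying the `op` and `count` fields used by the three clauses above.

```python
from typing import Callable, Optional

Row = dict
# A clause is (condition, action). An action returns the new target row, or None to delete it.
Clause = tuple[Callable[[Row, Row], bool], Callable[[Row, Row], Optional[Row]]]

# Hypothetical model of the three WHEN MATCHED clauses, kept in the same order.
MATCHED_CLAUSES: list[Clause] = [
    # WHEN MATCHED AND s.op = 'delete' THEN DELETE
    (lambda t, s: s["op"] == "delete", lambda t, s: None),
    # WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
    (lambda t, s: t["count"] is None and s["op"] == "increment",
     lambda t, s: {**t, "count": 0}),
    # WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
    (lambda t, s: s["op"] == "increment",
     lambda t, s: {**t, "count": t["count"] + 1}),
]


def apply_matched(target: Row, source: Row) -> Optional[Row]:
    """Apply only the first clause whose condition holds; later clauses are skipped."""
    for condition, action in MATCHED_CLAUSES:
        if condition(target, source):
            return action(target, source)
    return target  # no clause applies: the target row is left unchanged
```

A null count with op = 'increment' becomes 0, because the second clause matches first and the third clause is never reached.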
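Source rows (updates) that do not match any target row can be inserted with a WHEN NOT MATCHED clause, such as `WHEN NOT MATCHED THEN INSERT *`.
Inserts also support additional conditions, for example `WHEN NOT MATCHED AND s.op = 'increment' THEN INSERT (id, count) VALUES (s.id, 1)`.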
Each row of the target table can be updated by at most one source record; if several source records match the same target row, an error is thrown.
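The single-match rule can be sketched as a check on the join keys before any clause is applied. This is a hypothetical helper, assuming the `ON t.id = s.id` condition from the template above and integer ids; `MergeCardinalityError` is an illustrative stand-in for the error Spark actually raises.

```python
from collections import Counter


class MergeCardinalityError(ValueError):
    """Hypothetical stand-in for the error raised when a target row matches twice."""


def check_single_match(target_ids: set[int], source_ids: list[int]) -> None:
    """Raise if any target id is matched by more than one source row (ON t.id = s.id)."""
    # Only source rows that actually match a target row are counted.
    matches = Counter(sid for sid in source_ids if sid in target_ids)
    duplicated = sorted(tid for tid, n in matches.items() if n > 1)
    if duplicated:
        raise MergeCardinalityError(f"target rows matched more than once: {duplicated}")
```

Source rows that match no target row are not counted, so duplicates among them do not trigger the error in this model.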
INSERT OVERWRITE
INSERT OVERWRITE can replace data in the table with the result of a query. Overwrites are atomic operations for Iceberg tables.
The partitions that INSERT OVERWRITE replaces depend on Spark's partition overwrite mode and the table's partitioning. MERGE INTO can rewrite only the affected data files and its behavior is easier to understand, so it is recommended instead of INSERT OVERWRITE.
Overwrite behavior
Spark's default overwrite mode is static, but dynamic overwrite mode is recommended when writing to Iceberg tables. Static overwrite mode determines which partitions to overwrite in a table by converting the PARTITION clause to a filter, but the PARTITION clause can only reference table columns.
Dynamic overwrite mode is configured by setting spark.sql.sources.partitionOverwriteMode=dynamic.
To demonstrate the behavior of dynamic and static overwrites, consider a logs table defined by the following DDL:
```sql
CREATE TABLE prod.my_app.logs (
    uuid string NOT NULL,
    level string NOT NULL,
    ts timestamp NOT NULL,
    message string)
USING iceberg
PARTITIONED BY (level, hours(ts))
```
Dynamic overwrite
When Spark's overwrite mode is dynamic, partitions that have rows produced by the SELECT query will be replaced.
For example, this query removes duplicate log events from the example logs table.
```sql
INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid
```
In dynamic mode, this will replace any partition with rows in the SELECT result. Because the date of all rows is restricted to 1 July, only hours of that day will be replaced.
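A toy model of dynamic overwrite, assuming rows are Python dicts and approximating the `hours(ts)` transform by truncating `ts` to the hour. `partition_key` and `dynamic_overwrite` are hypothetical names for illustration, not Iceberg or Spark APIs.

```python
from datetime import datetime


def partition_key(row: dict) -> tuple[str, datetime]:
    """Model of PARTITIONED BY (level, hours(ts)): level plus ts truncated to the hour."""
    return row["level"], row["ts"].replace(minute=0, second=0, microsecond=0)


def dynamic_overwrite(table: list[dict], new_rows: list[dict]) -> list[dict]:
    """Replace only the partitions that receive at least one row from the query."""
    replaced = {partition_key(r) for r in new_rows}
    # Rows in partitions the query did not touch are kept as they are.
    kept = [r for r in table if partition_key(r) not in replaced]
    return kept + new_rows
```

Rows from 30 June survive a deduplication of 1 July, because no row produced by the query falls into their hourly partitions.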
Static overwrite
When Spark's overwrite mode is static, the PARTITION clause is converted to a filter that is used to delete from the table. If the PARTITION clause is omitted, all partitions will be replaced.
Because there is no PARTITION clause in the query above, it will drop all existing rows in the table when run in static mode, but will only write the logs from 1 July.
To overwrite just the partitions that were loaded, add a PARTITION clause that aligns with the SELECT query filter:
```sql
-- level is supplied by the PARTITION clause, so the SELECT list omits it
INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
```
Note that this mode cannot replace hourly partitions like the dynamic example query because the PARTITION clause can only reference table columns, not hidden partitions.
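Static overwrite can be modeled the same way. In this sketch, assuming dict rows and a hypothetical `static_overwrite` helper, the PARTITION clause is an equality filter on ordinary columns, so it has no way to name an hourly partition.

```python
from typing import Optional


def static_overwrite(table: list[dict], new_rows: list[dict],
                     partition_spec: Optional[dict] = None) -> list[dict]:
    """Toy model of a static INSERT OVERWRITE.

    The PARTITION clause becomes an equality filter on table columns and existing
    rows matching it are deleted. An omitted clause is an empty filter, which
    matches (and deletes) every row.
    """
    spec = partition_spec or {}

    def matches(row: dict) -> bool:
        return all(row[col] == value for col, value in spec.items())

    kept = [r for r in table if not matches(r)]
    # Static partition values are filled into each written row.
    written = [{**r, **spec} for r in new_rows]
    return kept + written
```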
DELETE FROM
Spark 3 added support for DELETE FROM queries to remove data from tables.
Delete queries accept a filter to match rows to delete.
```sql
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';

DELETE FROM prod.db.all_events
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);

DELETE FROM prod.db.orders AS t1
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);
```
If the delete filter matches entire partitions of the table, Iceberg will perform a metadata-only delete. If the filter matches individual rows of a table, then Iceberg will rewrite only the affected data files.
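The difference between a metadata-only delete and a file rewrite can be sketched as a planning step over data files. This is a simplified model, not Iceberg's planner: it assumes a hypothetical table partitioned by identity on a `day` column and a filter on a single column, and the `DataFile` and `plan_delete` names are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

PARTITION_COLUMN = "day"  # assumption: the toy table is partitioned by identity(day)


@dataclass
class DataFile:
    """Toy stand-in for a data file: its partition value plus the rows it holds."""
    path: str
    partition: Any
    rows: list[dict] = field(default_factory=list)


def plan_delete(files: list[DataFile], column: str,
                predicate: Callable[[Any], bool]) -> dict[str, list[str]]:
    """Classify each file as dropped, rewritten, or kept for DELETE ... WHERE predicate(column)."""
    plan: dict[str, list[str]] = {"drop": [], "rewrite": [], "keep": []}
    for f in files:
        if column == PARTITION_COLUMN:
            # The filter selects whole partitions: matching files are removed
            # from table metadata without reading their rows.
            plan["drop" if predicate(f.partition) else "keep"].append(f.path)
        elif any(predicate(r[column]) for r in f.rows):
            # Some rows match: only this file is rewritten without them.
            plan["rewrite"].append(f.path)
        else:
            plan["keep"].append(f.path)
    return plan
```

A filter on `day` drops whole files, while a filter on `id` rewrites only the files that contain matching rows.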
UPDATE
Update queries accept a filter to match rows to update.
```sql
UPDATE prod.db.table
SET c1 = 'update_c1', c2 = 'update_c2'
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00';

UPDATE prod.db.all_events
SET session_time = 0, ignored = true
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events);

UPDATE prod.db.orders AS t1
SET order_status = 'returned'
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid);
```
For more complex row-level updates based on incoming data, s