
Spark Writes

To use Iceberg in Spark, first configure Spark catalogs.

Some plans are only available when using Iceberg SQL extensions.

Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions:

| Feature support | Spark | Notes |
|---|---|---|
| SQL insert into | ✔️ | ⚠ Requires spark.sql.storeAssignmentPolicy=ANSI (default since Spark 3.0) |
| SQL merge into | ✔️ | ⚠ Requires Iceberg Spark extensions |
| SQL insert overwrite | ✔️ | ⚠ Requires spark.sql.storeAssignmentPolicy=ANSI (default since Spark 3.0) |
| SQL delete from | ✔️ | ⚠ Row-level delete requires Iceberg Spark extensions |
| SQL update | ✔️ | ⚠ Requires Iceberg Spark extensions |
| DataFrame append | ✔️ | |
| DataFrame overwrite | ✔️ | |
| DataFrame CTAS and RTAS | ✔️ | ⚠ Requires DSv2 API |
| DataFrame merge into | ✔️ | ⚠ Requires DSv2 API (Spark 4.0 and later) |

Writing with SQL

Spark supports SQL INSERT INTO, MERGE INTO, and INSERT OVERWRITE, as well as the new DataFrameWriterV2 API.

INSERT INTO

To append new data to a table, use INSERT INTO.

INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...

MERGE INTO

Spark supports MERGE INTO queries that can express row-level updates.

Iceberg supports MERGE INTO by rewriting, in an overwrite commit, the data files that contain rows that need to be updated.

MERGE INTO is recommended instead of INSERT OVERWRITE because Iceberg can replace only the affected data files, and because the data overwritten by a dynamic overwrite may change if the table's partitioning changes.

MERGE INTO syntax

MERGE INTO updates a table, called the target table, using a set of updates from another query, called the source. The ON clause works like a join condition: it determines which source row, if any, supplies the update for each row in the target table.

MERGE INTO prod.db.target t   -- a target table
USING (SELECT ...) s          -- the source updates
ON t.id = s.id                -- condition to find updates for target rows
WHEN ...                      -- updates

Updates to rows in the target table are listed using WHEN MATCHED ... THEN ... clauses. Multiple MATCHED clauses can be added, each with a condition that determines when it applies. Clauses are evaluated in order, and the first one whose condition matches is used.

WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1

Source rows (updates) that do not match can be inserted:

WHEN NOT MATCHED THEN INSERT *

Inserts also support additional conditions:

WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)

Only one record in the source data can update any given row of the target table, or else an error will be thrown.
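The clause rules above can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions, not how Spark or Iceberg execute a merge: the `merge` function, its `(condition, action)` clause tuples, and the sample rows are hypothetical names introduced for illustration, rows are plain dictionaries, and the ON condition is assumed to be equality on `id`.

```python
# Hypothetical model of MERGE INTO clause evaluation; not a Spark or Iceberg API.

def merge(target, source, matched_clauses, insert=None, key="id"):
    """Return the rows of the target after a MERGE-like operation.

    matched_clauses: ordered list of (condition, action) pairs, where both are
    functions of (target_row, source_row) and action=None stands for DELETE.
    insert: optional function of source_row for WHEN NOT MATCHED THEN INSERT.
    """
    sources_by_key = {}
    for s in source:
        sources_by_key.setdefault(s[key], []).append(s)

    result = []
    target_keys = set()
    for t in target:
        target_keys.add(t[key])
        candidates = sources_by_key.get(t[key], [])
        if len(candidates) > 1:
            # Only one source record may update a given target row.
            raise ValueError(f"multiple source rows match target {key}={t[key]!r}")
        if not candidates:
            result.append(t)  # no source row matched: keep the row unchanged
            continue
        s = candidates[0]
        for condition, action in matched_clauses:
            if condition(t, s):
                if action is not None:
                    result.append(action(t, s))
                break  # the first matching clause is used; later ones are skipped
        else:
            result.append(t)  # matched by ON, but no clause condition held

    if insert is not None:
        result.extend(insert(s) for s in source if s[key] not in target_keys)
    return result


# The three MATCHED clauses from the example above, in the same order.
clauses = [
    (lambda t, s: s["op"] == "delete", None),
    (lambda t, s: t["count"] is None and s["op"] == "increment",
     lambda t, s: {**t, "count": 0}),
    (lambda t, s: s["op"] == "increment",
     lambda t, s: {**t, "count": t["count"] + 1}),
]

target = [{"id": 1, "count": 5}, {"id": 2, "count": None},
          {"id": 3, "count": 7}, {"id": 4, "count": 1}]
source = [{"id": 1, "op": "increment"}, {"id": 2, "op": "increment"},
          {"id": 3, "op": "delete"}, {"id": 9, "op": "increment"}]

merged = merge(target, source, clauses,
               insert=lambda s: {"id": s["id"], "count": 1})
```

In this model, row 2 is handled by the second clause because its count is NULL, so the third clause is never evaluated for it. Adding a second source row with `id = 1` makes `merge` raise, mirroring the single-source-row rule.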

Spark 3.5 added support for WHEN NOT MATCHED BY SOURCE ... THEN ... to update or delete rows that are not present in the source data:

WHEN NOT MATCHED BY SOURCE THEN UPDATE SET status = 'invalid'

Snapshot summary

After a MERGE INTO commit, the snapshot summary may include the following fields. Each value is the string form of a non-negative count. A field is omitted when the value is unknown (e.g., not reported by Spark).

Info: Only available in Spark 4.1 and higher.

| Field | Description |
|---|---|
| spark.merge-into.num-target-rows-copied | Number of target rows copied unmodified because they did not match any action |
| spark.merge-into.num-target-rows-deleted | Number of target rows deleted |
| spark.merge-into.num-target-rows-updated | Number of target rows updated |
| spark.merge-into.num-target-rows-inserted | Number of target rows inserted |
| spark.merge-into.num-target-rows-matched-updated | Number of target rows updated by a MATCHED clause |
| spark.merge-into.num-target-rows-matched-deleted | Number of target rows deleted by a MATCHED clause |
| spark.merge-into.num-target-rows-not-matched-by-source-updated | Number of target rows updated by a NOT MATCHED BY SOURCE clause |
| spark.merge-into.num-target-rows-not-matched-by-source-deleted | Number of target rows deleted by a NOT MATCHED BY SOURCE clause |
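Because each value is a string and a field may be absent, code that reads these counters should parse them and tolerate missing keys. A minimal sketch, assuming the snapshot summary has already been loaded into a plain dictionary of strings; the helper name `merge_count` and the sample summary values are hypothetical:

```python
from typing import Mapping, Optional

PREFIX = "spark.merge-into."

def merge_count(summary: Mapping[str, str], field: str) -> Optional[int]:
    """Return the named MERGE INTO counter as an int, or None if it was omitted."""
    value = summary.get(PREFIX + field)
    return None if value is None else int(value)

# Hypothetical summary for illustration; real values depend on the commit.
summary = {
    "operation": "overwrite",
    "spark.merge-into.num-target-rows-updated": "3",
    "spark.merge-into.num-target-rows-inserted": "1",
}

updated = merge_count(summary, "num-target-rows-updated")
copied = merge_count(summary, "num-target-rows-copied")  # omitted in this summary
```

Returning `None` rather than `0` for an omitted field keeps "unknown" distinct from "zero rows".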

INSERT OVERWRITE

INSERT OVERWRITE can replace data in the table with the result of a query. Overwrites are atomic operations for Iceberg tables.

The partitions that will be replaced by INSERT OVERWRITE depend on Spark's partition overwrite mode and the partitioning of the table. MERGE INTO can rewrite only the affected data files and has more easily understood behavior, so it is recommended instead of INSERT OVERWRITE.

Overwrite behavior

Spark's default overwrite mode is static, but dynamic overwrite mode is recommended when writing to Iceberg tables. Static overwrite mode determines which partitions to overwrite in a table by converting the PARTITION clause to a filter. Because the PARTITION clause can only reference table columns, a static overwrite cannot target hidden partitions, that is, partitions derived from a column by a partition transform.

Dynamic overwrite mode is configured by setting spark.sql.sources.partitionOverwriteMode=dynamic.

To demonstrate the behavior of dynamic and static overwrites, consider a logs table defined by the following DDL:

CREATE TABLE prod.my_app.logs (
    uuid string NOT NULL,
    level string NOT NULL,
    ts timestamp NOT NULL,
    message string)
USING iceberg
PARTITIONED BY (level, hours(ts))

Dynamic overwrite

When Spark's overwrite mode is dynamic, partitions that have rows produced by the SELECT query will be replaced.

For example, this query removes duplicate log events from the example logs table.

INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid

In dynamic mode, this will replace any partition with rows in the SELECT result. Because the date of all rows is restricted to 1 July, only hours of that day will be replaced.
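The partition-selection rule can be modeled in a few lines. This is a sketch under stated assumptions rather than Iceberg's implementation: rows are plain dictionaries, `partition_of` and `dynamic_overwrite` are hypothetical helpers, and the `hours(ts)` transform is approximated by truncating the timestamp to the hour.

```python
from datetime import date, datetime

def partition_of(row):
    # Simplified stand-in for PARTITIONED BY (level, hours(ts)):
    # the level value plus ts truncated to the hour.
    return (row["level"], row["ts"].replace(minute=0, second=0, microsecond=0))

def dynamic_overwrite(table, new_rows):
    # Every partition that receives at least one new row is replaced as a whole;
    # partitions that receive no new rows are left untouched.
    replaced = {partition_of(r) for r in new_rows}
    kept = [r for r in table if partition_of(r) not in replaced]
    return kept + list(new_rows)

logs = [
    {"uuid": "a", "level": "INFO", "ts": datetime(2020, 7, 1, 10, 5), "message": "x"},
    {"uuid": "a", "level": "INFO", "ts": datetime(2020, 7, 1, 10, 5), "message": "x"},  # duplicate
    {"uuid": "b", "level": "INFO", "ts": datetime(2020, 7, 1, 11, 0), "message": "y"},
    {"uuid": "c", "level": "WARN", "ts": datetime(2020, 7, 2, 9, 0), "message": "z"},
]

# Mirror the deduplication query: keep the first row per uuid for 1 July 2020.
first_per_uuid = {}
for row in logs:
    if row["ts"].date() == date(2020, 7, 1):
        first_per_uuid.setdefault(row["uuid"], row)

result = dynamic_overwrite(logs, list(first_per_uuid.values()))
```

Here the two hourly INFO partitions of 1 July are replaced by their deduplicated rows, while the WARN partition of 2 July, which has no rows in the new data, is kept as it was.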

Static overwrite

When Spark's overwrite mode is static, the PARTITION clause is converted to a filter that is used to delete from the table. If the PARTITION clause is omitted, all partitions will be replaced.
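The filter-then-insert behavior just described can be sketched with a small in-memory model. The assumptions are deliberate simplifications: `static_overwrite` is a hypothetical helper, the PARTITION clause is represented as a dictionary of column values, and the new rows already carry the partition column.

```python
def static_overwrite(table, new_rows, partition=None):
    """Model a static overwrite.

    partition: column -> value pairs from the PARTITION clause, or None when
    the clause is omitted.
    """
    spec = partition or {}

    def in_overwritten_range(row):
        # An empty spec matches every row, so omitting the PARTITION clause
        # deletes the whole table before the new rows are written.
        return all(row[col] == val for col, val in spec.items())

    return [r for r in table if not in_overwritten_range(r)] + list(new_rows)

# Hypothetical sample rows for illustration.
rows = [
    {"uuid": "a", "level": "INFO", "message": "x"},
    {"uuid": "b", "level": "WARN", "message": "y"},
]
replacement = [{"uuid": "d", "level": "INFO", "message": "w"}]

only_info = static_overwrite(rows, replacement, partition={"level": "INFO"})
everything = static_overwrite(rows, replacement)
```

With the `level = 'INFO'` filter, only the INFO row is deleted before the insert; without a PARTITION clause, every existing row is removed.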

Because there is no PARTITION clause in the query above