The Apache Iceberg community just released version 0.13, and there are a lot of great new additions! In this post, I’ll cover some of the highlights that make this a great release.
- Improved multi-engine support
- Declarative data engineering
- Clustering and sorting as configuration
- Merge and update improvements
- Delete deltas
- And more!
Improved multi-engine support
Iceberg 0.13 adds support for Spark 3.2 and all supported versions of Flink, up through 1.14. There have been significant improvements to both Spark and Flink and it is great to have support for the latest versions. I’ll cover more about those new features shortly, but first I want to point out how the community is improving support across all versions.
Before 0.13, the community maintained compatibility by sharing modules across Spark or Flink versions. That’s why the iceberg-spark3-runtime module worked with both Spark 3.0 and 3.1. This required fewer lines of code overall, but introduced far too many headaches. For example, to use the same JAR in multiple Spark versions, Iceberg had to use shim classes and reflection to call the right Spark methods. Worse, it was harder to implement new features because required interfaces weren’t in every supported version.
On this path, Iceberg was left with a bad trade-off: old versions would have to be dropped to support new features.
Instead, the community decided to maintain a new module for each engine version. That leads to more code duplication, but makes supporting new features far simpler. It also makes Iceberg more reliable by replacing reflective calls with compile-time validation.
This new structure for Spark support came just in time. Spark 3.2 is a big step forward, mostly in features that support Iceberg’s vision for declarative data engineering.
Declarative data engineering
Declarative data engineering is the (not very original) idea that you should tell your data infrastructure what to do, not how to do it. This same principle underlies many other frameworks and is extremely prevalent in the DevOps space. What’s funny is that SQL is a declarative language; the problem is that it isn’t used that way in data engineering.
For example, consider a simple command to load data into a Hive-like table:
INSERT INTO logs SELECT event_ts, level, message, date_format(event_ts, 'yyyy-MM-dd') as event_day FROM log_source ORDER BY date_format(event_ts, 'yyyy-MM-dd'), event_ts
Notice the extra column for partitioning, event_day. One of the early improvements Iceberg made was to make partitioning declarative using hidden partitions. In Iceberg, the table is configured to partition by days(event_ts) automatically. Readers don’t need to tell Spark how to find files with extra predicates. And writers don’t need to tell Spark how to derive the partition values.
This approach is a big win. Did you notice the bug in the insert above? It uses the local time zone so when I ran that on my laptop, it inserted data in the wrong partition. Oops! That doesn’t happen with hidden partitioning.
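To see why the hand-written event_day column is fragile, here is a minimal Python sketch (not Spark or Iceberg code) of the two approaches. The helper names `event_day_in_zone` and `days_transform`, the sample timestamp, and the UTC-8 "laptop" zone are all assumptions for illustration. `days_transform` only approximates the idea behind days(event_ts) by counting whole days from the Unix epoch in UTC.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_day_in_zone(event_ts: datetime, zone: timezone) -> str:
    """Format the day the way a session running in `zone` would see it."""
    return event_ts.astimezone(zone).strftime("%Y-%m-%d")


def days_transform(event_ts: datetime) -> int:
    """Whole days since the Unix epoch, measured in UTC (zone-independent)."""
    return (event_ts - EPOCH).days


# One event, early in the morning (UTC) on February 1st.
event_ts = datetime(2022, 2, 1, 2, 30, tzinfo=timezone.utc)
laptop_zone = timezone(timedelta(hours=-8))  # hypothetical laptop zone, UTC-8

server_day = event_day_in_zone(event_ts, timezone.utc)
laptop_day = event_day_in_zone(event_ts, laptop_zone)
print(server_day, laptop_day)  # the two sessions disagree on the partition

# The derived value is the same no matter where the writer runs.
same = days_transform(event_ts) == days_transform(event_ts.astimezone(laptop_zone))
print(same)
```

The string-formatted day depends on the zone of whoever runs the insert, while the value derived from the timestamp itself does not. That is the property hidden partitioning relies on.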
Clustering and sorting as configuration
Spark 3.2 adds support for tables to request distribution and ordering and Iceberg 0.13 uses this feature to make a table’s write-time clustering and ordering declarative. The table’s configured write order is now used by inserts, updates, merges, and deletes.
This takes care of the annoying problem of needing to add a global ORDER BY to avoid creating too many small files. Writes into bucketed tables will automatically cluster records to reduce both the number of files created and memory consumption.
To take advantage of this, just set a table’s write order:
ALTER TABLE logs WRITE ORDERED BY event_ts
Combining hidden partitioning and table write order, the insert above looks much better:
INSERT INTO logs SELECT event_ts, level, message FROM log_source
As you can imagine, the benefits of a declarative pattern add up over time. If you want to start partitioning by log level, run a simple DDL statement and Iceberg will automatically pick up the configuration change and request the right incoming data clustering. No scheduled code needs to change!
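The small-files problem that clustering avoids can be illustrated with a toy model in Python. This is a sketch under simplifying assumptions, not how Spark actually schedules writes: each write task is assumed to open one file per partition it receives rows for, unclustered rows are assigned to tasks round-robin, and the names `count_files` and `task_for` are hypothetical.

```python
def count_files(assignments):
    """Each write task opens one file per partition it receives rows for."""
    return len(set(assignments))


partitions = [f"2022-02-{day:02d}" for day in range(1, 11)]  # 10 daily partitions
records = [partitions[i % len(partitions)] for i in range(1000)]
num_tasks = 8

# No clustering: rows land on tasks in arrival order (round-robin here).
unclustered = [(i % num_tasks, part) for i, part in enumerate(records)]

# Clustered: every row for a given partition is routed to the same task.
task_for = {part: n % num_tasks for n, part in enumerate(partitions)}
clustered = [(task_for[part], part) for part in records]

print(count_files(unclustered), count_files(clustered))
```

In this model the clustered write produces exactly one file per partition, while the unclustered write scatters each partition across several tasks. A table-level write order lets the engine request the clustered layout without every job adding its own ORDER BY.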
Merge and update improvements
The newer MERGE and UPDATE commands are important for declarative data engineering because they offer a flexible way to tell Spark what changes to make to individual rows, instead of manually reading, joining, filtering, and overwriting sections of a table to accomplish the same task. MERGE and UPDATE aren’t supported by Spark, so the Iceberg community maintains SQL extensions that add the commands. Spark 3.2 released several features that help make those commands more native.
First, there is a new interface for dynamic pruning that Iceberg 0.13 now implements. Dynamic pruning uses join relationships to create and push down extra filters to tables. This can be really helpful when running large joins, like those used for MERGE statements. Iceberg support for older versions uses custom dynamic pruning for MERGE, but 3.2 and later will use the Spark-native feature for MERGE. Even better, this update also adds dynamic pruning for all joins with Iceberg tables, not just MERGE and UPDATE.
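As a rough illustration of the idea behind dynamic pruning, the Python sketch below derives a filter from the small side of a join at run time and uses it to skip files on the large side. It is a conceptual model only: `DataFile`, `prune_files`, and the sample paths are hypothetical and do not correspond to Spark or Iceberg interfaces.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    event_day: str  # partition value shared by all rows in the file


def prune_files(target_files, source_rows, key="event_day"):
    """Keep only target files whose partition value appears in the join source."""
    wanted = {row[key] for row in source_rows}  # filter derived at run time
    return [f for f in target_files if getattr(f, key) in wanted]


target_files = [
    DataFile(f"data/{day}.parquet", day)
    for day in ("2022-02-01", "2022-02-02", "2022-02-03", "2022-02-04")
]
updates = [
    {"event_day": "2022-02-02", "message": "fixed"},
    {"event_day": "2022-02-04", "message": "fixed"},
    {"event_day": "2022-02-04", "message": "also fixed"},
]

to_scan = prune_files(target_files, updates)
print([f.path for f in to_scan])
```

Only the files that could contain matching rows are scanned, which is why this helps most when a small set of changes is joined against a large table.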
MERGE, UPDATE, and DELETE have also been overhauled to request distribution and ordering, to better support multiple partition specs in a table, and to use metadata columns to minimize data changes.
Metadata columns are also a new feature in Spark 3.2 that allow selecting metadata about each row, like the source data file (_file) and the row’s position in that file.
Delete deltas
Iceberg v2 added delete files that contain soft deletes. These tell readers which rows to skip because they’ve been replaced or removed. Delete files are used to make more targeted changes instead of rewriting large data files to update or remove a few rows (i.e., to reduce “write amplification”). The v2 spec was finalized and adopted last year, and both Spark and Flink have been adding new uses.
0.13 adds a new Spark implementation of DELETE FROM that produces delete deltas instead of rewriting whole data files. This should be a huge help to anyone who wants to soft-delete rows immediately and accumulate those deletes for fewer, larger (and more expensive) rewrites.
Similarly, 0.13 adds UPSERT support to Flink using the table identifier fields added in 0.12.
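To make the delete-delta idea concrete, here is a small in-memory Python model of soft deletes. It is a sketch only: the `Table` class and its methods are hypothetical, the delete records are simple (file, position) pairs, and nothing here reflects Iceberg’s actual file formats or APIs.

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    data_files: dict = field(default_factory=dict)       # path -> list of rows
    position_deletes: set = field(default_factory=set)   # {(path, position)}

    def delete_where(self, predicate):
        """Record a delete delta instead of rewriting data files."""
        written = 0
        for path, rows in self.data_files.items():
            for pos, row in enumerate(rows):
                if predicate(row) and (path, pos) not in self.position_deletes:
                    self.position_deletes.add((path, pos))
                    written += 1
        return written  # number of small delete records written

    def scan(self):
        """Readers skip any row that a delete record points at."""
        return [row for path, rows in self.data_files.items()
                for pos, row in enumerate(rows)
                if (path, pos) not in self.position_deletes]

    def compact(self):
        """Later, rewrite data files once and drop the accumulated deletes."""
        for path, rows in list(self.data_files.items()):
            self.data_files[path] = [row for pos, row in enumerate(rows)
                                     if (path, pos) not in self.position_deletes]
        self.position_deletes.clear()


table = Table({"a.parquet": [{"id": i, "level": "INFO"} for i in range(1000)]})
table.data_files["a.parquet"][7]["level"] = "DEBUG"

written = table.delete_where(lambda row: row["level"] == "DEBUG")
visible_before = len(table.scan())
table.compact()
visible_after = len(table.scan())
print(written, visible_before, visible_after)
```

Deleting one row writes a single delete record rather than rewriting the 1,000-row file. The expensive rewrite is deferred to `compact`, which can run after many deletes have accumulated.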
And more!
There are more additions than I can cover in detail here, like a stored procedure for data and delete compaction and new integrations with cloud providers. Check out the full release notes for the list. And thanks to the 90+ contributors that made 0.13 happen!