Setting table write order

DATA ENGINEERING

This recipe shows you how to set a table’s write order to instruct all writers — including background services like compaction — on how the table’s rows should be ordered and clustered.

Why table write order?

If you’ve maintained a data pipeline in Apache Spark, there’s a good chance you’ve come across an odd pattern: an ORDER BY in the data passed to an INSERT command.

INSERT INTO logs
SELECT message, level, event_ts, line_no
FROM (...)
ORDER BY level, event_ts

That’s strange because there’s no requirement to actually perform the sort. Once the rows are written to a table, there is no order guarantee for query results unless a query has its own ORDER BY clause. Yet this pattern is not just common; it’s considered a best practice. The reason is that Spark (and other engines) will distribute and sort the data before writing, so the resulting data files are clustered by the sort columns.

Clustering data is good for performance. Apache Iceberg tracks value ranges for columns in each data file and will prune files that don’t contain relevant rows. Clustering makes that pruning much more effective.
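
For example, with the logs table clustered by level and event_ts, a query like the following (the filter values are illustrative) can skip any data file whose level and event_ts ranges don’t overlap the predicate:

-- file-level min/max stats on level and event_ts let Iceberg prune non-matching files
SELECT message, line_no
FROM logs
WHERE level = 'ERROR'
  AND event_ts >= TIMESTAMP '2024-06-01 00:00:00'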

While clustering is good, this pattern for configuring it has problems.

  1. When the order for a table changes, jobs that write to the table must be updated.
  2. There may be multiple engines and services (such as compaction) that need to know how to cluster data. Each must be separately configured, which creates work and risks them being out of sync.

Iceberg solves these challenges with a declarative approach. You configure a table’s write order so that all engines and services can use the same ordering to cluster data at write time.

Setting a table’s write order

To set a table’s write order, use the WRITE ORDERED BY clause in an ALTER TABLE command:

ALTER TABLE logs WRITE ORDERED BY level, event_ts

This results in a global sort by the level and event_ts columns. This ordering is automatically applied when writing from Spark and Trino, and is also used by table maintenance procedures, such as rewrite_data_files, which can run simple background compaction. In addition, the actual order adapts when the table’s partitioning changes, which is another benefit of the declarative approach.
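
For instance, compaction run through Spark’s rewrite_data_files procedure picks up the same order. A minimal sketch, assuming a catalog named demo and the table living in a db namespace; with the sort strategy and no explicit sort_order argument, the procedure uses the table’s write order:

-- "demo" is a hypothetical catalog name; the sort strategy falls back to the table's write order
CALL demo.system.rewrite_data_files(
  table => 'db.logs',
  strategy => 'sort'
)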

There are a few things to keep in mind when using a write order:

  • The sort order is applied at write time to prepare data. Setting the order for a table doesn’t guarantee order for a result set at read time — that’s why it’s called the write order.
  • Engines use write order as an input but are not bound by it. For example, it doesn’t make sense to sort records in streaming applications, so Flink doesn’t implement write order.
  • You need Iceberg’s Spark SQL extensions enabled in order to use this extended DDL; a sample configuration follows this list.
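
One way to enable the extensions is through Spark configuration, for example when launching spark-sql (catalog settings are omitted here for brevity):

spark-sql \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions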

Distribution modes

In addition to setting the table’s sort order, the WRITE ORDERED BY command also sets the table’s distribution mode property, write.distribution-mode, to range. The table distribution mode controls how data is balanced across tasks for large writes. The range mode tells Spark to distribute by ranges of values, resulting in a global sort.

Calculating distribution ranges for a global sort can be expensive, so you can also specify either a local ordering (that is, just within tasks) or a hash distribution with a local sort.

ALTER TABLE logs WRITE LOCALLY ORDERED BY level, event_ts
ALTER TABLE logs WRITE DISTRIBUTED BY PARTITION
                       LOCALLY ORDERED BY level, event_ts
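
These clauses correspond to the write.distribution-mode table property, whose values are none, hash, and range. If you prefer, you can also set the property directly; this sketch switches the table to hash distribution while leaving its sort order untouched:

-- hash-distribute writes; the table's existing sort order is left as-is
ALTER TABLE logs SET TBLPROPERTIES ('write.distribution-mode' = 'hash')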

Best practices for write order

When setting a write order, there are a few best practices to keep in mind (an example applying them follows the list):

  1. Write order clusters data for more efficient queries. Put columns most likely to be used in filters at the start of your write order, and use the lowest cardinality columns first.
  2. End your write order with a high cardinality column, like an ID or event timestamp. This will make filters on that column more effective and also help engines (like older Spark versions) break up data into tasks.
  3. If needed, Iceberg will prepend partition columns to the order. You can omit them in your write order.
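
Putting these together, a hypothetical events table, partitioned by days(event_ts), with a low-cardinality region column that is commonly filtered on and a high-cardinality event_id, might use:

-- region: low-cardinality filter column; event_id: high-cardinality ID
ALTER TABLE events WRITE ORDERED BY region, event_id

The days(event_ts) partition column is omitted here because Iceberg will prepend partition columns to the order as needed.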