Table Maintenance: The Key To Keeping Your Iceberg Tables Healthy and Performant

Tables at scale have always required a disciplined approach to maintenance. Skilled data engineers have learned best practices to optimize table performance based on data characteristics and query patterns. In the past, this often meant working around the table format itself: optimizations either lived inside the data pipeline logic that loads the data, or in custom maintenance jobs highly tuned to a specific table and its requirements.

Iceberg takes a different approach and provides table maintenance procedures out of the box that allow performing powerful table optimizations in a declarative fashion. This post covers these declarative procedures and how the Iceberg Spark runtime lets you execute them using SQL commands.
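All of the maintenance procedures below are invoked with Spark's CALL syntax against the catalog's system namespace. As a rough sketch of the shape of these calls (db.sample is just a placeholder table name here; concrete, runnable calls appear throughout the rest of the post):

spark.sql("CALL demo.system.rewrite_data_files(table => 'db.sample')").show()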

To see these procedures in action, you can follow along using the getting-started Docker setup described in an earlier post. Save the following docker-compose file and run docker-compose up.

docker-compose.yml

version: "3"

services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    depends_on:
      - postgres
    container_name: spark-iceberg
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    ports:
      - 8888:8888
      - 8080:8080
      - 18080:18080
  postgres:
    image: postgres:13.4-bullseye
    container_name: postgres
    environment:
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=demo_catalog
    volumes:
      - ./postgres/data:/var/lib/postgresql/data

The notebook server can be found running at localhost:8888 with a fully runnable notebook titled Iceberg - Table Maintenance.ipynb.
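If you prefer a Spark shell over the notebook, you can also attach to the running container (using the container name from the compose file above); this assumes the image keeps spark-shell on the PATH:

docker exec -it spark-iceberg spark-shell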

Creating a Sample Table

Start by creating a table to load the NYC Taxi and Limousine Commission Trip Record Data
contained in the Registry of Open Data on AWS.
In order to simulate a table with small files, set the write.target-file-size-bytes table property to ~5MB.

spark.sql("""
CREATE TABLE demo.nyc.taxis_sample (
  `VendorID` BIGINT,
  `tpep_pickup_datetime` TIMESTAMP,
  `tpep_dropoff_datetime` TIMESTAMP,
  `passenger_count` DOUBLE,
  `trip_distance` DOUBLE,
  `RatecodeID` DOUBLE,
  `store_and_fwd_flag` STRING,
  `PULocationID` BIGINT,
  `DOLocationID` BIGINT,
  `payment_type` BIGINT,
  `fare_amount` DOUBLE,
  `extra` DOUBLE,
  `mta_tax` DOUBLE,
  `tip_amount` DOUBLE,
  `tolls_amount` DOUBLE,
  `improvement_surcharge` DOUBLE,
  `total_amount` DOUBLE,
  `congestion_surcharge` DOUBLE,
  `airport_fee` DOUBLE)
USING iceberg
LOCATION '/home/iceberg/warehouse/nyc/taxis_sample'
TBLPROPERTIES(
  'write.target-file-size-bytes'='5242880'
)
""")

Parquet files for the past year of trip data are included in the tabulario/spark-iceberg image.

ls -Shs /home/iceberg/data/yellow_tripdata_*.parquet

output:

54M /home/iceberg/data/yellow_tripdata_2022-03.parquet
53M /home/iceberg/data/yellow_tripdata_2022-04.parquet
44M /home/iceberg/data/yellow_tripdata_2021-04.parquet
44M /home/iceberg/data/yellow_tripdata_2021-05.parquet
44M /home/iceberg/data/yellow_tripdata_2021-06.parquet
44M /home/iceberg/data/yellow_tripdata_2021-07.parquet
44M /home/iceberg/data/yellow_tripdata_2021-08.parquet
44M /home/iceberg/data/yellow_tripdata_2021-09.parquet
44M /home/iceberg/data/yellow_tripdata_2021-10.parquet
44M /home/iceberg/data/yellow_tripdata_2021-11.parquet
44M /home/iceberg/data/yellow_tripdata_2021-12.parquet
44M /home/iceberg/data/yellow_tripdata_2022-02.parquet
37M /home/iceberg/data/yellow_tripdata_2022-01.parquet

Note: If you are running the shell commands in a Jupyter notebook, remember to begin the command with an exclamation point, for example !ls.

Load the data for January, February, and March of 2022 and insert it into the nyc.taxis_sample table.

val df_202201 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-01.parquet")
val df_202202 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-02.parquet")
val df_202203 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-03.parquet")
val df_q1 = df_202201.union(df_202202).union(df_202203)
df_q1.write.insertInto("nyc.taxis_sample")
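To verify the load, a quick row count over the new table works well (the exact count depends on the source files, so it is omitted here):

spark.sql("SELECT COUNT(*) AS row_count FROM nyc.taxis_sample").show()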

Rewriting Data Files

Iceberg tables include a files metadata table that contains lots of useful information about the underlying data files. Query the files table to see the data files and file sizes for the nyc.taxis_sample table.

spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show(100)

output:

+--------------------+------------------+
|           file_path|file_size_in_bytes|
+--------------------+------------------+
|/home/iceberg/war...|           4181875|
|/home/iceberg/war...|           4005835|
|/home/iceberg/war...|           4073450|
|/home/iceberg/war...|           3946420|
|/home/iceberg/war...|           3981032|
|/home/iceberg/war...|           3967325|
|/home/iceberg/war...|           3843492|
|/home/iceberg/war...|           3958067|
|/home/iceberg/war...|           3879571|
|/home/iceberg/war...|           4020438|
|/home/iceberg/war...|           1242185|
|/home/iceberg/war...|           3932652|
|/home/iceberg/war...|           3763271|
|/home/iceberg/war...|           3945090|
|/home/iceberg/war...|           3755812|
|/home/iceberg/war...|           3765442|
|/home/iceberg/war...|           3874506|
|/home/iceberg/war...|           3763194|
|/home/iceberg/war...|           3801471|
|/home/iceberg/war...|           3918764|
|/home/iceberg/war...|           3788822|
|/home/iceberg/war...|           3791834|
|/home/iceberg/war...|           3826818|
|/home/iceberg/war...|           2154343|
|/home/iceberg/war...|           3802070|
|/home/iceberg/war...|           3661636|
|/home/iceberg/war...|           3891395|
|/home/iceberg/war...|           3783518|
|/home/iceberg/war...|           3773159|
|/home/iceberg/war...|           3778178|
|/home/iceberg/war...|           3884718|
|/home/iceberg/war...|           3826138|
|/home/iceberg/war...|           3782716|
|/home/iceberg/war...|           3941177|
|/home/iceberg/war...|           3757253|
|/home/iceberg/war...|           3786523|
|/home/iceberg/war...|           3802603|
|/home/iceberg/war...|           3734727|
|/home/iceberg/war...|           4869774|
+--------------------+------------------+

Looking at the size of the data files shows that they’re only a few MB each. In this case that’s because of the write.target-file-size-bytes table property set for the sake of this demo, but
in practice there are a number of ways a table can end up with files that are too small or too large. Small files can have a major impact on query performance by creating large
metadata overhead. Planning a query involves using each file’s metadata to calculate how many splits are required and where to schedule each task to maximize data locality. The more files,
the longer this part of query planning takes. Large files, on the other hand, can also significantly decrease performance by limiting parallelism. A good post that covers many of these
scenarios is Optimizing data warehouse storage from the Netflix Technology Blog.
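One lightweight way to keep an eye on this in practice is to aggregate over the same files metadata table. A minimal sketch that reports the file count and average file size in MB:

spark.sql("""
SELECT COUNT(*) AS file_count,
       ROUND(AVG(file_size_in_bytes) / 1024 / 1024, 2) AS avg_file_size_mb
FROM nyc.taxis_sample.files
""").show()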

Now that the table has been seeded with small files, the table property can be unset.

spark.sql("ALTER TABLE nyc.taxis_sample UNSET TBLPROPERTIES ('write.target-file-size-bytes')")

Rewriting the data files using the rewrite_data_files procedure is straightforward. When accepting the default behavior of bin-packing small files and splitting large files based on the table’s default write size, only the table name is required. For this example, however, also specify a file size target using the target-file-size-bytes option with a value of ~50MB.

spark.sql("CALL demo.system.rewrite_data_files(table => 'nyc.taxis_sample', options => map('target-file-size-bytes','52428800'))").show()

output:

+--------------------------+----------------------+
|rewritten_data_files_count|added_data_files_count|
+--------------------------+----------------------+
|                        39|                     4|
+--------------------------+----------------------+

Take another look at the files table.

spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show()

output:

+--------------------+------------------+
|           file_path|file_size_in_bytes|
+--------------------+------------------+
|/home/iceberg/war...|          46185713|
|/home/iceberg/war...|           4869774|
|/home/iceberg/war...|          46449079|
|/home/iceberg/war...|          44439258|
+--------------------+------------------+

As described by the output from the rewrite_data_files procedure call, the 39 smaller data files have been compacted to 4 files.
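Bin-packing is not the only strategy rewrite_data_files supports. Depending on the Iceberg version, the procedure can also cluster data while compacting by passing a sort strategy and sort order; a sketch (not run as part of this walkthrough) that sorts on pickup time might look like:

spark.sql("""
CALL demo.system.rewrite_data_files(
  table => 'nyc.taxis_sample',
  strategy => 'sort',
  sort_order => 'tpep_pickup_datetime ASC NULLS LAST'
)
""").show()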

Expiring Snapshots

When a system includes the concept of table snapshots, it’s natural to consider the storage implications of keeping data around that is no longer part of the table’s current state. With Iceberg, this problem is greatly reduced by the fact that snapshots reuse unchanged data files from previous snapshots. However, new snapshots often require rewriting or removing data files, leaving files that are relevant only to historical snapshots. These files accumulate over time and add to the overhead of keeping older snapshots around indefinitely.

The current state of the nyc.taxis_sample table is a great example of this. After compaction, there are only 4 larger files, far fewer than the 39 smaller files from before the compaction. However, looking at the directory where the data lives shows that those 39 smaller files still exist!

ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data

output:

total 293M
 46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
 45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
 44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet
5.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00015.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00001.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00003.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00010.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00002.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00005.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00006.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00008.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00004.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00003.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00010.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00001.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00009.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00003.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00007.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00009.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00006.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00007.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00012.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00008.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00013.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00001.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00008.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00011.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00010.parquet
4.2M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00012.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00004.parquet
4.3M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00009.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00006.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00005.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00005.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00002.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00007.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00011.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00004.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00014.parquet
4.2M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00002.parquet
3.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00013.parquet
1.2M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00011.parquet

This is because the previous snapshot that references these files still exists. That means these files must be preserved to allow
for time-travel queries and rolling back to the previous snapshot. The previous snapshot can be seen by looking at the snapshots table.

spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)

output:

+-----------------------+-------------------+---------+
|committed_at           |snapshot_id        |operation|
+-----------------------+-------------------+---------+
|2022-07-16 18:19:43.407|5498522437864970329|append   |
|2022-07-16 18:21:08.215|1469784638096125204|replace  |
+-----------------------+-------------------+---------+

Fortunately, once the older snapshot is expired, the related data files are cleaned up as well. Older snapshots can be expired using the expire_snapshots procedure. In its simplest form, expire_snapshots takes just the table name, with sensible defaults that remove snapshots older than 5 days.

For demonstration purposes, override the default values for the older_than and retain_last options: use the current time for older_than so that all snapshots are considered, and set retain_last to 1 so that the most recent snapshot is not expired. In other words, the action will expire all snapshots except the latest one.

val now = java.util.Calendar.getInstance().getTime()
val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
val now_str = format.format(now)

spark.sql(s"CALL demo.system.expire_snapshots(table => 'nyc.taxis_sample', older_than => TIMESTAMP '$now_str', retain_last => 1)").show()

output:

+------------------------+----------------------------+----------------------------+
|deleted_data_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|
+------------------------+----------------------------+----------------------------+
|                      39|                           1|                           1|
+------------------------+----------------------------+----------------------------+

The output from the expire_snapshots procedure shows that 39 data files were deleted, which are all of the data files associated with the previous, uncompacted snapshot. A look at the snapshots table now shows only a single snapshot. This is the snapshot created by the compaction and is the current snapshot of the table.

spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)

output:

+-----------------------+-------------------+---------+
|committed_at           |snapshot_id        |operation|
+-----------------------+-------------------+---------+
|2022-07-16 18:21:08.215|1469784638096125204|replace  |
+-----------------------+-------------------+---------+

Likewise, a peek at the data directory confirms that the files from the older snapshot no longer exist.

ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data

output:

total 138M
 46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
 45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
 44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet

Removing Orphan Files

In the previous section, storage was optimized by expiring older snapshots and allowing Iceberg to identify and remove
data files no longer needed by the remaining snapshots. However, what if data files exist in the table directory that are not tracked by Iceberg or referenced by any snapshot? These kinds of files are referred to as “orphan files”. There are many ways a table’s directory can end up containing orphan files. One example would be if the original parquet files from the NYC taxi open dataset had been stored in the table’s directory before being loaded. Once the dataframe was saved as a table, the table’s metadata would have no knowledge of the original data files used to construct the dataframe.

Determining which files are actually orphan files is not a trivial task and requires considering all snapshots of a table. Iceberg allows removing orphan files by calling the remove_orphan_files procedure. The procedure defaults to only removing orphan files older than three days and returns a table with a single column called orphan_file_location that shows the full file path of each orphan file removed.
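If a different retention window is needed, the older_than argument can be supplied explicitly, just as with expire_snapshots. A sketch with an arbitrary cutoff timestamp:

spark.sql("""
CALL demo.system.remove_orphan_files(
  table => 'nyc.taxis_sample',
  older_than => TIMESTAMP '2022-07-01 00:00:00.000'
)
""").show()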

Calling remove_orphan_files on the nyc.taxis_sample table does not show any results since the table’s directory does not contain any orphan files.

spark.sql(s"CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample')").show(truncate=false)

output:

+--------------------+
|orphan_file_location|
+--------------------+
+--------------------+

To simulate the existence of orphan files, manually create three files in the table’s data directory. Also, backdate the files’ timestamps so they fall outside the procedure’s retention window (three days by default); otherwise the
remove_orphan_files procedure won’t remove them and will instead warn that removing recently written files could interfere with jobs that are currently writing to the table.

touch -d "2022-01-01 00:00:00.000000000" /home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-1.parquet /home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-2.parquet /home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-3.parquet

Now the data directory for the nyc.taxis_sample table contains the orphan files.

ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data/

output:

total 138M
 46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
 45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
 44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet
4.0K i-shouldnt-be-here-1.parquet
4.0K i-shouldnt-be-here-2.parquet
4.0K i-shouldnt-be-here-3.parquet

Go ahead and try a dry run of the remove_orphan_files procedure by including dry_run => true.

spark.sql(s"CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample', dry_run => true)").show(truncate=false)

output:

+-------------------------------------------------------------------------------+
|orphan_file_location                                                           |
+-------------------------------------------------------------------------------+
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-1.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-2.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-3.parquet|
+-------------------------------------------------------------------------------+

The dry run shows that the orphan files we created would be removed by the procedure. When ready to remove them, simply re-run the remove_orphan_files
procedure without the dry_run flag.

spark.sql(s"CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample')").show(truncate=false)

output:

+-------------------------------------------------------------------------------+
|orphan_file_location                                                           |
+-------------------------------------------------------------------------------+
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-1.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-2.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-3.parquet|
+-------------------------------------------------------------------------------+

Another look at the data directory shows that the orphan files were removed.

ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data/

output:

total 138M
 46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
 45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
 44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet

Rewriting Manifest Files

Without getting too far into the weeds on how Iceberg metadata is structured, suffice it to say that maintaining ideal manifest file sizes is very important for optimal scan planning. This can be achieved by running the rewrite_manifests procedure, which only requires the table name.

spark.sql("CALL demo.system.rewrite_manifests('nyc.taxis_sample')").show()

output:

+-------------------------+---------------------+
|rewritten_manifests_count|added_manifests_count|
+-------------------------+---------------------+
|                        2|                    1|
+-------------------------+---------------------+

That’s it! The manifest files have all been rewritten to optimal sizes.
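To confirm the rewrite, you can also query the manifests metadata table (the exact columns available may vary slightly across Iceberg versions):

spark.sql("SELECT path, length, added_data_files_count FROM nyc.taxis_sample.manifests").show(truncate=false)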

Note: For an overview of how Iceberg metadata works, take a look at one of our previous blog posts, Metadata Indexing in Iceberg, by Ryan Blue.

Conclusion

There are many factors that inform which maintenance procedures should be used and how often they should be performed. Query patterns, table size, performance requirements, storage type, regulatory requirements, security policies, and many more things can influence how a table’s data and metadata must be maintained.

Iceberg aims to make it easier to think about the harder questions of what kind of maintenance an individual table requires, instead of getting bogged down in the how. Sensible defaults make it possible to perform maintenance actions that optimize the most typical table usage patterns, and they are especially valuable before query patterns have been established. Iceberg’s table maintenance procedures are a great example of the Iceberg community’s approach to declarative data engineering and the philosophy that data professionals should be empowered to think more about the actual data and less about its technical maintenance.

If you have thoughts or ideas to share, or just want to drop by to say hello, the Iceberg community is a very welcoming one and you can find ways to join by checking out our community page.