Tables at scale have always required a disciplined approach to maintenance. Skilled data engineers have learned best practices to optimize table performance based on data characteristics and query patterns. In the past, this often meant working around the table format itself: optimizations either lived inside the data pipeline logic that loads the data or in custom maintenance jobs highly tuned to a specific table and its requirements.
Iceberg takes a different approach and provides table maintenance procedures out of the box that allow performing powerful table optimizations in a declarative fashion. This post covers these declarative procedures and shows how the Iceberg-Spark runtime lets you execute them using SQL commands.
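Every procedure in this post is invoked the same way: a SQL CALL statement against the catalog's system namespace, with named arguments. As a rough sketch (the catalog name demo matches the docker setup below; the procedure and argument names here are placeholders, not a real procedure):
// General shape of an Iceberg maintenance procedure call in Spark SQL
spark.sql("""
CALL demo.system.procedure_name(
  table => 'db.table_name',
  some_option => 'some_value'
)
""")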
To see these procedures in action, you can follow along using the getting-started docker setup described in an earlier post. Using the following docker-compose file, run docker-compose up.
docker-compose.yml
version: "3"
services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    depends_on:
      - postgres
    container_name: spark-iceberg
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
    ports:
      - 8888:8888
      - 8080:8080
      - 18080:18080
  postgres:
    image: postgres:13.4-bullseye
    container_name: postgres
    environment:
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=demo_catalog
    volumes:
      - ./postgres/data:/var/lib/postgresql/data
The notebook server can be found running at localhost:8888 with a fully runnable notebook titled Iceberg - Table Maintenance.ipynb.
Creating a Sample Table
Start by creating a table to load NYC Taxi and Limousine Commission Trip Record Data contained in the Registry of Open Data on AWS. In order to simulate a table with small files, set the write.target-file-size-bytes table property to ~5MB.
spark.sql("""
CREATE TABLE demo.nyc.taxis_sample (
`VendorID` BIGINT,
`tpep_pickup_datetime` TIMESTAMP,
`tpep_dropoff_datetime` TIMESTAMP,
`passenger_count` DOUBLE,
`trip_distance` DOUBLE,
`RatecodeID` DOUBLE,
`store_and_fwd_flag` STRING,
`PULocationID` BIGINT,
`DOLocationID` BIGINT,
`payment_type` BIGINT,
`fare_amount` DOUBLE,
`extra` DOUBLE,
`mta_tax` DOUBLE,
`tip_amount` DOUBLE,
`tolls_amount` DOUBLE,
`improvement_surcharge` DOUBLE,
`total_amount` DOUBLE,
`congestion_surcharge` DOUBLE,
`airport_fee` DOUBLE)
USING iceberg
LOCATION '/home/iceberg/warehouse/nyc/taxis_sample'
TBLPROPERTIES(
'write.target-file-size-bytes'='5242880'
)
""")
Parquet files for the past year of trip data are included in the tabulario/spark-iceberg
image.
ls -Shs /home/iceberg/data/yellow_tripdata_*.parquet
output:
54M /home/iceberg/data/yellow_tripdata_2022-03.parquet
53M /home/iceberg/data/yellow_tripdata_2022-04.parquet
44M /home/iceberg/data/yellow_tripdata_2021-04.parquet
44M /home/iceberg/data/yellow_tripdata_2021-05.parquet
44M /home/iceberg/data/yellow_tripdata_2021-06.parquet
44M /home/iceberg/data/yellow_tripdata_2021-07.parquet
44M /home/iceberg/data/yellow_tripdata_2021-08.parquet
44M /home/iceberg/data/yellow_tripdata_2021-09.parquet
44M /home/iceberg/data/yellow_tripdata_2021-10.parquet
44M /home/iceberg/data/yellow_tripdata_2021-11.parquet
44M /home/iceberg/data/yellow_tripdata_2021-12.parquet
44M /home/iceberg/data/yellow_tripdata_2022-02.parquet
37M /home/iceberg/data/yellow_tripdata_2022-01.parquet
Note: If you are running the shell commands in a Jupyter notebook, remember to begin the command with an exclamation point, for example !ls.
Load the data for January, February, and March of 2022 and insert it into the nyc.taxis_sample
table.
val df_202201 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-01.parquet")
val df_202202 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-02.parquet")
val df_202203 = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2022-03.parquet")
val df_q1 = df_202201.union(df_202202).union(df_202203)
df_q1.write.insertInto("nyc.taxis_sample")
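As a quick sanity check (not part of the original walkthrough), count the rows that landed in the table:
// Confirm the Q1 2022 trip data was inserted
spark.sql("SELECT COUNT(*) AS row_count FROM nyc.taxis_sample").show()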
Rewriting Data Files
Iceberg tables include a files metadata table that contains lots of useful information about the underlying data files. Query the files table to see the data files and file sizes for the nyc.taxis_sample table.
spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show(100)
output:
+--------------------+------------------+
| file_path|file_size_in_bytes|
+--------------------+------------------+
|/home/iceberg/war...| 4181875|
|/home/iceberg/war...| 4005835|
|/home/iceberg/war...| 4073450|
|/home/iceberg/war...| 3946420|
|/home/iceberg/war...| 3981032|
|/home/iceberg/war...| 3967325|
|/home/iceberg/war...| 3843492|
|/home/iceberg/war...| 3958067|
|/home/iceberg/war...| 3879571|
|/home/iceberg/war...| 4020438|
|/home/iceberg/war...| 1242185|
|/home/iceberg/war...| 3932652|
|/home/iceberg/war...| 3763271|
|/home/iceberg/war...| 3945090|
|/home/iceberg/war...| 3755812|
|/home/iceberg/war...| 3765442|
|/home/iceberg/war...| 3874506|
|/home/iceberg/war...| 3763194|
|/home/iceberg/war...| 3801471|
|/home/iceberg/war...| 3918764|
|/home/iceberg/war...| 3788822|
|/home/iceberg/war...| 3791834|
|/home/iceberg/war...| 3826818|
|/home/iceberg/war...| 2154343|
|/home/iceberg/war...| 3802070|
|/home/iceberg/war...| 3661636|
|/home/iceberg/war...| 3891395|
|/home/iceberg/war...| 3783518|
|/home/iceberg/war...| 3773159|
|/home/iceberg/war...| 3778178|
|/home/iceberg/war...| 3884718|
|/home/iceberg/war...| 3826138|
|/home/iceberg/war...| 3782716|
|/home/iceberg/war...| 3941177|
|/home/iceberg/war...| 3757253|
|/home/iceberg/war...| 3786523|
|/home/iceberg/war...| 3802603|
|/home/iceberg/war...| 3734727|
|/home/iceberg/war...| 4869774|
+--------------------+------------------+
Looking at the size of the data files shows that they’re only a few MB each. In this case that’s because of the write.target-file-size-bytes table property set for the sake of this demo, but in practice there are many ways to end up with tables containing files that are too small or too large. Small files can have a major impact on query performance by creating large metadata overhead. Planning a query involves using each file’s metadata to calculate how many splits are required and where to schedule each task to maximize data locality. The more files, the longer this part of query planning takes. Large files, on the other hand, can also significantly decrease performance by limiting parallelism. A good post that covers many of these scenarios is Optimizing data warehouse storage from the Netflix Technology Blog.
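To put a number on the problem, the files metadata table can be aggregated to see how many data files exist and how small they are on average (a quick check, not part of the original walkthrough):
// Count the data files and compute their average size in MB
spark.sql("""
SELECT COUNT(*) AS num_files,
       ROUND(AVG(file_size_in_bytes) / 1024 / 1024, 2) AS avg_size_mb
FROM nyc.taxis_sample.files
""").show()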
Now that the table contains small files, the write.target-file-size-bytes property has served its purpose and can be unset.
spark.sql("ALTER TABLE nyc.taxis_sample UNSET TBLPROPERTIES ('write.target-file-size-bytes')")
Rewriting the data files using the rewrite_data_files procedure is straightforward when accepting the default behavior of bin-packing small files and splitting large files based on the table's default write size; in that case only the table name is required. For this example, however, also specify a file size target using the target-file-size-bytes option with a value of ~50MB.
spark.sql("CALL demo.system.rewrite_data_files(table => 'nyc.taxis_sample', options => map('target-file-size-bytes','52428800'))").show()
output:
+--------------------------+----------------------+
|rewritten_data_files_count|added_data_files_count|
+--------------------------+----------------------+
| 39| 4|
+--------------------------+----------------------+
Take another look at the files table.
spark.sql("SELECT file_path, file_size_in_bytes FROM nyc.taxis_sample.files").show()
output:
+--------------------+------------------+
| file_path|file_size_in_bytes|
+--------------------+------------------+
|/home/iceberg/war...| 46185713|
|/home/iceberg/war...| 4869774|
|/home/iceberg/war...| 46449079|
|/home/iceberg/war...| 44439258|
+--------------------+------------------+
As described by the output from the rewrite_data_files
procedure call, the 39 smaller data files have been compacted to 4 files.
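Bin-packing is only one of the strategies rewrite_data_files supports. For tables with established query patterns, the procedure also accepts a sort strategy so data is clustered while it is compacted. A hedged sketch, not run in this walkthrough (the available options depend on your Iceberg version):
// Illustrative only: compact while sorting rows by pickup time so files
// are clustered for time-range queries
spark.sql("""
CALL demo.system.rewrite_data_files(
  table => 'nyc.taxis_sample',
  strategy => 'sort',
  sort_order => 'tpep_pickup_datetime ASC'
)
""").show()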
Expiring Snapshots
When a system includes the concept of table snapshots, it’s intuitive to consider the storage implications of keeping data around that may not be a part of the table’s current state. With Iceberg, this problem is greatly minimized by the fact that Iceberg snapshots re-use unchanged data files from previous snapshots. However, new snapshots often require rewriting or removing data files, making these files only relevant to historical snapshots. These files accumulate over time and add to the overhead of keeping older snapshots around indefinitely.
The current state of the nyc.taxis_sample table is a great example of this. After compaction, there are only 4 larger files, far fewer than the 39 smaller files before the compaction. However, looking at the directory where the data lives shows that those 39 smaller files still exist!
ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data
output:
total 293M
46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet
5.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00015.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00001.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00003.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00010.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00002.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00005.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00006.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00008.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00004.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00003.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00010.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00001.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00009.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00003.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00007.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00009.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00006.parquet
4.1M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00007.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00012.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00008.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00013.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00001.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00008.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00011.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00010.parquet
4.2M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00012.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00004.parquet
4.3M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00009.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00006.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00005.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00005.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00002.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00007.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00011.parquet
4.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00004.parquet
4.1M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00014.parquet
4.2M 00019-22-44cb1e14-8a40-4c52-971f-a4d57df6f5c0-00002.parquet
3.1M 00011-14-48631ab2-d51d-4d6d-ba2c-8aa312148d8c-00013.parquet
1.2M 00003-6-c5e529b1-7f8c-45e9-a944-80840ed106a4-00011.parquet
This is because the previous snapshot that references these files still exists. That means these files must be preserved to allow
for time-travel queries and rolling back to the previous snapshot. The previous snapshot can be seen by looking at the snapshots table.
spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)
output:
+-----------------------+-------------------+---------+
|committed_at |snapshot_id |operation|
+-----------------------+-------------------+---------+
|2022-07-16 18:19:43.407|5498522437864970329|append |
|2022-07-16 18:21:08.215|1469784638096125204|replace |
+-----------------------+-------------------+---------+
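Because both snapshots are still live at this point, the earlier one can still be queried. A hedged sketch of a time-travel read using the snapshot ID from the output above (your ID will differ; newer Spark versions also support a VERSION AS OF clause):
// Read the table as of the first (pre-compaction) snapshot shown above
val preCompaction = spark.read
  .option("snapshot-id", "5498522437864970329")
  .format("iceberg")
  .load("demo.nyc.taxis_sample")
preCompaction.count()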
Fortunately, once the older snapshot is expired, the related data files will be cleaned up as well. Older snapshots can be expired using the expire_snapshots
procedure. In its simplest form, expire_snapshots
takes just the table name with sensible defaults that remove snapshots older than 5 days.
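That default invocation would look something like the following sketch (not executed here, since the defaults would leave these freshly created snapshots untouched):
// Expire snapshots using the default retention settings (sketch only)
spark.sql("CALL demo.system.expire_snapshots(table => 'nyc.taxis_sample')").show()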
For demonstration purposes, override the default values for the older_than and retain_last options and use the current time for older_than so that all snapshots are included. Also, set retain_last to 1 so that the most recent snapshot is not expired. In other words, the action will expire all snapshots except the latest one.
val now = java.util.Calendar.getInstance().getTime()
val format = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
val now_str = format.format(now)
spark.sql(s"CALL demo.system.expire_snapshots(table => 'nyc.taxis_sample', older_than => TIMESTAMP '$now_str', retain_last => 1)").show()
output:
+------------------------+----------------------------+----------------------------+
|deleted_data_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|
+------------------------+----------------------------+----------------------------+
| 39| 1| 1|
+------------------------+----------------------------+----------------------------+
The output from the expire_snapshots procedure shows that 39 data files were deleted, which are all of the data files associated with the previous, uncompacted snapshot. A look at the snapshots table now shows only a single snapshot. This is the snapshot created by the compaction event and is the current snapshot of the table.
spark.sql("SELECT committed_at, snapshot_id, operation FROM nyc.taxis_sample.snapshots").show(truncate=false)
output:
+-----------------------+-------------------+---------+
|committed_at |snapshot_id |operation|
+-----------------------+-------------------+---------+
|2022-07-16 18:21:08.215|1469784638096125204|replace |
+-----------------------+-------------------+---------+
Likewise, a peek at the data directory confirms that the files from the older snapshot no longer exist.
ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data
output:
total 138M
46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet
Removing Orphan Files
In the previous section, storage was optimized by expiring older snapshots and allowing Iceberg to identify and remove data files no longer needed by the remaining snapshots. However, what if data files exist in the table directory that are not tracked by Iceberg or referenced by any snapshot? These kinds of files are referred to as “orphan files”. There are many ways a table’s directory can end up containing orphan files. One example would be if the original Parquet files from the NYC taxi open dataset were stored in the table’s directory before being loaded. Once the dataframe was saved as a table, the table’s metadata would have no knowledge of the original data files used to construct the dataframe.
Determining which files are actually orphan files is not a trivial task and requires considering all snapshots of a table. Iceberg allows removing orphan files by calling the remove_orphan_files
procedure. The procedure defaults to only removing orphan files older than three days and returns a table with a single column called orphan_file_location
that shows the full file path of each orphan file removed.
Calling remove_orphan_files
on the nyc.taxis_sample
table does not show any results since the table’s directory does not contain any orphan files.
spark.sql(s"CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample')").show(truncate=false)
output:
+--------------------+
|orphan_file_location|
+--------------------+
+--------------------+
To simulate the existence of orphan files, manually create three files in the table’s data directory. Also, set the timestamps on the files to make sure they fall outside the default three-day retention window; otherwise the remove_orphan_files procedure won’t remove them and will instead warn that removing recently written files could interfere with jobs that may currently be writing to the table.
touch -d "2022-01-01 00:00:00.000000000" \
  /home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-1.parquet \
  /home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-2.parquet \
  /home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-3.parquet
Now the data directory for the nyc.taxis_sample
table contains the orphan files.
ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data/
output:
total 138M
46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet
4.0K i-shouldnt-be-here-1.parquet
4.0K i-shouldnt-be-here-2.parquet
4.0K i-shouldnt-be-here-3.parquet
Go ahead and try a dry run of the remove_orphan_files procedure by including dry_run => true.
spark.sql(s"CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample', dry_run => true)").show(truncate=false)
output:
+-------------------------------------------------------------------------------+
|orphan_file_location |
+-------------------------------------------------------------------------------+
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-1.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-2.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-3.parquet|
+-------------------------------------------------------------------------------+
The dry run shows that the orphan files we created would be removed by the procedure. When ready to remove them, simply re-run the remove_orphan_files
procedure without the dry_run
flag.
spark.sql(s"CALL demo.system.remove_orphan_files(table => 'nyc.taxis_sample')").show(truncate=false)
output:
+-------------------------------------------------------------------------------+
|orphan_file_location |
+-------------------------------------------------------------------------------+
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-1.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-2.parquet|
|file:/home/iceberg/warehouse/nyc/taxis_sample/data/i-shouldnt-be-here-3.parquet|
+-------------------------------------------------------------------------------+
Another look at the data directory shows that the orphan files were removed.
ls -Shs /home/iceberg/warehouse/nyc/taxis_sample/data/
output:
total 138M
46M 00000-28-a4a4fa54-b114-4064-993b-0e6a28411eef-00001.parquet
45M 00001-29-60657418-5bb1-494f-a1e3-2a8bb5e0242c-00001.parquet
44M 00002-30-a83491c4-75fe-45f6-8d81-d4eede907d9a-00001.parquet
5.1M 00003-31-a52999e1-a997-415a-8035-442bbe4ccd0b-00001.parquet
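If a different cutoff than the three-day default is ever needed, remove_orphan_files also accepts an older_than timestamp. A hedged sketch, illustrative only (be cautious with recent cutoffs, since files from in-flight writes can look like orphans):
// Illustrative only: remove orphan files older than a specific cutoff
spark.sql("""
CALL demo.system.remove_orphan_files(
  table => 'nyc.taxis_sample',
  older_than => TIMESTAMP '2022-07-01 00:00:00'
)
""").show(truncate=false)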
Rewriting Manifest Files
Without getting too deep into the weeds of how Iceberg metadata is structured, maintaining ideal manifest file sizes is very important for optimal scan planning. This can be achieved by running the rewrite_manifests procedure, which only requires providing the table name.
spark.sql("CALL demo.system.rewrite_manifests('nyc.taxis_sample')").show()
output:
+-------------------------+---------------------+
|rewritten_manifests_count|added_manifests_count|
+-------------------------+---------------------+
| 2| 1|
+-------------------------+---------------------+
That’s it! The manifest files have all been rewritten to optimal sizes.
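To confirm the result, the manifests metadata table lists each manifest along with its size. A quick check (not part of the original walkthrough) could look like:
// Inspect the table's manifest files and their sizes after the rewrite
spark.sql("SELECT path, length FROM nyc.taxis_sample.manifests").show(truncate=false)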
Note: For an overview of how Iceberg metadata works, take a look at one of our previous blog posts, Metadata Indexing in Iceberg by Ryan Blue.
Conclusion
There are many factors that inform which maintenance procedures should be used and how often they should be performed. Query patterns, table size, performance requirements, storage type, regulatory requirements, security policies, and many more things can influence how a table’s data and metadata must be maintained.
Iceberg aims to make it easier to think about the harder questions regarding what kind of maintenance an individual table requires instead of getting bogged down in the how. Sensible defaults allow performing maintenance actions that optimize the most typical table usage patterns and are very valuable before query patterns can be established. Iceberg’s table maintenance procedures are a great example of the Iceberg community’s approach to declarative data engineering and the philosophy that data professionals should be empowered to think more about the actual data and less about its technical maintenance.
If you have thoughts or ideas to share, or just want to drop by to say hello, the Iceberg community is a very welcoming one and you can find ways to join by checking out our community page.