Co-authored by Kyle Bendickson
UPDATE: This docker environment has been updated to use the Iceberg REST catalog. You can read about this new setup in our follow-up blog post: Iceberg’s REST Catalog: A Spark Demo.
If you’re interested in Iceberg because you heard it solves a problem, such as schema evolution or row-level updates, and you want an easy way to test it out, then you’re in the right place! This post will get you up and running with Spark and Iceberg locally in minutes. This also highlights many of the incredible Iceberg features that solve data warehousing problems you’ve seen before.
Iceberg provides libraries for interacting directly with tables, but those are too low level for most people. Most of the time, you’ll interact with Iceberg through compute engines, like Spark, Trino, or Flink.
Let’s start by running a local Spark instance with Iceberg integration using Docker. In this environment you’ll be able to test out Iceberg’s many features like time travel, safe schema evolution, and hidden partitioning.
- Launching the Notebook
- A Minimal Runtime
- Schema Evolution
- Partitioning
- Time Travel and Rollback
- Expressive SQL for Row-Level Changes
- Atomic CTAS and RTAS Statements
- Spark Procedures
Launching the Notebook
First, install Docker and Docker Compose if you don’t already have them. Next, create a docker-compose.yaml file with the following content.
docker-compose.yaml
version: "3"
services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
networks:
iceberg_net:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
networks:
iceberg_net:
ports:
- 8181:8181
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_net:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_net:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
networks:
iceberg_net:
In the same directory as the docker-compose.yaml file, run the following commands to start the runtime and launch an Iceberg-enabled Spark notebook server.
docker-compose up -d
docker exec -it spark-iceberg pyspark-notebook
That’s it! You should now have a fully functional notebook server running at http://localhost:8888, a Spark driver UI running at http://localhost:8080, and a Spark history UI running at http://localhost:18080.
Note: As the tabulario/spark-iceberg image evolves, be sure to refresh your cached image to pick up the latest changes by running docker-compose pull.
A Minimal Runtime
The runtime provided by the Docker Compose file is far from a large-scale, production-grade warehouse, but it does let you demo Iceberg’s wide range of features. Let’s quickly cover this minimal runtime.
- Spark 3.1.2 in local mode (the Engine)
- An Iceberg REST catalog backed by MinIO object storage (the Catalog)
- A docker-compose setup to wire everything together
- A %%sql magic command to easily run SQL in a notebook cell
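For example, a notebook cell using the magic might look like this (a quick sketch; demo.nyc.taxis is the table created by the bundled getting-started notebook):
%%sql

SELECT * FROM demo.nyc.taxis LIMIT 5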
So, What Can Iceberg Do?
The docker environment comes with an “Iceberg – Getting Started” notebook which demos many of the features that are covered in greater depth in this post. The examples you see here are included in the notebook, so that’s also a good place to start learning!
Schema Evolution
Anyone who’s worked with Hive tables or other big data table formats knows how tricky evolving a schema can be. Adding a column that was previously dropped can resurrect old data from the dead. When you’re lucky enough to catch the problem quickly, it’s often time-consuming to undo and frequently ends with rebuilding the table. Even renaming a column is a potentially dangerous operation.
You shouldn’t need to worry about which changes work and which ones break your table. In Iceberg, schema operations such as renaming or adding columns are safe operations with no surprising side effects.
ALTER TABLE taxis RENAME COLUMN fare_amount TO fare;
ALTER TABLE taxis RENAME COLUMN trip_distance TO distance;
Other schema changes, like adding a comment, changing a column’s type, moving its position, or adding a new column, are similarly straightforward.
ALTER TABLE taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.';
ALTER TABLE taxis ALTER COLUMN distance TYPE double;
ALTER TABLE taxis ALTER COLUMN distance AFTER fare;
ALTER TABLE taxis ADD COLUMN fare_per_distance_unit float AFTER distance;
Furthermore, schema evolution changes are metadata-only operations, which is why they execute so quickly. Gone are the days of rewriting entire tables just to change the name of a single column!
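To double-check the resulting layout, a metadata-only command such as DESCRIBE is enough (a quick sketch):
DESCRIBE TABLE taxis;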
Partitioning
Changing the partition scheme of a production table is always a big deal. Normally, the process is arduous and requires migrating to an entirely new table with a different partition scheme. Historical data must be rewritten to match the new scheme, even if you’re only interested in changing the partitioning of newly written data. Arguably the hardest part is chasing down the table’s consumers to remind them to update the partition clause in all of their queries! Iceberg handles this seamlessly: a table’s partitioning can be updated in place and applied only to newly written data.
ALTER TABLE taxis
ADD PARTITION FIELD VendorID
Query plans are then split, using the old partition scheme for data written before the partition scheme was changed, and using the new partition scheme for data written after. People querying the table don’t even have to be aware of this split. Simple predicates in WHERE clauses are automatically converted to partition filters that prune out files with no matches. This is what’s referred to in Iceberg as Hidden Partitioning.
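For example, once the VendorID partition field has been added, a plain filter is all a query needs; Iceberg prunes non-matching data files behind the scenes (a quick sketch, assuming VendorID holds the integer vendor codes from the NYC taxi dataset):
SELECT count(*) FROM taxis
WHERE VendorID = 1;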
Time Travel and Rollback
As you change your table, Iceberg tracks each version as a “snapshot” in time. You can time travel to any snapshot or point in time. This is useful when you want to reproduce results of previous queries, for example to reproduce a downstream product such as a report.
# Row count for the current version of the table
spark.read.table("taxis").count()  # 2,853,020

import time

ONE_DAY_MS = 86400000
NOW_MS = int(time.time() * 1000)

# Row count as of one day ago, using the as-of-timestamp read option (milliseconds)
(spark
  .read
  .option("as-of-timestamp", NOW_MS - ONE_DAY_MS)
  .table("taxis")
  .count())  # 2,798,371
In complicated logic that depends on many upstream tables that are frequently being updated, it’s sometimes ambiguous whether changes in query results are caused by the new code change you’re testing, or if the upstream tables have been updated. Using time travel, you can ensure that you’re querying all the tables as of a specific time. This makes creating a controlled environment much easier.
If, instead of time traveling for a single query at runtime, you want to actually roll back the table to that point in time or to a specific snapshot ID, you can easily do that using the rollback procedures!
CALL demo.system.rollback_to_timestamp('taxis', TIMESTAMP '2021-12-31 00:00:00.000')
CALL demo.system.rollback_to_snapshot('taxis', <SNAPSHOT>)
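To find a snapshot ID to roll back to, you can query the table’s snapshots metadata table, reached by appending .snapshots to the table name (a quick sketch; qualify the table with its catalog and namespace if needed):
SELECT committed_at, snapshot_id, operation
FROM taxis.snapshots
ORDER BY committed_at;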
Expressive SQL for Row-Level Changes
Iceberg’s SQL extensions allow for very expressive queries that perform row-level operations. For example, you can delete all records in a table that match a specific predicate.
DELETE FROM taxis
WHERE fare_per_distance_unit > 4.0 OR distance > 2.0
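Row-level updates are just as direct. For example, the fare_per_distance_unit column added in the schema evolution section could be populated like this (a quick sketch; the predicate guards against division by zero):
UPDATE taxis
SET fare_per_distance_unit = fare / distance
WHERE distance > 0;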
Also, the MERGE INTO command makes the common task of merging two tables very intuitive.
MERGE INTO prod.nyc.taxis pt
USING (SELECT * FROM staging.nyc.taxis) st
ON pt.id = st.id
WHEN NOT MATCHED THEN INSERT *;
Atomic CTAS and RTAS Statements
How often have you needed to create a table that exactly matched the results of a query? Iceberg ensures that CTAS (short for “CREATE TABLE AS SELECT”) or RTAS (short for “REPLACE TABLE AS SELECT”) statements are atomic. If the SELECT query fails, you aren’t left with a partial table that you have to quickly drop before someone queries it!
[CREATE|REPLACE] TABLE prod.nyc.vendor2 AS
SELECT * FROM taxis
WHERE VendorID = 2
Spark Procedures
We covered the rollback_to_snapshot procedure, but Iceberg includes many more procedures that allow you to perform all sorts of table maintenance. You can expire snapshots, rewrite manifest files, or remove orphan files, all using intuitively named Spark procedures. To read more about all the available procedures, check out the Spark Procedures section in the Iceberg documentation.
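For instance, expiring snapshots older than a cutoff is a single procedure call (a quick sketch; the demo catalog name matches this environment, and the timestamp is just an example):
CALL demo.system.expire_snapshots('taxis', TIMESTAMP '2022-01-01 00:00:00.000');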
Closing Remarks
Support for Iceberg continues to grow rapidly: Amazon recently announced built-in Iceberg support in EMR, and Snowflake announced support for connecting to Iceberg tables in response to significant customer demand. Likewise, the community is growing bigger and stronger every day, with contributors from many industries bringing exciting new use cases, perspectives, and solutions. If you have thoughts on any of the features discussed or just want to say hello, check out our community page for all the ways you can join in on the conversation. I hope to see you there!