Using Spark in EMR with Apache Iceberg


Many AWS customers already use EMR to run their Spark clusters. If you’re not familiar with EMR, it’s a simple way to get a Spark cluster running in about ten minutes. In this post, I’ll walk you through creating an EMR cluster backed by Apache Iceberg tables. If you’re unfamiliar with Iceberg, it’s a table format for analytic datasets that can integrate with an ever-growing number of compute engines. Along with high-performance queries on data at rest, it comes with a ton of features that enable data professionals to effectively manage big data, even up to tens of petabytes in size.

Creating an EMR Cluster

To create the EMR cluster, open the advanced options of the Create cluster page in the Amazon EMR console. Don’t worry; we only need to set a couple of things there. First, switch the EMR release to 6.4.0 in the drop-down selector. Next, check the boxes for the following applications to include them in the cluster:

  • Spark 3.1.2
  • JupyterHub 1.4.1
  • JupyterEnterpriseGateway 2.1.0

In the “Edit software settings” section, enter the following configuration into the text box. Remember to replace <Your S3 Warehouse Path> with a path to the location on S3 that you want to use for your warehouse, e.g. s3://emr-spark-and-iceberg/mywarehouse:


[
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.jars.packages": "org.apache.iceberg:iceberg-spark3-runtime:0.12.1,software.amazon.awssdk:bundle:2.15.40,software.amazon.awssdk:url-connection-client:2.15.40",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "spark.sql.defaultCatalog": "prodiceberg",
      "spark.sql.catalog.prodiceberg": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.prodiceberg.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
      "spark.sql.catalog.prodiceberg.warehouse": "<Your S3 Warehouse Path>",
      "spark.sql.catalog.prodiceberg.dynamodb.table-name": "prodiceberg_metastore"
    }
  }
]

Your options should now look like this.

[Screenshot: EMR cluster options]

In the “General Cluster Settings” section, provide a name for the cluster. I’ve chosen “Iceberg on EMR.”

[Screenshot: cluster name set to “Iceberg on EMR”]

For the rest of the options, you can leave the defaults. Just click “Next” through the rest of the sections, and at the end, click “Create Cluster.”

That’s it! After about ten minutes, you’ll have everything you need to start reading and writing data between your EMR compute engines and your data warehouse powered by Iceberg.
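If you’d rather script cluster creation than paste into the console, the same classification can be generated as a JSON file and passed to the AWS CLI through the --configurations option of aws emr create-cluster. The sketch below only reproduces the properties shown above; the helper name build_iceberg_configurations and the file name iceberg-config.json are hypothetical.

```python
import json


def build_iceberg_configurations(warehouse_path: str,
                                 catalog: str = "prodiceberg",
                                 dynamodb_table: str = "prodiceberg_metastore") -> list:
    """Build the spark-defaults classification used in this post (hypothetical helper)."""
    if not warehouse_path.startswith("s3://"):
        raise ValueError("warehouse_path should be an S3 URI, e.g. s3://bucket/prefix")
    prefix = f"spark.sql.catalog.{catalog}"
    properties = {
        "spark.jars.packages": ",".join([
            "org.apache.iceberg:iceberg-spark3-runtime:0.12.1",
            "software.amazon.awssdk:bundle:2.15.40",
            "software.amazon.awssdk:url-connection-client:2.15.40",
        ]),
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.defaultCatalog": catalog,
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
        f"{prefix}.warehouse": warehouse_path,
        f"{prefix}.dynamodb.table-name": dynamodb_table,
    }
    return [{"classification": "spark-defaults", "properties": properties}]


if __name__ == "__main__":
    # Redirect stdout to a file, e.g. python build_config.py > iceberg-config.json
    print(json.dumps(build_iceberg_configurations("s3://emr-spark-and-iceberg/mywarehouse"), indent=2))
```

You would then add --configurations file://iceberg-config.json to your usual aws emr create-cluster command, alongside the release label, applications, and instance settings.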

A Closer Look at the Configuration

While we’re waiting for the cluster to start, let’s take a closer look at the setup. The configuration block you pasted above sets Spark properties that do four things:

  • Include Iceberg’s runtime JAR
  • Create an Iceberg table catalog
  • Enable Iceberg’s SQL extensions
  • Make that catalog Spark’s default

spark.jars.packages

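In this property, we add the Iceberg 0.12.1 runtime for Spark 3, along with version 2.15.40 of the AWS SDK for Java 2.x bundle and its URL connection HTTP client, which Iceberg’s AWS integrations (including the DynamoDB catalog) rely on.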
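Spark reads spark.jars.packages as a comma-separated list of Maven coordinates in groupId:artifactId:version form and downloads them when the application starts. The short sketch below (parse_packages is an illustrative name, not part of Spark) splits the value used in this post into its parts and checks that both AWS SDK artifacts share one version; that check is a reasonable safeguard we chose, not something Spark enforces.

```python
def parse_packages(spec: str) -> list:
    """Split a spark.jars.packages value into (groupId, artifactId, version) tuples."""
    coordinates = []
    for item in spec.split(","):
        group_id, artifact_id, version = item.strip().split(":")
        coordinates.append((group_id, artifact_id, version))
    return coordinates


packages = (
    "org.apache.iceberg:iceberg-spark3-runtime:0.12.1,"
    "software.amazon.awssdk:bundle:2.15.40,"
    "software.amazon.awssdk:url-connection-client:2.15.40"
)

for group_id, artifact_id, version in parse_packages(packages):
    print(group_id, artifact_id, version)

# Keep every AWS SDK v2 artifact on the same version to avoid classpath conflicts
sdk_versions = {v for g, _, v in parse_packages(packages) if g == "software.amazon.awssdk"}
assert sdk_versions == {"2.15.40"}
```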

spark.sql.extensions

This property enables Iceberg’s SQL extensions, which add SQL commands such as the row-level UPDATE and DELETE as well as CALL for Iceberg’s stored procedures. We’ll use UPDATE and CALL later in this post.

spark.sql.catalog.<catalog-name>

We’ve named the catalog prodiceberg. The spark.sql.catalog.prodiceberg property registers it with Iceberg’s SparkCatalog class, and every property that starts with spark.sql.catalog.prodiceberg. configures it:

  • catalog-impl sets the Iceberg catalog implementation for prodiceberg. Here it’s DynamoDbCatalog, which stores table information in DynamoDB. (There are many other catalog options, such as the Hive Metastore and AWS Glue.)
  • dynamodb.table-name sets the name of the DynamoDB table the catalog uses.
  • warehouse sets the S3 location where table data and metadata are stored.
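Spark hands every property under spark.sql.catalog.prodiceberg. to the catalog plugin with that prefix removed, which is how SparkCatalog receives keys like catalog-impl and warehouse. Here is a simplified sketch of that prefix stripping; the function name catalog_options is hypothetical, and Spark itself wraps the result in a case-insensitive map.

```python
def catalog_options(conf: dict, catalog: str) -> dict:
    """Collect the options a catalog plugin receives (simplified sketch).

    Keys under spark.sql.catalog.<catalog>. are passed with that prefix removed;
    the bare spark.sql.catalog.<catalog> key names the plugin class instead.
    """
    prefix = f"spark.sql.catalog.{catalog}."
    return {key[len(prefix):]: value for key, value in conf.items() if key.startswith(prefix)}


conf = {
    "spark.sql.defaultCatalog": "prodiceberg",
    "spark.sql.catalog.prodiceberg": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.prodiceberg.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
    "spark.sql.catalog.prodiceberg.warehouse": "s3://emr-spark-and-iceberg/mywarehouse",
    "spark.sql.catalog.prodiceberg.dynamodb.table-name": "prodiceberg_metastore",
}

print(conf["spark.sql.catalog.prodiceberg"])  # the plugin class Spark instantiates
print(catalog_options(conf, "prodiceberg"))   # the options that class is initialized with
```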

spark.sql.defaultCatalog

This property sets prodiceberg as Spark’s default catalog, so unqualified table names such as nyc.taxis resolve to prodiceberg.nyc.taxis without a catalog prefix.
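A simplified sketch of the lookup rule: when the first part of a dotted name matches a registered catalog, that catalog is used; otherwise Spark uses the current catalog, which spark.sql.defaultCatalog sets. spark_catalog is the name of Spark’s built-in session catalog, the default when spark.sql.defaultCatalog isn’t set. The helper resolve_table is hypothetical and ignores quoted identifiers.

```python
def resolve_table(name: str, registered_catalogs: set, current_catalog: str) -> tuple:
    """Return (catalog, identifier) for a dotted table name (simplified sketch)."""
    parts = name.split(".")
    # A leading part that names a registered catalog selects that catalog
    if len(parts) > 1 and parts[0] in registered_catalogs:
        return parts[0], ".".join(parts[1:])
    # Otherwise the whole name is looked up in the current (default) catalog
    return current_catalog, name


catalogs = {"spark_catalog", "prodiceberg"}
print(resolve_table("nyc.taxis", catalogs, "prodiceberg"))                # ('prodiceberg', 'nyc.taxis')
print(resolve_table("prodiceberg.nyc.taxis", catalogs, "spark_catalog"))  # ('prodiceberg', 'nyc.taxis')
print(resolve_table("nyc.taxis", catalogs, "spark_catalog"))              # ('spark_catalog', 'nyc.taxis')
```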

Next, let’s create a notebook and start having fun with Iceberg!

Creating a Notebook

AWS makes it simple to create a notebook and attach it to your cluster through Jupyter Enterprise Gateway. On the EMR dashboard in the AWS Management Console, select “Notebooks” in the left navigation pane. Next, click “Create notebook.”

[Screenshot: Notebooks menu]

Give the notebook a name (I’ve named it “emr-with-iceberg”), and click the “Choose” button under the “Choose an existing cluster” radio button. The dialog that pops up lists all of your existing EMR clusters that are compatible with notebooks, including the cluster you created earlier. Select it, and click “Choose cluster.”

[Screenshot: choosing a cluster]

Finally, click “Create notebook.”

[Screenshot: Create notebook]

The notebook shows as pending while the notebook server starts up and attaches to the EMR cluster.

Loading Data

Once the notebook is ready, open it and change the kernel to PySpark.

[Screenshot: switching the kernel to PySpark]

You can then start a Spark session by running spark in a cell.

spark

For this notebook, we’ll load the New York City Taxi and Limousine Commission (TLC) Trip Record Data, which is available on the Registry of Open Data on AWS. This data contains information about trips taken by taxis and for-hire vehicles in New York City. We’ll create an nyc database and save one month of this data into an Iceberg table called taxis.


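spark.sql("CREATE DATABASE IF NOT EXISTS nyc")  # namespace in the default catalog, prodiceberg
# Read one month of yellow taxi trip records, using the first row as column names
df = spark.read.option("header", True).csv("s3://nyc-tlc/trip data/yellow_tripdata_2020-04.csv")
df.writeTo("nyc.taxis").create()  # DataFrameWriterV2: creates the Iceberg table from df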

Run a DESCRIBE TABLE command to see the columns and their types. Because the CSV was read without schema inference, every column is stored as a string.


spark.sql("DESCRIBE TABLE nyc.taxis").show(truncate=False)

output:

+---------------------+---------+-------+
|col_name             |data_type|comment|
+---------------------+---------+-------+
|VendorID             |string   |       |
|tpep_pickup_datetime |string   |       |
|tpep_dropoff_datetime|string   |       |
|passenger_count      |string   |       |
|trip_distance        |string   |       |
|RatecodeID           |string   |       |
|store_and_fwd_flag   |string   |       |
|PULocationID         |string   |       |
|DOLocationID         |string   |       |
|payment_type         |string   |       |
|fare_amount          |string   |       |
|extra                |string   |       |
|mta_tax              |string   |       |
|tip_amount           |string   |       |
|tolls_amount         |string   |       |
|improvement_surcharge|string   |       |
|total_amount         |string   |       |
|congestion_surcharge |string   |       |
|                     |         |       |
|# Partitioning       |         |       |
+---------------------+---------+-------+

A row count of the nyc.taxis table shows there are 237,993 records.


spark.sql("""
    SELECT COUNT(*) as cnt
    FROM nyc.taxis
""").show()

output:

+------+
|   cnt|
+------+
|237993|
+------+

Row-Level Updates

We now have all of Iceberg’s features at our disposal. Let’s use an UPDATE query to perform some row-level updates. Suppose we found out that all of the congestion surcharges for this month were actually a flat $3.50. We can set congestion_surcharge to $3.50 and adjust total_amount by subtracting the existing congestion_surcharge and adding the new flat one. Both changes apply only to records that currently have a congestion_surcharge greater than zero.


spark.sql("""
    UPDATE nyc.taxis
    SET total_amount = (total_amount - congestion_surcharge + 3.5),
        congestion_surcharge = 3.5
    WHERE congestion_surcharge > 0
""")

A count shows that the table now contains 184,401 records with a congestion_surcharge of 3.5.


spark.sql("""
    SELECT count(*) as cnt
    FROM nyc.taxis
    WHERE congestion_surcharge = 3.5
""").show()

output:

+------+
|   cnt|
+------+
|184401|
+------+
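SQL evaluates every expression in a SET clause against the row as it was before the update, which is why total_amount can subtract the original surcharge even though congestion_surcharge is reassigned in the same statement. The toy function below mimics that behavior on two hypothetical rows. It uses floats for simplicity, whereas this table’s columns are strings that Spark coerces for the arithmetic and comparisons; the function name is ours, not part of any API.

```python
def apply_flat_surcharge(rows: list, flat: float = 3.5) -> list:
    """Toy version of the UPDATE above, applied to a list of dict rows."""
    result = []
    for row in rows:
        if row["congestion_surcharge"] > 0:  # the WHERE clause
            # Both new values are computed from the original row, as in SQL's SET
            row = dict(row,
                       total_amount=row["total_amount"] - row["congestion_surcharge"] + flat,
                       congestion_surcharge=flat)
        result.append(row)
    return result


trips = [  # hypothetical sample rows
    {"total_amount": 20.0, "congestion_surcharge": 2.5},
    {"total_amount": 12.0, "congestion_surcharge": 0.0},
]
print(apply_flat_surcharge(trips))
# [{'total_amount': 21.0, 'congestion_surcharge': 3.5}, {'total_amount': 12.0, 'congestion_surcharge': 0.0}]
```

Assigning the two columns one after the other, surcharge first, would instead subtract the new 3.50 and leave the first row’s total_amount at 20.0.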

Time Travel

The history metadata table lists every snapshot that has been made the table’s current state, along with the parent snapshot it derives from. The is_current_ancestor flag shows whether a snapshot is part of the lineage of the table’s current snapshot.


spark.sql("""
    SELECT *
    FROM nyc.taxis.history
""").show()

output:

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2021-12-02 23:41:...|7655851632985018025|               null|               true|
|2021-12-02 23:44:...|1326103984451820324|1930340376501382945|               true|
|2021-12-02 23:44:...|8943891383828853591|1326103984451820324|               true|
+--------------------+-------------------+-------------------+-------------------+

You can roll the table back to an earlier snapshot using the rollback_to_snapshot stored procedure. Let’s revert the table to its original state by rolling back to the very first snapshot ID.


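# Order by commit time so the first row is the table's original snapshot
df = spark.sql("SELECT * FROM nyc.taxis.history ORDER BY made_current_at")
original_snapshot = df.head().snapshot_id
# The procedure takes the table name and the target snapshot ID; show() prints the result
spark.sql(f"CALL prodiceberg.system.rollback_to_snapshot('nyc.taxis', {original_snapshot})").show()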

output:

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 8943891383828853591|7655851632985018025|
+--------------------+-------------------+

Another look at the history table shows that the rollback added a new entry with the original snapshot ID. The two snapshots committed after it are no longer flagged as current ancestors.


spark.sql("""
    SELECT *
    FROM nyc.taxis.history
""").show()

output:

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2021-12-02 23:41:...|7655851632985018025|               null|               true|
|2021-12-02 23:44:...|1326103984451820324|1930340376501382945|              false|
|2021-12-02 23:44:...|8943891383828853591|1326103984451820324|              false|
|2021-12-02 23:51:...|7655851632985018025|               null|               true|
+--------------------+-------------------+-------------------+-------------------+
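To see why the flags flipped, here is a toy model of a snapshot log. The SnapshotLog class and the small integer snapshot IDs are hypothetical, not Iceberg’s API. The model assumes, as Iceberg’s rollback does, that the target must be an ancestor of the current snapshot, and that rolling back writes no new snapshot; it only records a new history entry pointing at the old one.

```python
from datetime import datetime, timedelta


class SnapshotLog:
    """Toy model of an Iceberg table's snapshot history (hypothetical, not Iceberg's API)."""

    def __init__(self):
        self.parents = {}   # snapshot_id -> parent snapshot_id (None for the first)
        self.entries = []   # (made_current_at, snapshot_id), oldest first
        self.current = None

    def commit(self, snapshot_id, at):
        # A write creates a new snapshot whose parent is the current snapshot
        self.parents[snapshot_id] = self.current
        self._make_current(snapshot_id, at)

    def rollback_to_snapshot(self, snapshot_id, at):
        # The target must be in the current snapshot's lineage
        if snapshot_id not in self.ancestors():
            raise ValueError(f"{snapshot_id} is not an ancestor of the current snapshot")
        previous = self.current
        # No new snapshot is written; the old one simply becomes current again
        self._make_current(snapshot_id, at)
        return previous, snapshot_id

    def ancestors(self):
        # Walk parent links from the current snapshot back to the first one
        lineage, snapshot_id = set(), self.current
        while snapshot_id is not None:
            lineage.add(snapshot_id)
            snapshot_id = self.parents.get(snapshot_id)
        return lineage

    def history(self):
        # Same columns as the history metadata table:
        # (made_current_at, snapshot_id, parent_id, is_current_ancestor)
        lineage = self.ancestors()
        return [(at, sid, self.parents[sid], sid in lineage) for at, sid in self.entries]

    def _make_current(self, snapshot_id, at):
        self.current = snapshot_id
        self.entries.append((at, snapshot_id))


start = datetime(2021, 12, 2, 23, 41)
log = SnapshotLog()
log.commit(1, start)                         # table created (hypothetical ID 1)
log.commit(2, start + timedelta(minutes=3))  # UPDATE (hypothetical ID 2)
print(log.rollback_to_snapshot(1, start + timedelta(minutes=10)))  # (2, 1)
for row in log.history():
    print(row)
```

As in the real output, the first and last history rows are flagged as current ancestors and the rolled-back snapshot is not.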

Checking the table for records with a congestion_surcharge of 3.5 now returns a count of zero. The table has been successfully reverted to its initial state!


spark.sql("SELECT COUNT(*) as cnt FROM nyc.taxis WHERE congestion_surcharge = 3.5").show()

output:

+------+
|   cnt|
+------+
|     0|
+------+

These examples are just scratching the surface of Apache Iceberg’s feature set!

Summary

In a very short amount of time, you can have a scalable, reliable, and flexible EMR cluster that’s connected to a powerful warehouse backed by Apache Iceberg. S3 persists your data, and there is an ever-growing list of Iceberg catalog options to choose from.

Check out the Amazon EMR Documentation and the Apache Iceberg Documentation to learn more!