Creating a Table from Parquet

BASICS

This recipe demonstrates how to create Apache Iceberg tables from Parquet files. It focuses on making sure the table schema matches the data, and it uses Spark because Spark supports a variety of formats and can help infer schemas.

The Parquet and CSV files used here come from the public New York City Taxi and Limousine Commission (TLC) Trip Record Data, available through the AWS Open Data Registry.

Loading source data from Parquet

The easiest open file format to read from is Parquet. Parquet is a binary format that includes a schema for the records stored in each data file. That schema is helpful when creating an Iceberg table.

Start by loading your Parquet file into a PySpark DataFrame:

df = spark.read.parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2020-02.parquet")
df.printSchema()

# root
#  |-- VendorID: long (nullable = true)
#  |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
#  |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
#  |-- passenger_count: double (nullable = true)
#  |-- trip_distance: double (nullable = true)
#  |-- RatecodeID: double (nullable = true)
#  |-- store_and_fwd_flag: string (nullable = true)
#  |-- PULocationID: long (nullable = true)
#  |-- DOLocationID: long (nullable = true)
#  |-- payment_type: long (nullable = true)
#  |-- fare_amount: double (nullable = true)
#  |-- extra: double (nullable = true)
#  |-- mta_tax: double (nullable = true)
#  |-- tip_amount: double (nullable = true)
#  |-- tolls_amount: double (nullable = true)
#  |-- improvement_surcharge: double (nullable = true)
#  |-- total_amount: double (nullable = true)
#  |-- congestion_surcharge: double (nullable = true)
#  |-- airport_fee: integer (nullable = true)

Spark automatically uses the Parquet file's schema for the DataFrame. At this point, you may want to apply small updates to the incoming schema, such as changing the pickup location ID, PULocationID, from a long to an int. Individual changes can be applied by replacing columns using withColumn:

from pyspark.sql.functions import col
updated_df = df.withColumn(
    "PULocationID", col("PULocationID").cast("int"))

Creating the Iceberg table

When you’ve finished updating the schema, create a table from the DataFrame using writeTo:

from pyspark.sql.functions import days
spark.sql("CREATE DATABASE IF NOT EXISTS nyc")
updated_df.writeTo("nyc.taxis") \
    .partitionedBy(days("tpep_pickup_datetime")) \
    .createOrReplace()

This creates a table called nyc.taxis, first creating the nyc database if it doesn’t already exist.

Note: The writeTo() method uses the newer DataFrame write API (DataFrameWriterV2). This API is recommended because it has well-defined behaviors; the older write API, accessed by calling write(), behaves differently across source implementations. The newer API also uses actions that correspond to SQL actions, like create() or replace(), and it can overwrite data by filter.
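
For example, once the table exists, the same writeTo() entry point offers explicit actions for later writes. This is only a sketch of the API surface; the filter predicate shown is illustrative.

from pyspark.sql.functions import col

# Append new rows to the existing table
updated_df.writeTo("nyc.taxis").append()

# Overwrite only the rows matched by a filter (illustrative predicate)
updated_df.writeTo("nyc.taxis") \
    .overwrite(col("tpep_pickup_datetime") >= "2020-02-01")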

Before calling the createOrReplace() action that creates the Iceberg table from the incoming DataFrame, you can configure the table. This example creates a partitioned table by adding a call to partitionedBy(days("tpep_pickup_datetime")), which configures the table to split data into day-sized partitions using Iceberg’s hidden partitioning.
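
Other table configuration can be chained onto the same builder before the action. As a sketch (the extra bucket transform and the property value here are illustrative, not part of the recipe), you could combine partition transforms and set a table property like this:

from pyspark.sql.functions import days, bucket

# Illustrative only: day partitioning plus a bucket transform, and a table property
updated_df.writeTo("nyc.taxis") \
    .partitionedBy(days("tpep_pickup_datetime"), bucket(16, "PULocationID")) \
    .tableProperty("write.format.default", "parquet") \
    .createOrReplace()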

Once the table is created, you can use DESCRIBE TABLE or a SELECT query to validate it.

spark.sql("DESCRIBE TABLE nyc.taxis").show(truncate=False)
spark.sql("SELECT * FROM nyc.taxis LIMIT 5").show()