BASICS
This recipe demonstrates how to create Iceberg tables from JSON files. It focuses on ensuring that the table's schema matches the data, and it uses Apache Spark because Spark supports a variety of formats and can help infer schemas.
Parquet and CSV files are available from the public New York City Taxi and Limousine Commission Trip Record Data, hosted on the AWS Open Data Registry.
Writing a JSON dataset
There isn’t a public JSON version of the NYC taxi dataset for this example, but you can create one by writing the DataFrame from the CSV recipe to a folder:
# coalesce to one task so the output folder contains a single JSON file
csv_df.coalesce(1).write.json('/tmp/yellow_tripdata_2022-02/json/')
JSON and schemas
Spark’s JSON DataFrameReader does not have an option to infer a schema because JSON values already carry types. However, some types are not supported by JSON, such as date/time types. For example, if you show the schema from the newly created JSON data, the date/time columns are read as strings:
spark.read.json('/tmp/yellow_tripdata_2022-02/json/').printSchema()
# root
# |-- DOLocationID: long (nullable = true)
# |-- PULocationID: long (nullable = true)
# |-- RatecodeID: double (nullable = true)
# |-- VendorID: long (nullable = true)
# |-- congestion_surcharge: double (nullable = true)
# |-- extra: double (nullable = true)
# |-- fare_amount: double (nullable = true)
# |-- improvement_surcharge: double (nullable = true)
# |-- mta_tax: double (nullable = true)
# |-- passenger_count: double (nullable = true)
# |-- payment_type: long (nullable = true)
# |-- store_and_fwd_flag: string (nullable = true)
# |-- tip_amount: double (nullable = true)
# |-- tolls_amount: double (nullable = true)
# |-- total_amount: double (nullable = true)
# |-- tpep_dropoff_datetime: string (nullable = true)
# |-- tpep_pickup_datetime: string (nullable = true)
# |-- trip_distance: double (nullable = true)
To work around this, set the inferTimestamp option to true. There are similar options for inferring decimals (prefersDecimal) and for setting date/time formats (timestampFormat) to produce the right schema:
json_df = spark.read.option('inferTimestamp', 'true') \
    .json('/tmp/yellow_tripdata_2022-02/json/')
json_df.printSchema()
# root
# |-- DOLocationID: long (nullable = true)
# |-- PULocationID: long (nullable = true)
# |-- RatecodeID: double (nullable = true)
# |-- VendorID: long (nullable = true)
# |-- congestion_surcharge: double (nullable = true)
# |-- extra: double (nullable = true)
# |-- fare_amount: double (nullable = true)
# |-- improvement_surcharge: double (nullable = true)
# |-- mta_tax: double (nullable = true)
# |-- passenger_count: double (nullable = true)
# |-- payment_type: long (nullable = true)
# |-- store_and_fwd_flag: string (nullable = true)
# |-- tip_amount: double (nullable = true)
# |-- tolls_amount: double (nullable = true)
# |-- total_amount: double (nullable = true)
# |-- tpep_dropoff_datetime: timestamp (nullable = true)
# |-- tpep_pickup_datetime: timestamp (nullable = true)
# |-- trip_distance: double (nullable = true)
The next issue is that the columns in an inferred JSON schema are not necessarily in the original order; as the output above shows, Spark sorts the fields by name. You can reorder the columns using the DataFrame select method with an ordered list of column names.
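For example, here is a minimal sketch that restores the column order from the CSV data. The exact ordering below is assumed from the NYC taxi schema; adjust it to match your source:
# Reorder the inferred columns by selecting them by name.
# NOTE: this column order is assumed from the NYC taxi CSV layout.
json_df = json_df.select(
    'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'passenger_count', 'trip_distance', 'RatecodeID',
    'store_and_fwd_flag', 'PULocationID', 'DOLocationID',
    'payment_type', 'fare_amount', 'extra', 'mta_tax',
    'tip_amount', 'tolls_amount', 'improvement_surcharge',
    'total_amount', 'congestion_surcharge')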
Like CSV, you can also supply a custom SQL schema string. This is a good option for JSON because it both produces the correct column order and lets you specify types.
json_df = spark.read.json(
    '/tmp/yellow_tripdata_2022-02/json/',
    schema='VendorID int, tpep_pickup_datetime timestamp, ...'
)
Keep in mind that the schema is applied by name in JSON, so the field names must match the data in your JSON file. The CSV recipe showed how to rename VendorID to vendor_id in the schema, but that isn’t possible with JSON; you must use withColumnRenamed instead, as shown below.
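For example, a minimal sketch of renaming after the load (vendor_id is the target name used in the CSV recipe):
# JSON fields are matched by name, so rename on the DataFrame after reading
json_df = json_df.withColumnRenamed('VendorID', 'vendor_id')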
Creating the Iceberg table
Once your JSON DataFrame is ready, you can use it to create an Iceberg table:
from pyspark.sql.functions import days

spark.sql("CREATE DATABASE IF NOT EXISTS nyc")

json_df.writeTo("nyc.taxis") \
    .partitionedBy(days("tpep_pickup_datetime")) \
    .createOrReplace()
This uses the same options that were covered in the Parquet create table recipe.
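As a quick check (not part of the original recipe), you can read the table back and inspect its partitions through Iceberg's metadata tables, assuming your Spark session is configured with an Iceberg catalog:
# Read the new Iceberg table back and show a few rows
spark.table("nyc.taxis").show(5)

# The partitions metadata table shows one row per partition
spark.sql("SELECT partition, record_count FROM nyc.taxis.partitions").show()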