BASICS
This recipe demonstrates how to create Apache Iceberg tables from CSV files. It focuses on ensuring that the table's schema matches the data, and it uses Apache Spark because Spark reads a variety of formats and can help infer schemas.
The Parquet and CSV files used here come from the public New York City Taxi and Limousine Commission Trip Record Data, available on the AWS Open Data Registry.
CSV and schemas
CSV can be tricky: there is no guarantee that a file has a header row to name its fields, and there is no way to embed the fields' data types in the data files.
The files used in this example have a header, but Spark doesn't automatically detect one. To tell Spark to use the header row for field names, pass header=True. Spark will also infer column types from the data if you pass inferSchema=True. Both of these options can be passed either as string options or directly to the csv method:
csv_df = spark.read.csv(
    's3://nyc-tlc/csv_backup/yellow_tripdata_2022-02.csv',
    header=True,       # use the first row for column names
    inferSchema=True   # sample the data to infer column types
)
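
For reference, the same read expressed with string options instead of keyword arguments, which can be convenient when the options come from configuration:

csv_df = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('s3://nyc-tlc/csv_backup/yellow_tripdata_2022-02.csv')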
If your data file doesn't have a header, or if column types aren't inferred correctly, you can pass column names and types explicitly using the schema option:
csv_df = spark.read.csv(
    's3://nyc-tlc/csv_backup/yellow_tripdata_2022-02.csv',
    schema='vendor_id int, tpep_pickup_ts timestamp, ...'  # DDL-style column list, remaining columns elided
)
After loading, you can make minor adjustments, just as in the Parquet recipe: use withColumn to adjust types and withColumnRenamed to rename columns.
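
For example, a quick sketch of both adjustments (the column names and target type here are illustrative, not requirements of the data set):

from pyspark.sql.functions import col

# Cast a column to a narrower type and rename a column to snake_case
csv_df = csv_df \
    .withColumn('passenger_count', col('passenger_count').cast('int')) \
    .withColumnRenamed('VendorID', 'vendor_id')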
There are also many more CSV options that control delimiters, date/time formats, whitespace trimming, etc. Refer to the Spark CSV documentation for complete details.
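
For instance, a hypothetical pipe-delimited file with a custom timestamp format might be read like this (the file path and formats are made up; the option names are standard Spark CSV options):

csv_df = spark.read.csv(
    's3://example-bucket/pipe_delimited.csv',  # hypothetical file
    header=True,
    sep='|',                                # field delimiter
    timestampFormat='yyyy-MM-dd HH:mm:ss',  # how timestamps are parsed
    ignoreTrailingWhiteSpace=True           # trim trailing whitespace
)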
Creating the Iceberg table
Once your data looks correct, create an Iceberg table using the writeTo syntax:
from pyspark.sql.functions import days

spark.sql("CREATE DATABASE IF NOT EXISTS nyc")

# Partition the table by day using Iceberg's days transform
csv_df.writeTo("nyc.taxis") \
    .partitionedBy(days("tpep_pickup_datetime")) \
    .createOrReplace()
This uses the same options that were covered in the Parquet create table recipe.
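
To sanity-check the result, read the new table back like any other:

# Verify the table's schema and take a quick look at the data
spark.table('nyc.taxis').printSchema()
spark.sql('SELECT * FROM nyc.taxis LIMIT 5').show()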