Partition design is a critical part of data modeling. Unfortunately, given the constraints of most Hive-based tables, data engineers (myself included) are often forced into choices that have unintentional and potentially dangerous implications. In this post I’m going to illustrate some of the potential pitfalls around partition design and techniques for avoiding them using the popular NYC taxi data set.
The Problem with (Hive) Partitions
The New York Taxi & Limousine Commission publishes a rich dataset about the movements of humans in and around New York. The data is conveniently provided as a set of Parquet files, each covering one month of trip information.
Let us assume that I have been tasked with ingesting this data into my company’s data warehouse to help analyze the best location for opening our brand new flagship store. Because I am a seasoned data practitioner well versed in the ways of functional data engineering, I naturally set up my target data warehouse table with monthly partitions.
CREATE TABLE taxis AS … PARTITIONED BY year, month
Since I am loading data provided in monthly files, using the month indicated in the file name seems like a natural choice. I can be confident that loading any given file will only impact a single partition in the table. This allows me to safely load files in parallel and safely reprocess a file in the future.
INSERT OVERWRITE taxis PARTITION (year=2020, month=3) SELECT * FROM taxi_202003
However, we will quickly discover a problem with this approach when we try to answer the simple question “how many rides originated in March of 2020?”
SELECT count(*) FROM taxis WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01'; RESULT: 3,007,384
Neat, but somewhere a data engineer is crying because we forgot to add the year and month partitions as predicates, meaning we just did a full table scan (GASP!). Let’s modify our query to be more performant:
SELECT count(*) FROM taxis WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01' AND year=2020 AND month=03; RESULT: 3,007,687
As it turns out, the “monthly” taxi source data has trips from multiple months. So by limiting our query to only include a single partition, we have unintentionally excluded records and our query produces incorrect results.
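To make the failure mode concrete, here is a minimal sketch using Python's built-in sqlite3 module as a stand-in for the warehouse. The three rows are made up for illustration, and the year and month columns hold the values taken from the (hypothetical) file name rather than from pickup_time.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE taxis (pickup_time TEXT, year INTEGER, month INTEGER)")

# Hypothetical rows: (pickup_time, year from file name, month from file name).
rows = [
    ("2020-03-05 08:15:00", 2020, 3),  # March trip delivered in the March file
    ("2020-03-31 23:58:00", 2020, 4),  # March trip delivered in the April file
    ("2020-02-29 23:50:00", 2020, 3),  # February trip delivered in the March file
]
conn.executemany("INSERT INTO taxis VALUES (?, ?, ?)", rows)

# Filter on event time only: correct, but scans every partition.
by_event_time = conn.execute(
    "SELECT count(*) FROM taxis "
    "WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01'"
).fetchone()[0]

# Add the partition predicates: cheaper, but silently drops a March trip.
with_partition_filter = conn.execute(
    "SELECT count(*) FROM taxis "
    "WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01' "
    "AND year = 2020 AND month = 3"
).fetchone()[0]

print(by_event_time, with_partition_filter)  # 2 1
```

The trip that arrived in the April file lives in the month=4 partition, so the "optimized" query never sees it.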
Event Time vs Processing Time
Data engineers have a dirty secret: we typically partition our tables based on processing time. To the casual analyst, this may seem like an innocuous choice; however, the savvy reader has already realized that all of a company’s KPIs should carry an “*”.
How many new customers visited our site yesterday? *
* may include visitors from previous days and also is potentially missing data that just hasn’t landed yet
We often observe a similar pattern, referred to as late arriving data, when ingesting streaming data. In any given hour, our system will very likely process records that originated in previous hours (or days). We often choose to partition this data by processing time to avoid the performance and correctness challenges that come from attempting to append this late arriving data to existing partitions.
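A tiny sketch of the difference, using invented timestamps: the same three events land in one bucket when grouped by processing time, but in three different buckets when grouped by event time.

```python
from collections import Counter
from datetime import datetime

# Hypothetical (event_time, processing_time) pairs.
events = [
    (datetime(2024, 1, 1, 9, 59), datetime(2024, 1, 1, 10, 1)),      # slightly late
    (datetime(2024, 1, 1, 10, 5), datetime(2024, 1, 1, 10, 6)),      # on time
    (datetime(2023, 12, 31, 23, 30), datetime(2024, 1, 1, 10, 30)),  # a day late
]

def hour_bucket(ts):
    """Truncate a timestamp to the hour it falls in."""
    return ts.strftime("%Y-%m-%d %H:00")

by_processing_time = Counter(hour_bucket(processed) for _, processed in events)
by_event_time = Counter(hour_bucket(occurred) for occurred, _ in events)

print(by_processing_time)  # one bucket holding all 3 events
print(by_event_time)       # three buckets holding 1 event each
```

Partitioning by processing time means each batch writes to exactly one new partition, which is why it is so tempting.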
Partitioning By Event Time
Instead of using the file to populate the year and month partition columns, I can opt to derive the year and month of each record from pickup_time (or dropoff_time, oh the agony!). In order to ensure that I can safely rerun this job without duplicating data, I can use Spark’s INSERT OVERWRITE syntax.
INSERT OVERWRITE taxis PARTITION (year, month) SELECT *, year(pickup_time) as year, month(pickup_time) as month FROM taxi_202003
BUT, as we have observed, each monthly source file may contain data that maps to multiple target partitions. Even with Spark’s dynamic partition overwrite enabled, a naive insert replaces every partition the file touches, so a handful of stray February records in the March file would wipe out the February partition that was loaded earlier.
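Here is a simplified model of that behavior in plain Python. It is not Spark; the table is just a dict keyed by (year, month), the file contents are invented, and dynamic_overwrite is a hypothetical helper that swaps out whole partitions the way a dynamic partition overwrite does.

```python
from collections import defaultdict

def dynamic_overwrite(table, rows):
    """Replace every partition that appears in `rows`; leave the others alone."""
    incoming = defaultdict(list)
    for row in rows:
        year, month = int(row["pickup_time"][:4]), int(row["pickup_time"][5:7])
        incoming[(year, month)].append(row)
    table.update(incoming)  # each touched partition is swapped out wholesale
    return table

# Hypothetical files: ten February trips, then ten March trips plus one stray
# February trip that was delivered in the March file.
feb_file = [{"pickup_time": f"2020-02-{d:02d} 12:00:00"} for d in range(1, 11)]
mar_file = [{"pickup_time": "2020-02-29 23:50:00"}] + [
    {"pickup_time": f"2020-03-{d:02d} 12:00:00"} for d in range(1, 11)
]

table = {}
dynamic_overwrite(table, feb_file)
feb_before = len(table[(2020, 2)])
dynamic_overwrite(table, mar_file)
feb_after = len(table[(2020, 2)])

print(feb_before, feb_after)  # 10 1
```

One stray record was enough to replace ten February trips with a single row.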
Partitioning By Processing Time & Event Time
In order to avoid unintentionally overwriting data, I can use a combination of processing time and event time partitions.
CREATE TABLE taxis AS … PARTITIONED BY filemonth, year, month
Now I can combine static and dynamic partition overwrites to achieve an idempotent write.
INSERT OVERWRITE taxis PARTITION (filemonth=202003, year, month) SELECT *, year(pickup_time) as year, month(pickup_time) as month FROM taxi_202003
We have statically defined the partition for the file we are loading, so we can safely load this file multiple times without overwriting data from other files. And because the year and month partitions are derived from the pickup_time column, our two previous queries (with and without the partition predicates) yield identical (correct) results.
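The same toy model, extended with the filemonth partition, shows why reruns become safe. Again this is a sketch under assumptions: overwrite_file and event_month are hypothetical helpers, the table is a dict keyed by (filemonth, year, month), and the files are invented.

```python
def event_month(row):
    # "2020-03-05 08:15:00" -> (2020, 3)
    return int(row["pickup_time"][:4]), int(row["pickup_time"][5:7])

def overwrite_file(table, filemonth, rows):
    # Static part: clear whatever an earlier run wrote under this filemonth.
    for key in [k for k in table if k[0] == filemonth]:
        del table[key]
    # Dynamic part: route each row to its (filemonth, year, month) partition.
    for row in rows:
        table.setdefault((filemonth, *event_month(row)), []).append(row)
    return table

feb_file = [{"pickup_time": f"2020-02-{d:02d} 12:00:00"} for d in range(1, 11)]
mar_file = [{"pickup_time": "2020-02-29 23:50:00"}] + [
    {"pickup_time": f"2020-03-{d:02d} 12:00:00"} for d in range(1, 11)
]

table = {}
overwrite_file(table, 202002, feb_file)
overwrite_file(table, 202003, mar_file)
overwrite_file(table, 202003, mar_file)  # rerun of the same file

feb_trips = sum(len(rows) for (_, y, m), rows in table.items() if (y, m) == (2020, 2))
total_trips = sum(len(rows) for rows in table.values())
print(feb_trips, total_trips)  # 11 21
```

The rerun changes nothing, and all eleven February trips survive because the stray one lives under filemonth=202003 rather than on top of the February file's data.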
Time to celebrate a job well done?
Sadly, not quite yet. I have now laid the foundations for a new, more subtle problem. What started as a single file of source data for each month will be spread across potentially many partitions. As many months of data continue to be loaded into this table, I will likely encounter the dreaded small files problem.
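A back-of-the-envelope sketch of how quickly the partition count grows. The assumption here (mine, not something measured from the taxi data) is that every monthly file contains trips from its own month plus a fixed number of earlier months; partition_keys is a hypothetical helper.

```python
def partition_keys(filemonths, spill_months):
    """Enumerate the (filemonth, year, month) partitions the files would create."""
    keys = set()
    for filemonth in filemonths:
        year, month = divmod(filemonth, 100)  # 202003 -> (2020, 3)
        for back in range(spill_months + 1):
            idx = year * 12 + (month - 1) - back  # months counted from year 0
            keys.add((filemonth, idx // 12, idx % 12 + 1))
    return keys

files_2020 = [202000 + m for m in range(1, 13)]

print(len(partition_keys(files_2020, 0)))  # 12: one partition per file
print(len(partition_keys(files_2020, 2)))  # 36: each file spills into 3 partitions
```

Most of those extra partitions hold only a handful of stragglers, which is exactly the shape of the small files problem.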
Table Formats to the Rescue
Luckily, we now have table formats like Delta and Iceberg which allow us to leverage MERGE operations and make this pattern much more efficient.
MERGE INTO taxis t USING (SELECT * FROM source) s ON t.pickup_time = s.pickup_time AND t.dropoff_time = s.dropoff_time WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *
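Conceptually, the MERGE behaves like an upsert keyed on the ON clause. The sketch below models that with a Python dict. It is not how Delta or Iceberg implement MERGE, and it assumes that (pickup_time, dropoff_time) uniquely identifies a trip, which real data may not guarantee. The rows and the fare column are made up.

```python
def merge_into(target, source):
    """Upsert each source row into target, keyed like the ON clause."""
    for row in source:
        key = (row["pickup_time"], row["dropoff_time"])
        target[key] = row  # matched: update in place; not matched: insert
    return target

# Hypothetical contents of the March file.
mar_file = [
    {"pickup_time": "2020-02-29 23:50:00", "dropoff_time": "2020-03-01 00:10:00", "fare": 12.5},
    {"pickup_time": "2020-03-05 08:15:00", "dropoff_time": "2020-03-05 08:40:00", "fare": 21.0},
]

taxis = {}
merge_into(taxis, mar_file)
merge_into(taxis, mar_file)  # rerunning the load does not duplicate rows

# A corrected version of the second trip updates the existing row.
merge_into(taxis, [dict(mar_file[1], fare=23.0)])

print(len(taxis))  # 2
```

Because the write is keyed on the record rather than on a partition, the job stays idempotent without the extra filemonth partition.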
We are almost there, but we still have this anti-pattern of adding columns to our data model just to hold the partition values. As you recall, this also means that all of our queries need to add these extraneous columns in order to avoid full table scans:
SELECT count(*) FROM taxis WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01' AND year=2020 AND month=03
Sidenote: How does the analyst even know they are supposed to add this additional predicate? Tribal knowledge? Being repeatedly scolded by whoever is paying the AWS bill?
Iceberg to the Rescue
If you are wise and experienced (aka old) like me and remember how data used to work in the before days (i.e., before Hadoop), then you know that the concept of partitioning is not new.
Unlike Hadoop-based table formats, Iceberg partitioning is configuration that instructs engines how to structure the data, rather than extra columns that represent a directory structure. Iceberg keeps track of the relationship between the data and the structure, so we never need to select those directories manually, nor do our queries require extra predicates in order to be efficient.
In fact, for time-based partitions, Iceberg provides handy transforms for defining partitions as part of our table definition so we don’t even have to think about partitioning as part of our ingest job.
CREATE TABLE iceberg.taxis AS … PARTITIONED BY month(pickup_time)
So we can finally get all the way back to our original ideal query and know that it will be both efficient and accurate.
SELECT count(*) FROM iceberg.taxis WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01'; RESULT: 3,007,384
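To get a feel for why no extra predicate is needed, here is a rough sketch of the idea behind the month transform. The Iceberg spec defines the transform's output as the number of months since 1970-01. The pruning function is my own simplification, not Iceberg's planner, and both function names are hypothetical.

```python
from datetime import datetime

def month_transform(ts):
    """Months since 1970-01, the value Iceberg's month transform produces."""
    return (ts.year - 1970) * 12 + (ts.month - 1)

def partitions_to_scan(lower, upper):
    """Partition values that can hold rows with lower <= pickup_time <= upper."""
    return list(range(month_transform(lower), month_transform(upper) + 1))

# The predicate from the query: BETWEEN '2020-03-01' AND '2020-04-01'.
print(partitions_to_scan(datetime(2020, 3, 1), datetime(2020, 4, 1)))  # [602, 603]
```

Because the engine can derive the partition value from pickup_time itself, a filter on pickup_time is all it needs to skip every other month. The April partition shows up only because BETWEEN includes the upper bound.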
Expert Mode: This also means that we can evolve the partition scheme for a table without having to restate all the data!
The Circle is Complete
We have seen how a table’s partitioning strategy can impact both read and write performance as well as the accuracy of query results. By leveraging Iceberg, data engineers can avoid the problems introduced by the complexities of processing time vs event time while analysts can easily avoid full table scans!