Partitioning for Correctness (and Performance)


Partition design is a critical part of data modeling. Unfortunately, given the constraints of most Hive-based tables, data engineers (myself included) are often forced into choices with unintended and potentially dangerous consequences. In this post, I'll use the popular NYC taxi dataset to illustrate some of the pitfalls of partition design and techniques for avoiding them.

The Problem with (Hive) Partitions

The New York City Taxi & Limousine Commission publishes a rich dataset about the movements of humans in and around New York. The data is conveniently provided as a set of Parquet files, each covering one month of trip information.

Let us assume that I have been tasked with ingesting this data into my company's data warehouse to help analyze the best location for opening our brand new flagship store. Because I am a seasoned data practitioner well versed in the ways of functional data engineering, I naturally set up my target data warehouse table with monthly partitions.

CREATE TABLE taxis (...) PARTITIONED BY (year, month)

Since I am loading data provided in monthly files, using the month indicated in the file name seems like a natural choice. I can be confident that loading any given file will only impact a single partition in the table. This allows me to safely load files in parallel and safely reprocess a file in the future.

INSERT OVERWRITE taxis PARTITION (year=2020, month=3)
SELECT * FROM taxi_202003
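The property that makes this layout attractive can be sketched with a toy Python model (not Spark): the table is a dict from a (year, month) partition key to its rows, and a static INSERT OVERWRITE replaces exactly one key. The helper name insert_overwrite_static and the sample rows are hypothetical.

```python
# Toy model, not Spark: a table is a dict mapping a (year, month)
# partition key to the list of rows stored in that partition.

def insert_overwrite_static(table, partition, rows):
    """Replace exactly one partition, like
    INSERT OVERWRITE taxis PARTITION (year=2020, month=3)."""
    table[partition] = list(rows)


# Hypothetical rows standing in for the contents of taxi_202003.
taxi_202003 = [
    {"pickup_time": "2020-03-01 08:15:00"},
    {"pickup_time": "2020-03-31 23:50:00"},
]
# Hypothetical rows for a different monthly file.
taxi_202002 = [{"pickup_time": "2020-02-14 18:00:00"}]

table = {}
insert_overwrite_static(table, (2020, 2), taxi_202002)
insert_overwrite_static(table, (2020, 3), taxi_202003)
snapshot = {key: list(rows) for key, rows in table.items()}

# Reprocessing the March file touches only the March partition,
# so the table ends up exactly as it was.
insert_overwrite_static(table, (2020, 3), taxi_202003)
print(table == snapshot)  # True
```

Note that the partition key comes from the file name, not from the rows, so nothing in this model stops a row with a February pickup_time from landing in the March partition.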

However, we will quickly discover a problem with this approach when we try to answer the simple question “how many rides originated in March of 2020?”

SELECT count(*) FROM taxis
WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01';

RESULT: 3,007,384

Neat, but somewhere a data engineer is crying: we forgot to add the year and month partition columns as predicates, which means we just did a full table scan (GASP!). Let's modify our query to be more performant:

SELECT count(*) FROM taxis
WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01'
  AND year=2020 AND month=3;

RESULT: 3,007,687

Wha?!!

As it turns out, the “monthly” taxi source data has trips from multiple months. So by limiting our query to only include a single partition, we have unintentionally excluded records and our query produces incorrect results.
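A toy Python model reproduces the effect: partitions are keyed by the month in the source file name, and one hypothetical late-March ride sits in the April file. Reading only the (2020, 3) partition misses it. The timestamps are made up for illustration.

```python
from datetime import datetime

# Toy model: partitions keyed by the (year, month) in the source file name.
# Hypothetical rows: each "monthly" file holds a ride from a neighboring month.
table = {
    (2020, 3): [datetime(2020, 3, 10, 9, 0), datetime(2020, 2, 29, 23, 55)],
    (2020, 4): [datetime(2020, 3, 31, 23, 58), datetime(2020, 4, 2, 7, 30)],
}

start, end = datetime(2020, 3, 1), datetime(2020, 4, 1)


def in_range(ts):
    return start <= ts <= end  # BETWEEN is inclusive on both ends


full_scan = sum(in_range(ts) for rows in table.values() for ts in rows)
pruned = sum(in_range(ts) for ts in table[(2020, 3)])  # year=2020 AND month=3

print(full_scan, pruned)  # 2 1
```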

Event Time vs Processing Time

Data engineers have a dirty secret: we typically partition our tables based on processing time. To the casual analyst, this may seem like an innocuous choice; however, the savvy reader has already realized that all of a company’s KPIs should carry an “*”.

How many new customers visited our site yesterday? *

* May include visitors from previous days, and may be missing data that hasn’t landed yet.

We often observe a similar pattern, referred to as late-arriving data, when ingesting streaming data. If we process a stream in hourly batches, any given hour will very likely include records that originated in previous hours (or days). We often choose to partition this data by processing time to avoid the performance and correctness challenges that come from attempting to append late-arriving data to existing partitions.
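A small Python sketch with hypothetical timestamps makes the difference concrete: one hourly batch lands in a single processing-time partition, but its records belong to three different event-time hours.

```python
from collections import Counter
from datetime import datetime

# Hypothetical (event_time, processing_time) pairs from one hourly batch.
batch = [
    (datetime(2020, 3, 1, 10, 5), datetime(2020, 3, 1, 10, 6)),    # on time
    (datetime(2020, 3, 1, 9, 58), datetime(2020, 3, 1, 10, 1)),    # a few minutes late
    (datetime(2020, 2, 29, 22, 0), datetime(2020, 3, 1, 10, 30)),  # half a day late
]


def truncate_to_hour(ts):
    return ts.replace(minute=0, second=0, microsecond=0)


# Partitioning by processing time: the whole batch goes to one new partition.
by_processing_hour = Counter(truncate_to_hour(p) for _, p in batch)

# Partitioning by event time: the batch is spread over three partitions,
# two of which earlier batches have presumably already written.
by_event_hour = Counter(truncate_to_hour(e) for e, _ in batch)

print(len(by_processing_hour), len(by_event_hour))  # 1 3
```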

Partitioning By Event Time

Instead of using the file name to populate the year and month partition columns, I can opt to derive the year and month of each record from pickup_time (or dropoff_time, oh the agony!). To ensure that I can safely rerun this job without duplicating data, I can use Spark’s INSERT OVERWRITE syntax with dynamic partitions.

INSERT OVERWRITE taxis PARTITION (year, month)
SELECT *,
       year(pickup_time) as year,
       month(pickup_time) as month
FROM taxi_202003

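BUT, as we have observed, each monthly source file may contain data that maps into multiple target partitions. Spark’s dynamic partition overwrite mode (spark.sql.sources.partitionOverwriteMode=dynamic) replaces only the partitions that appear in the incoming data, but it replaces each of those partitions entirely. If taxi_202003 contains a few rides from late February, the February partition loaded earlier from taxi_202002 is replaced by just those few rides. So even with dynamic partition overwrite enabled, naively inserting this data will delete data by overwriting unintended partitions.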
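The failure mode can be reproduced with a toy Python model of dynamic partition overwrite (a simplification, not Spark's implementation): each (year, month) partition present in the incoming rows is replaced wholesale, and all others are left alone. The helper name and the ride timestamps are hypothetical.

```python
from datetime import datetime


def insert_overwrite_dynamic(table, rows):
    """Replace every (year, month) partition that appears in `rows`."""
    incoming = {}
    for ts in rows:
        incoming.setdefault((ts.year, ts.month), []).append(ts)
    table.update(incoming)  # each touched partition is replaced entirely


# Hypothetical pickup timestamps: 28 rides in the February file...
taxi_202002 = [datetime(2020, 2, day, 12, 0) for day in range(1, 29)]
# ...and a March file that also holds one late-February ride.
taxi_202003 = [
    datetime(2020, 2, 29, 23, 55),
    datetime(2020, 3, 1, 0, 10),
    datetime(2020, 3, 15, 8, 0),
]

table = {}
insert_overwrite_dynamic(table, taxi_202002)
insert_overwrite_dynamic(table, taxi_202003)

print(len(table[(2020, 2)]))  # 1: the 28 rides from the February file are gone
```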

Partitioning By Processing Time & Event Time

In order to avoid unintentionally overwriting data, I can use a combination of processing time and event time partitions.

CREATE TABLE taxis (...) PARTITIONED BY (filemonth, year, month)

Now I can combine static and dynamic partition overwrites to achieve an idempotent write.

INSERT OVERWRITE taxis PARTITION (filemonth=202003, year, month)
SELECT *,
       year(pickup_time) as year,
       month(pickup_time) as month
FROM taxi_202003

We have statically defined the partition for the file we are loading, so we can safely load this file multiple times without overwriting data from other files. And because the year and month partitions are derived from the pickup_time column, our two previous queries (with and without the partition predicates) yield identical (correct) results.
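Extending the toy Python model (still hypothetical, not Spark's implementation) shows why the static filemonth value makes the write safe: every partition key now starts with the file being loaded, so a file can only replace partitions it wrote itself. The same model checks that the partition-pruned count matches a full scan.

```python
from datetime import datetime


def insert_overwrite_mixed(table, filemonth, rows):
    """Keys are (filemonth, year, month); within the static filemonth,
    the (year, month) partitions that receive rows are replaced."""
    incoming = {}
    for ts in rows:
        incoming.setdefault((filemonth, ts.year, ts.month), []).append(ts)
    table.update(incoming)  # every key starts with filemonth, so other files are safe


# Hypothetical pickup timestamps for three monthly files.
taxi_202002 = [datetime(2020, 2, day, 12, 0) for day in range(1, 29)]
taxi_202003 = [
    datetime(2020, 2, 29, 23, 55),  # February ride in the March file
    datetime(2020, 3, 1, 0, 10),
    datetime(2020, 3, 15, 8, 0),
]
taxi_202004 = [
    datetime(2020, 3, 31, 23, 58),  # March ride in the April file
    datetime(2020, 4, 2, 7, 30),
]

table = {}
insert_overwrite_mixed(table, 202002, taxi_202002)
insert_overwrite_mixed(table, 202003, taxi_202003)
insert_overwrite_mixed(table, 202004, taxi_202004)
snapshot = {key: list(rows) for key, rows in table.items()}
insert_overwrite_mixed(table, 202003, taxi_202003)  # reload the March file
print(table == snapshot)  # True: the reload changed nothing

start, end = datetime(2020, 3, 1), datetime(2020, 4, 1)
full_scan = sum(start <= ts <= end for rows in table.values() for ts in rows)
pruned = sum(
    start <= ts <= end
    for (_, year, month), rows in table.items()
    if (year, month) == (2020, 3)  # year=2020 AND month=3
    for ts in rows
)
print(full_scan, pruned)  # 3 3
```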

Time to celebrate a job well done?

Sadly, not quite yet. I have now laid the foundations for a new, more subtle problem. What started as a single file of source data for each month will be spread across potentially many partitions, and each load writes at least one file into every (year, month) partition it touches, even when that partition receives only a handful of stray records. As months of data continue to be loaded into this table, I will likely encounter the dreaded small files problem.
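A back-of-the-envelope Python sketch shows how this happens. It assumes each load writes one data file per (filemonth, year, month) partition it touches; the per-month row counts are invented purely for illustration.

```python
from collections import Counter

# Hypothetical row counts per load: {filemonth: {(year, month): rows}}.
# Each monthly file is mostly its own month plus a few stragglers.
loads = {
    202001: {(2020, 1): 100_000},
    202002: {(2020, 1): 40, (2020, 2): 100_000},
    202003: {(2020, 2): 25, (2020, 3): 100_000, (2020, 4): 3},
}

# Assumption: each load writes one data file per partition it touches.
files = [
    (filemonth, year_month, rows)
    for filemonth, partitions in loads.items()
    for year_month, rows in partitions.items()
]
files_per_event_month = Counter(year_month for _, year_month, _ in files)
tiny_files = [f for f in files if f[2] < 1_000]  # the "small" threshold is arbitrary

print(len(files), len(tiny_files))  # 6 3
print(files_per_event_month[(2020, 2)])  # 2
```

In this sketch half of the files hold fewer than a hundred rows, and every additional file adds per-file overhead to each query that reads that month.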

Table Formats to the Rescue

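Luckily, we now have table formats like Delta Lake and Apache Iceberg that support row-level MERGE operations, which make this pattern much more efficient. Instead of overwriting whole partitions, we can upsert each file’s records by key, so reloading a file updates existing rows instead of duplicating them.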

MERGE INTO taxis t USING (SELECT * FROM source) s
ON t.pickup_time = s.pickup_time AND t.dropoff_time = s.dropoff_time
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
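The semantics of this MERGE can be sketched with a toy Python model (not Delta's or Iceberg's implementation): the target is a dict keyed by (pickup_time, dropoff_time), matched rows are updated, unmatched rows are inserted, and target rows absent from the source are untouched. Like the SQL, it assumes that key is unique; a real MERGE typically rejects a source where several rows match one target row, whereas this dict silently keeps the last one. The fare column and sample rows are hypothetical.

```python
from datetime import datetime


def merge(target, source):
    """Toy version of MERGE INTO taxis t USING source s
    ON t.pickup_time = s.pickup_time AND t.dropoff_time = s.dropoff_time."""
    for row in source:
        key = (row["pickup_time"], row["dropoff_time"])
        target[key] = dict(row)  # UPDATE SET * when matched, INSERT * otherwise


feb_file = [
    {"pickup_time": datetime(2020, 2, 28, 9, 0),
     "dropoff_time": datetime(2020, 2, 28, 9, 20), "fare": 12.5},
]
mar_file = [
    {"pickup_time": datetime(2020, 2, 29, 23, 55),   # late-February ride
     "dropoff_time": datetime(2020, 3, 1, 0, 15), "fare": 18.0},
    {"pickup_time": datetime(2020, 3, 15, 8, 0),
     "dropoff_time": datetime(2020, 3, 15, 8, 30), "fare": 22.0},
]

taxis = {}
merge(taxis, feb_file)
merge(taxis, mar_file)
merge(taxis, mar_file)  # reloading the March file does not duplicate rows
print(len(taxis))  # 3

# A corrected March file updates the matching row in place.
merge(taxis, [dict(mar_file[1], fare=23.0)])
print(taxis[(datetime(2020, 3, 15, 8, 0), datetime(2020, 3, 15, 8, 30))]["fare"])  # 23.0
```

The late-February ride in the March file is merged without disturbing the rows loaded from the February file, which is exactly what the partition overwrite could not guarantee.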

We are almost there, but we still have the anti-pattern of adding columns to our data model just to hold the partition values. As you recall, this also means that all of our queries need to filter on these extraneous columns in order to avoid full table scans:

SELECT count(*) FROM taxis
WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01'
  AND year=2020 AND month=3;

Sidenote: How does the analyst even know they are supposed to add this additional predicate? Tribal knowledge? Being repeatedly scolded by whoever is paying the AWS bill?

Iceberg to the Rescue

If you are wise and experienced (aka old) like me and remember how data used to work in the before days (i.e., before Hadoop), then you know that the concept of partitioning is not new.

Unlike Hive-style tables on Hadoop, Iceberg treats partitioning as configuration that instructs engines how to structure the data, rather than as extra columns that represent a directory structure. Iceberg keeps track of the relationship between the data and its partitions, so we never need to select those directories manually, and our queries don’t need extra predicates to be efficient: a filter on pickup_time alone is enough for Iceberg to skip data in non-matching partitions.
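Hidden partitioning can be sketched with a toy Python model. The month transform follows the Iceberg spec's definition (months since 1970-01); the DataFile record, write_files, and plan_scan are hypothetical simplifications of what Iceberg tracks in its metadata. The query supplies only a pickup_time range, and the planner derives which partitions can match.

```python
from dataclasses import dataclass
from datetime import datetime


def month_transform(ts):
    """Months since 1970-01, as in Iceberg's month transform."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


@dataclass
class DataFile:
    path: str        # hypothetical file name
    partition: int   # month_transform(pickup_time), the same for every row in the file
    rows: list


def write_files(rows):
    """The writer, not the user, derives each row's partition from pickup_time."""
    grouped = {}
    for ts in rows:
        grouped.setdefault(month_transform(ts), []).append(ts)
    return [DataFile(f"data-{p}.parquet", p, r) for p, r in sorted(grouped.items())]


def plan_scan(files, start, end):
    """Translate a pickup_time range into a partition range and skip other files."""
    lo, hi = month_transform(start), month_transform(end)
    return [f for f in files if lo <= f.partition <= hi]


# Hypothetical pickup timestamps.
files = write_files([
    datetime(2020, 2, 29, 23, 55), datetime(2020, 3, 1, 0, 10),
    datetime(2020, 3, 15, 8, 0), datetime(2020, 3, 31, 23, 58),
    datetime(2020, 4, 2, 7, 30), datetime(2020, 5, 5, 12, 0),
])

start, end = datetime(2020, 3, 1), datetime(2020, 4, 1)
planned = plan_scan(files, start, end)
count = sum(start <= ts <= end for f in planned for ts in f.rows)
print([f.path for f in planned], count)  # ['data-602.parquet', 'data-603.parquet'] 3
```

The April file is still read because BETWEEN includes 2020-04-01 00:00; the row-level filter then discards its rows, while the February and May files are skipped without being opened.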

In fact, for time-based partitions, Iceberg provides handy transforms for defining partitions as part of our table definition so we don’t even have to think about partitioning as part of our ingest job.

CREATE TABLE iceberg.taxis (...) USING iceberg PARTITIONED BY (month(pickup_time))
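For reference, the Iceberg table spec defines the time transforms as integer offsets from the Unix epoch: years since 1970, months since 1970-01-01, days since 1970-01-01, and hours since 1970-01-01 00:00:00. A minimal Python sketch of those definitions, assuming naive datetimes that represent UTC, looks like this; the function names are hypothetical.

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)


def year_transform(ts):
    return ts.year - 1970


def month_transform(ts):
    return (ts.year - 1970) * 12 + (ts.month - 1)


def day_transform(ts):
    return (ts.date() - EPOCH.date()).days


def hour_transform(ts):
    # Floor division keeps pre-1970 timestamps in the correct (earlier) hour.
    return int((ts - EPOCH).total_seconds() // 3600)


ts = datetime(2020, 3, 15, 8, 30)
print(year_transform(ts), month_transform(ts), day_transform(ts), hour_transform(ts))
# 50 602 18336 440072
```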

So we can finally get all the way back to our original ideal query and know that it will be both efficient and accurate.

SELECT count(*) FROM iceberg.taxis
WHERE pickup_time BETWEEN '2020-03-01' AND '2020-04-01';

RESULT: 3,007,384

Expert Mode: This also means that we can evolve the partition scheme for a table without having to restate all the data!
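A toy Python model of partition evolution, under simplifying assumptions: each data file remembers the id of the spec it was written with, and the planner prunes each file using that spec's transform. This mirrors the idea that Iceberg records which partition spec each data file was written with, but the SPECS table, the tuple layout, and the switch from monthly to daily partitions are hypothetical. Existing files are never rewritten.

```python
from datetime import date, datetime


def month_transform(ts):
    return (ts.year - 1970) * 12 + (ts.month - 1)


def day_transform(ts):
    return (ts.date() - date(1970, 1, 1)).days


# Hypothetical partition specs: spec 0 is monthly, spec 1 (after evolution) is daily.
SPECS = {0: month_transform, 1: day_transform}


def write(spec_id, rows):
    """Group rows into files with the given spec; each file is (spec_id, partition, rows)."""
    grouped = {}
    for ts in rows:
        grouped.setdefault(SPECS[spec_id](ts), []).append(ts)
    return [(spec_id, part, r) for part, r in sorted(grouped.items())]


def plan(files, start, end):
    """Prune each file with the transform of the spec it was written with."""
    kept = []
    for spec_id, part, rows in files:
        transform = SPECS[spec_id]
        if transform(start) <= part <= transform(end):
            kept.append((spec_id, part, rows))
    return kept


# Files written before the spec change keep their monthly layout...
files = write(0, [datetime(2020, 3, 10, 9, 0), datetime(2020, 3, 20, 9, 0)])
# ...while new data is written daily; nothing old is rewritten.
files += write(1, [datetime(2020, 4, 2, 7, 30), datetime(2020, 4, 20, 18, 0)])

kept = plan(files, datetime(2020, 4, 1), datetime(2020, 4, 10))
print([(spec_id, part) for spec_id, part, _ in kept])  # [(1, 18354)]
```

The range check works per file because the time transforms are order-preserving, so a timestamp range maps to a contiguous range of partition values under either spec.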

The Circle is Complete

We have seen how a table’s partitioning strategy can impact both read and write performance as well as the accuracy of query results. By leveraging Iceberg, data engineers can avoid the problems introduced by the complexities of processing time vs event time while analysts can easily avoid full table scans!