Integrated Audits: Streamlined Data Observability with Apache Iceberg

There are many challenges with managing data at scale. This blog post will show you how Iceberg solves one of those problems: integrating data auditing.

A top priority for a data-driven organization is data quality. However, what qualifies as “quality data” is often nuanced and can vary drastically across organizations, business contexts, and even individual datasets. Using integrated audits, Iceberg lets you seamlessly add audits that run directly before data is committed to your production tables.

Note: If you’d like an easy way to try this demo out locally, check out our other blog post “Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!”. It will help you set up an Iceberg environment in minutes and includes a notebook titled “Iceberg - Integrated Audits Demo” where you can try out everything described here! If you already have the tabulario/spark-iceberg image cached locally, make sure you pick up the latest changes by running docker-compose pull.

Overview

Integrated audits include three primary steps:

  1. Writing data in “stage-only” mode by setting spark.wap.id
  2. Auditing the staged snapshot using an auditing tool or custom queries
  3. Cherry-picking the staged snapshot into the current state of the table

We’ll go into more detail on each of these steps. For the rest of this post, we’ll refer to a single round of these steps as an integrated audit session.

Let’s see this in action!

Load NYC Film Permits Data

For this demo, we will use the New York City Film Permits dataset available as part of the NYC Open Data initiative. We’re using a locally saved copy of a 1,000-record sample, but feel free to download the entire dataset to use in this demo!

We’ll save the sample dataset into an Iceberg table called permits.

df = spark.read.option("inferSchema","true").option("multiline","true").json("nyc_film_permits.json")
df.write.saveAsTable("permits")

Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York.

spark.read \
    .format("iceberg") \
    .load("permits") \
    .groupBy("borough") \
    .count() \
    .show()

output:

+-------------+-----+
|      borough|count|
+-------------+-----+
|    Manhattan|  456|
|       Queens|  157|
|     Brooklyn|  340|
|        Bronx|   38|
|Staten Island|    9|
+-------------+-----+

Generate an ID for an Integrated Audit Session

An integrated audit session is a single round of:

  1. Staging changes to a table
  2. Auditing the staged changes
  3. Committing the changes (optional)

Each of these sessions must be represented with an ID. You can use any convention that makes sense in your environment, but in this demo we’ll simply use a UUID.

import uuid
ia_session_id = uuid.uuid4().hex
print(ia_session_id)  # 2e8d33a9c7ae4ee8b3c3f8c1faf31afa
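If a bare UUID is hard to trace back to a pipeline run, a more descriptive convention is one option. The sketch below is only an illustration: the helper name make_session_id, the pipeline name, and the name_date_suffix layout are all assumptions, not anything Iceberg requires.

```python
import uuid
from datetime import date


def make_session_id(pipeline: str, run_date: date) -> str:
    """Build a readable audit session ID (hypothetical naming convention)."""
    # A short random suffix keeps IDs unique when a pipeline reruns on the same day.
    suffix = uuid.uuid4().hex[:8]
    return f"{pipeline}_{run_date:%Y%m%d}_{suffix}"


example_id = make_session_id("permits_ingest", date(2022, 6, 1))
print(example_id)  # permits_ingest_20220601_ followed by 8 random hex characters
```

The rest of this demo keeps using the plain UUID stored in ia_session_id.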

The Setup

By default, changes to tables are immediately visible. For integrated audits, you need to get Iceberg to stage changes as snapshots that are not referenced by the table’s current state. To do this, first set write.wap.enabled to enable the ability to stage commits to be audited and later published on the table.

ALTER TABLE permits
SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)

Next, set spark.wap.id in the Spark session. This is the audit session ID and will be used to look up the staged changes later.

spark.conf.set('spark.wap.id', ia_session_id)

With a spark.wap.id value set, you can now safely write directly to the permits table. Don’t worry, these changes will only be staged, not committed!
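Because spark.wap.id lives in the Spark session configuration, it stays set until something clears it, so later writes in the same session would presumably be staged too. One way to scope it is a small context manager. This is a minimal sketch, not part of the original demo: wap_session is a hypothetical helper, and FakeConf is a stand-in that mimics the set and unset methods of spark.conf so the example runs without Spark.

```python
from contextlib import contextmanager


@contextmanager
def wap_session(conf, session_id):
    """Set spark.wap.id for the duration of a block, then clear it.

    `conf` is assumed to behave like `spark.conf` (it needs `set` and `unset`).
    """
    conf.set("spark.wap.id", session_id)
    try:
        yield session_id
    finally:
        # Always clear the ID so writes after the block are no longer staged.
        conf.unset("spark.wap.id")


class FakeConf:
    """Stand-in for spark.conf so the sketch runs without a Spark session."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value

    def unset(self, key):
        self.values.pop(key, None)


conf = FakeConf()
with wap_session(conf, "2e8d33a9c7ae4ee8b3c3f8c1faf31afa"):
    inside = dict(conf.values)  # a write issued here would be staged
after = dict(conf.values)
print(inside, after)
```

With a real session, the call would look like `with wap_session(spark.conf, ia_session_id): ...`.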

Staging The Changes

Apart from setting the Iceberg table property and the audit session ID in your Spark session configuration, staging changes is as simple as writing directly to the permits table! This is especially useful when you’re working with a large and complex data ingestion pipeline, because you no longer have to include hard-coded logic that switches between some form of “audit mode” and “production mode”.

For this demo, let’s use a simple query to stage a delete of all records for film permits in the Manhattan borough.

DELETE FROM permits
WHERE borough='Manhattan'

As described, even though the query was executed against the production table, these changes are only staged and not committed since we are within an integrated audit session. Let’s confirm this by verifying that a count by borough still includes the Manhattan records.

SELECT borough, count(*) permit_cnt
FROM permits
GROUP BY borough

output:

+-------------+-----------+
|      borough| permit_cnt|
+-------------+-----------+
|    Manhattan|        456|
|       Queens|        157|
|     Brooklyn|        340|
|        Bronx|         38|
|Staten Island|          9|
+-------------+-----------+

The Audit

Once the changes for this session are staged, you can perform all of your audits to validate the data. The first step is to retrieve the snapshot ID generated by the changes and tagged with this integrated audit session ID.

query = f"""
SELECT snapshot_id
FROM permits.snapshots
WHERE summary['wap.id'] = '{ia_session_id}'
"""
ia_session_snapshot = spark.sql(query).first()[0]
print(ia_session_snapshot)  # 6860666459848084266
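Note that first() returns None when no snapshot carries the session ID, so the [0] lookup above would then fail with a TypeError that is not very descriptive. A more defensive lookup could look like the sketch below. The helper single_snapshot_id is hypothetical, and it assumes you pass it the collected rows of the snapshots query; plain tuples stand in for Spark rows here.

```python
def single_snapshot_id(rows, session_id):
    """Return the one staged snapshot ID for a session, or raise a clear error.

    `rows` is assumed to be the collected result of the snapshots query,
    i.e. a list of row-like objects whose first field is snapshot_id.
    """
    ids = [row[0] for row in rows]
    if not ids:
        raise LookupError(f"No staged snapshot found for wap.id {session_id!r}")
    if len(ids) > 1:
        # Several writes staged under one ID may each produce a snapshot.
        raise LookupError(
            f"Expected one snapshot for wap.id {session_id!r}, found {len(ids)}: {ids}"
        )
    return ids[0]


# Plain tuples stand in for Spark rows here.
rows = [(6860666459848084266,)]
print(single_snapshot_id(rows, "2e8d33a9c7ae4ee8b3c3f8c1faf31afa"))  # 6860666459848084266
```

In the notebook this would be called as `single_snapshot_id(spark.sql(query).collect(), ia_session_id)`.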

This snapshot includes the staged (but not committed) changes to your production table. Once you have this snapshot ID, you can use Iceberg’s Time Travel feature to query it!

spark.read \
    .option("snapshot-id", ia_session_snapshot) \
    .format("iceberg") \
    .load("permits") \
    .groupBy("borough") \
    .count() \
    .show()

output:

+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|  157|
|     Brooklyn|  340|
|        Bronx|   38|
|Staten Island|    9|
+-------------+-----+

At this point, you can use any auditing tool or technique to validate your changes. For this demo, we’ll do a simple audit that confirms that the only remaining boroughs are Queens, Brooklyn, Bronx, and Staten Island. If any expected borough is missing or any additional boroughs are found, we’ll raise an exception.

expected_boroughs = {"Queens", "Brooklyn", "Bronx", "Staten Island"}
distinct_boroughs = spark.read \
    .option("snapshot-id", ia_session_snapshot) \
    .format("iceberg") \
    .load("permits") \
    .select("borough") \
    .distinct() \
    .toLocalIterator()
boroughs = {row[0] for row in distinct_boroughs}

# Set equality catches both missing and unexpected boroughs.
if boroughs != expected_boroughs:
    raise ValueError(f"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}")

If the above check does not fail, we can go ahead and commit our staged data to publish our changes!
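When an audit fails, it helps to know which way it failed. The borough check can be factored into a small function that reports missing and unexpected values separately. This is a sketch with a hypothetical helper name, audit_boroughs, and it works on plain Python sets so it can be tried without Spark.

```python
def audit_boroughs(found, expected):
    """Compare two collections as sets and report what is missing or unexpected."""
    found, expected = set(found), set(expected)
    return {
        "missing": expected - found,      # expected values absent from the snapshot
        "unexpected": found - expected,   # snapshot values nobody expected
    }


expected_boroughs = {"Queens", "Brooklyn", "Bronx", "Staten Island"}

# Example input chosen to fail the audit in both directions.
result = audit_boroughs({"Queens", "Brooklyn", "Manhattan"}, expected_boroughs)
print(sorted(result["missing"]))     # ['Bronx', 'Staten Island']
print(sorted(result["unexpected"]))  # ['Manhattan']
```

An audit passes when both sets in the result are empty.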

The Publish

After the audits are completed, publishing the data is as simple as running a cherrypick_snapshot stored procedure.

publish_query = f"CALL demo.system.cherrypick_snapshot('permits', {ia_session_snapshot})"
spark.sql(publish_query)

output:

+--------------------+--------------------+
|  source_snapshot_id| current_snapshot_id|
+--------------------+--------------------+
| 6860666459848084266| 6860666459848084266|
+--------------------+--------------------+

The source_snapshot_id represents the unpublished snapshot ID. The output of the cherry-pick shows that the snapshot has been published and is now the current_snapshot_id of the table.

That’s it! Publishing the changes from this integrated audit session is a simple metadata-only operation that instantly makes the changes live for all downstream consumers querying the permits table! Query results will now include the commit that removed all Manhattan records.

spark.read \
    .format("iceberg") \
    .load("permits") \
    .groupBy("borough") \
    .count() \
    .show()

output:

+-------------+-----+
|      borough|count|
+-------------+-----+
|       Queens|  157|
|     Brooklyn|  340|
|        Bronx|   38|
|Staten Island|    9|
+-------------+-----+
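If the publish step is reused across tables, the CALL statement can be built by a small helper. The sketch below assumes the same demo catalog and positional arguments used above; cherrypick_statement is a hypothetical name, and the function only builds the SQL string without running it.

```python
def cherrypick_statement(catalog, table, snapshot_id):
    """Build the CALL statement that publishes a staged snapshot."""
    # int() rejects anything that is not a plain snapshot ID before it reaches SQL.
    return f"CALL {catalog}.system.cherrypick_snapshot('{table}', {int(snapshot_id)})"


print(cherrypick_statement("demo", "permits", 6860666459848084266))
# CALL demo.system.cherrypick_snapshot('permits', 6860666459848084266)
```

The resulting string would then be passed to spark.sql, as in the publish step above.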

Closing Remarks

Integrated audits are one example of the many features Iceberg offers to make working with massive data more manageable. The flexibility of this feature allows you to decouple the logic for your ingestion pipelines from your data quality infrastructure. This saves the countless engineering hours otherwise spent repeatedly designing intricate “test-mode” flags for individual pipelines, something that each engineer will probably do differently, leading to a fragmented data quality story across an organization. Additionally, any audit tool or technique that can set the “snapshot-id” read option can integrate cleanly into your data pipelines, and upcoming Iceberg features such as branching and tagging will enhance integrated audits even further. There are even countless ways you can design abstractions in your workflow orchestration systems that further build upon this core feature.
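As one illustration of such an abstraction, the three steps of a session can be wrapped in a single function that only publishes when the audit passes. This is a rough sketch, not an Iceberg API: run_integrated_audit and its three callables are hypothetical, and the lambdas below are stand-ins so the example runs without Spark.

```python
def run_integrated_audit(stage, audit, publish):
    """Run one integrated audit session: stage, audit, then publish on success.

    All three arguments are hypothetical callables supplied by the caller:
      stage()              -> snapshot ID of the staged changes
      audit(snapshot_id)   -> list of failure messages (empty means pass)
      publish(snapshot_id) -> makes the staged snapshot current
    """
    snapshot_id = stage()
    failures = audit(snapshot_id)
    if failures:
        # Leave the snapshot staged and unpublished so it can be inspected.
        return {"snapshot_id": snapshot_id, "published": False, "failures": failures}
    publish(snapshot_id)
    return {"snapshot_id": snapshot_id, "published": True, "failures": []}


# Stand-in callables so the sketch runs without Spark.
published = []
outcome = run_integrated_audit(
    stage=lambda: 6860666459848084266,
    audit=lambda snapshot_id: [],
    publish=published.append,
)
print(outcome["published"], published)  # True [6860666459848084266]
```

In a real pipeline, stage would perform the write and look up the snapshot, audit would read it with the “snapshot-id” option, and publish would run the cherry-pick procedure.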

Finally, if you have thoughts about Iceberg’s integrated audits or just want to meet the Iceberg community, check out our community page to learn where to find us!