Streaming ingestion into Apache Iceberg tables is an exciting topic that brings the worlds of real-time and batch closer together. But it’s also important to think about what we do with the data after it’s ingested.
In a recent post, I talked about streaming change data capture (CDC) data into a mirror table in Iceberg using Debezium, Apache Kafka, and Kafka Connect. This is a great way to make operational data available for analytic purposes, but there are trade-offs to consider. In that post, we learned how to merge data from a change log table into a mirror table to keep the mirror table up to date. The merge operation can be resource-intensive, so we don’t want to run it too frequently. But the less frequently we run it, the more stale our mirror table becomes. Like I said, trade-offs.
In this post, we look at another technique that builds on the merge approach. Using a view that combines our change log table and our mirror table gives us more control as we tune our processes to find the right balance for each use case.
We’ll start with the scenario from my earlier post. If you haven’t read it, please check it out for context.
The change log and the mirror tables
In our current state, we have a change log table that records changes to our source table via the Iceberg connector for Kafka Connect. We have configured the connector to append these change log records every five minutes.
The records in this table contain an operation code specifying an insert, update, or delete, along with two structs containing the before and after values. Each of those structs contains the fields from our source table. This data is pretty fresh, given the five-minute interval setting, but the structure and size of this table make it a poor candidate for querying the current state of the transactional system.
op      string
payload struct<
  after: struct<
    firstName: string,
    lastName: string,
    last_updated: string,
    id: string,
    email: string>,
  before: struct<
    firstName: string,
    ...
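For reference, here is a sketch of DDL that would produce a change log table of this shape. It assumes the before struct mirrors the after struct, as described above, and that we’re running Spark SQL with an Iceberg catalog; the exact DDL from the earlier post may differ.

-- A sketch of the change log table: an operation code plus
-- before/after images of each source row.
CREATE TABLE rpc.cdc_customer (
  op      string,
  payload struct<
    after:  struct<firstName: string, lastName: string, last_updated: string,
                   id: string, email: string>,
    before: struct<firstName: string, lastName: string, last_updated: string,
                   id: string, email: string>>
) USING iceberg;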
We also have a mirror table that contains the same fields as the source table. The mirror table is updated by a merge operation we have scheduled to run every three hours.
id            string
firstName     string
lastName      string
email         string
last_updated  string
This table is much smaller and easier to query, but it can be up to three hours out of date.
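As a refresher, the scheduled merge looks something like the following sketch. Treat it as illustrative rather than the exact statement from the earlier post; among other details, it glosses over limiting the scan to change records that arrived since the previous merge.

MERGE INTO rpc.customer_mirror AS m
USING (
  -- Deduplicate the change log: keep only the latest event per ID.
  -- coalesce() handles delete events, where payload.after is null.
  SELECT * FROM (
    SELECT op,
           coalesce(payload.after.id, payload.before.id) AS id,
           payload.after.firstName, payload.after.lastName,
           payload.after.email, payload.after.last_updated,
           row_number() OVER (
             PARTITION BY coalesce(payload.after.id, payload.before.id)
             ORDER BY coalesce(payload.after.last_updated,
                               payload.before.last_updated) DESC) AS rnk
    FROM rpc.cdc_customer
  ) WHERE rnk = 1
) AS c
ON m.id = c.id
WHEN MATCHED AND c.op = 'd' THEN DELETE
WHEN MATCHED THEN UPDATE SET
  m.firstName = c.firstName, m.lastName = c.lastName,
  m.email = c.email, m.last_updated = c.last_updated
WHEN NOT MATCHED AND c.op != 'd' THEN
  INSERT (id, firstName, lastName, email, last_updated)
  VALUES (c.id, c.firstName, c.lastName, c.email, c.last_updated);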
This is where Iceberg’s view support comes in. We can create a view that combines data in the mirror table with the most recent changes from the change log table to get the best of both worlds, sort of.
The view
Our view combines the records in the mirror table with the after portion of the records from the change log table, selecting the most recent record for each ID.
CREATE OR REPLACE VIEW rpc.customer_view AS
SELECT id, firstName, lastName, email, last_updated
FROM (
  -- Find the most recent record for each ID across both tables
  SELECT *, row_number() OVER (PARTITION BY id ORDER BY last_updated DESC) AS rnk
  FROM (
    -- Union the mirror table with the new records in the change log table.
    -- Note: delete events (op = 'd') are filtered out here, so a row deleted
    -- since the last merge remains visible until the next merge runs.
    SELECT * FROM rpc.customer_mirror
    UNION ALL
    SELECT payload.after.id, payload.after.firstName, payload.after.lastName,
           payload.after.email, payload.after.last_updated
    FROM rpc.cdc_customer
    WHERE op != 'd' AND payload.after.last_updated >= (
      SELECT MAX(last_updated) FROM rpc.customer_mirror)
  )
)
WHERE rnk = 1;
Now we can use this view to retrieve the freshest customer data in our data lake without waiting for the next merge. Let’s try it out.
First, we select the latest data from the mirror table.
spark-sql ()> select max(last_updated) from rpc.customer_mirror;
2024-04-28T20:42:36
Time taken: 0.152 seconds, Fetched 1 row(s)
Now let’s do the same from the view.
spark-sql ()> select max(last_updated) from rpc.customer_view;
2024-04-28T23:19:23
Time taken: 0.641 seconds, Fetched 1 row(s)
As you can see, the query against the view was slower than that against the mirror table, but the data from the view is much more recent.
The speed of queries against the view is directly affected by the amount of new data in the change log table since the last merge to the mirror table. Merging more frequently keeps that backlog small and speeds up these queries. So, there are trade-offs and tuning that can be done. But if data freshness is important to you, this might be a good approach.
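One rough way to keep an eye on this trade-off is to count how many change log records the view has to scan on top of the mirror table. This is just a quick check, not something from the original pipeline:

-- Rough check: how many change log records have arrived since the last merge?
-- Delete events are excluded, matching the view's filter.
SELECT count(*) AS pending_changes
FROM rpc.cdc_customer
WHERE op != 'd' AND payload.after.last_updated >= (
  SELECT MAX(last_updated) FROM rpc.customer_mirror);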
Still, there are situations where a view won’t be the best option – for example, if the source table changes very frequently. However, for many CDC use cases, this really does give us the best of both worlds.