Using Apache Iceberg with Polars

PYICEBERG

Polars is an extremely fast DataFrame library and in-memory query engine. It includes parallel execution, cache-efficient algorithms, and an expressive API, making it an attractive tool for efficient data wrangling and data pipelines. It’s a popular choice for data preprocessing, analytics, and machine learning.

The Polars Python API is now integrated with PyIceberg. This recipe shows how to install and use PyIceberg through Polars.

Installing Polars with PyIceberg

You can install Polars using pip install. Add the pyiceberg extra to include Iceberg support:

pip install 'polars[pyiceberg]'

Reading Iceberg tables from Polars

The recommended way to read from an Iceberg table is to connect to an Iceberg catalog. Refer to the PyIceberg configuration recipe to configure the catalogs in your PyIceberg environment.

To load data from a catalog table in Polars, first load the table using the PyIceberg API and then pass the table to Polars.

from pyiceberg.catalog import load_catalog
catalog = load_catalog('sandbox')
table = catalog.load_table('examples.nyc_taxi_yellow')
df = pl.scan_iceberg(tbl)

Next, you can filter the table dataframe to narrow down the data that will be read.

df.filter(
   (pl.col("pickup_time") >= '2021-01-01T00:00:00+00:00') &
   (pl.col("pickup_time") < '2021-02-01T00:00:00+00:00')).collect()

PyIceberg will evaluate the predicate and prune away all the irrelevant months to reduce the number of queries to S3 and speed up the read. This does not scan the whole table and takes advantage of Iceberg metadata to prune partitions and data files automatically.

Let’s use Polars to find the average tip per mile by weekday, just like in the Pandas recipe:

(df.filter(
    (pl.col("pickup_time") >= '2021-01-01T00:00:00+00:00') & 
    (pl.col("pickup_time") < '2021-02-01T00:00:00+00:00') & 
    (pl.col("tip_amount") > 0) & (pl.col("trip_distance") > 0)
).collect().select([
    (pl.col("tip_amount") / pl.col("trip_distance")).alias("tip_per_mile"),
     pl.col("pickup_time").map_elements(lambda x: x.weekday()).alias("pickup_time_weekday")
]).group_by(['pickup_time_weekday']).mean().sort("pickup_time_weekday"))

Reading static tables

Polars can also read an Iceberg table without interacting with a catalog, by pointing directly to the table’s current metadata location.

When reading directly, you may need to pass S3 configuration so that Polars can load the data files. (These are provided automatically by the REST catalog in the example above.)

import polars as pl
df = pl.scan_iceberg(
   "s3://bucket/path/to/iceberg-table/metadata.json",
   storage_options={
       "s3.region": "eu-central-1",
       "s3.access-key-id": "AKI...",
       "s3.secret-access-key": "19CO...",
   }
)

You can also pass storage options for GCS and Azure:

gcs_options = {
    "gcs.project-id": "my-gcp-project",
    "gcs.oauth.token": "ya29.dr.AfM..."
  }
azure_options = {
    "adlfs.account-name": "AZURE_STORAGE_ACCOUNT_NAME",
    "adlfs.account-key": "AZURE_STORAGE_ACCOUNT_KEY"
  }

For all the configuration options, please check the PyIceberg configuration page.