An Introduction to the Iceberg Java API Part 2 – Table Scans


In Part 1, we covered the catalog interface and how you can load a catalog and use it to create a table with a defined schema. In this second part of the Java API series, we’re
going to cover the core Java API that is commonly used by query engines to perform table scans and can also be used for developing applications that need to interact with Iceberg’s core internals.
If you’re a developer for a compute framework or query engine, this is an area of the Iceberg client that you’ll need to get familiar with. However, if you’re a data professional who works with Iceberg
through other tools, you typically won’t use the Java client directly to perform table scans, but you may find this post a helpful peek under the hood.

Just like in Part 1, we’re going to use the Docker environment from one of our earlier posts, Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!.
If you already have the tabulario/spark-iceberg image cached locally, make sure you pick up the latest changes by running the following:

docker-compose pull
docker-compose up -d
docker exec -it spark-iceberg notebook

You can find a fully runnable notebook named Iceberg - An Introduction to the Iceberg Java API.ipynb, and if you scroll down to the “Part 2” section, you’ll find all of the code snippets from this post.
Now let’s get started!

Inserting Records to Scan

In Part 1, we loaded the REST catalog that’s already configured in the Docker environment and created a demo.webapp.logs table. As a refresher, here’s that code again.

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.aws.AwsProperties;

Map<String, String> properties = new HashMap<>();

properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
properties.put(CatalogProperties.URI, "http://rest:8181");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://warehouse/wh");
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
properties.put(AwsProperties.S3FILEIO_ENDPOINT, "http://minio:9000");

RESTCatalog catalog = new RESTCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);
catalog.initialize("demo", properties);

Namespace webapp = Namespace.of("webapp");
Schema schema = new Schema(
      Types.NestedField.required(1, "level", Types.StringType.get()),
      Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
      Types.NestedField.required(3, "message", Types.StringType.get()),
      Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
    );
PartitionSpec spec = PartitionSpec.builderFor(schema)
      .hour("event_time")
      .identity("level")
      .build();
TableIdentifier name = TableIdentifier.of(webapp, "logs");

catalog.createTable(name, schema, spec);

Before we start executing table scans though, we need to add some data to the table. Let’s start a Spark session in Java and run a few INSERT queries. We’ll use all the configuration parameters
from our previous post describing the docker environment. To learn more about what these properties are doing, check out the section in that post which describes
A Minimal Runtime.

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Java API Demo")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
      .config("spark.sql.catalog.demo.uri", "http://rest:8181")
      .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
      .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
      .config("spark.sql.defaultCatalog", "demo")
      .config("spark.eventLog.enabled", "true")
      .config("spark.eventLog.dir", "/home/iceberg/spark-events")
      .config("spark.history.fs.logDirectory", "/home/iceberg/spark-events")
      .getOrCreate();

spark.sparkContext().setLogLevel("ERROR"); // Set the log level to ERROR otherwise we'll get a ton of INFO messages

Now we can use the Spark session to submit our INSERT queries and add data to the demo.webapp.logs table we created in Part 1.

String query = "INSERT INTO demo.webapp.logs "
             + "VALUES "
             + "('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), "
             + "('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), "
             + "('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))";

spark.sql(query).show();
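
If you want to sanity check that the rows actually landed before moving on, a quick query against the table does the trick. This is a minimal sketch that simply reuses the same Spark session:

spark.sql("SELECT * FROM demo.webapp.logs").show();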

Loading the Table

Now that we have some data, we need to load our table from the catalog. That involves first loading the catalog, just as we did in Part 1.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.rest.RESTCatalog;

Map<String, String> properties = new HashMap<>();

properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
properties.put(CatalogProperties.URI, "http://rest:8181");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://warehouse/wh/");
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
properties.put(AwsProperties.S3FILEIO_ENDPOINT, "http://minio:9000");

RESTCatalog catalog = new RESTCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);
catalog.initialize("demo", properties);

Next, we’ll need to create a Namespace object for our “webapp” database and a TableIdentifier for
our “logs” table. The table identifier can then be passed to the loadTable method on our RESTCatalog instance.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

Namespace webapp = Namespace.of("webapp");
TableIdentifier name = TableIdentifier.of(webapp, "logs");
Table table = catalog.loadTable(name);
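
With the table in hand, it can be handy to take a quick look at its metadata before scanning. The Table interface exposes the schema, partition spec, location, and current snapshot, among other things. Here’s a minimal sketch:

System.out.println("Schema: " + table.schema());
System.out.println("Partition spec: " + table.spec());
System.out.println("Location: " + table.location());
System.out.println("Current snapshot: " + table.currentSnapshot()); // null if the table has no snapshots yet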

Scan to Produce Generic Records

As mentioned in the intro, there are different types of scans that you can perform on an Iceberg table using the Java client. If you simply want to retrieve the data from the table,
optionally passing in a filter expression, you can use IcebergGenerics. Let’s perform a scan without applying any filters.

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.IcebergGenerics;

CloseableIterable<Record> result = IcebergGenerics.read(table).build();

The result variable now contains an iterable of Record instances holding the data from the demo.webapp.logs table. Try iterating over result and printing out each Record.

for (Record r: result) {
    System.out.println(r);
}

output:

Record(info, 2022-04-28T00:00Z, Just letting you know!, [stack trace line 1, stack trace line 2, stack trace line 3])
Record(warning, 2022-04-28T00:00Z, You probably should not do this!, [stack trace line 1, stack trace line 2, stack trace line 3])
Record(error, 2022-04-28T00:00Z, This was a fatal application error!, [stack trace line 1, stack trace line 2, stack trace line 3])
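
Since CloseableIterable holds open resources such as file handles, it’s good practice to close it once you’re done reading, and a try-with-resources block handles that for you. You can also pull individual columns out of each Record by name using getField. Here’s a minimal sketch, assuming the same table variable as above:

try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
    for (Record r : records) {
        // Access individual columns by field name
        System.out.println(r.getField("level") + ": " + r.getField("message"));
    }
}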

Scan Using an Expression

Oftentimes, a full table scan is not optimal, particularly for tables with massive amounts of data. Using Iceberg expressions, you can add filters to your table scan by chaining where
clauses on the scan builder. Let’s add an expression to filter the scan to just the “error” records.

import org.apache.iceberg.expressions.Expressions;

CloseableIterable<Record> result = IcebergGenerics.read(table)
        .where(Expressions.equal("level", "error"))
        .build();

output:

Record(error, 2022-04-28T00:00Z, This was a fatal application error!, [stack trace line 1, stack trace line 2, stack trace line 3])
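
Expressions can also be combined to build richer predicates. For example, Expressions.and, Expressions.in, and Expressions.notNull can be composed inside a single where clause. Here’s a minimal sketch that keeps “error” and “warning” records that have a call stack (the variable name filtered is just for illustration):

CloseableIterable<Record> filtered = IcebergGenerics.read(table)
        .where(Expressions.and(
            Expressions.in("level", "error", "warning"),
            Expressions.notNull("call_stack")))
        .build();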

Scan to Produce Scan Tasks

For pulling some data from an Iceberg table into memory, IcebergGenerics works great. However, when integrating Iceberg into a compute framework or query engine, it’s more useful to produce a list
of files that match a particular expression. For that, you can construct a TableScan object, which generates a set of tasks.

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.TableScan;

TableScan scan = table.newScan();

Just like with IcebergGenerics, you can apply expressions to the scan. Let’s add an expression that filters the scan to only include “info” records.

import org.apache.iceberg.expressions.Expressions;

TableScan filteredScan = scan.filter(Expressions.equal("level", "info")).select("message");

Now we can retrieve a list of tasks from the filtered scan.

Iterable<CombinedScanTask> result = filteredScan.planTasks();
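
Each CombinedScanTask groups one or more FileScanTask instances, and each FileScanTask points at a slice of a data file along with the residual filter that still needs to be applied when reading it. A compute engine would typically hand these tasks out to workers; the sketch below just walks them and prints what each one covers:

import org.apache.iceberg.FileScanTask;

for (CombinedScanTask combinedTask : result) {
    for (FileScanTask fileTask : combinedTask.files()) {
        System.out.println(fileTask.file().path()         // path of the underlying data file
            + " [start=" + fileTask.start()               // byte offset where this task begins
            + ", length=" + fileTask.length()             // number of bytes this task covers
            + ", residual=" + fileTask.residual() + "]"); // filter left to apply while reading
    }
}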

The DataFile

CombinedScanTask includes FileScanTask instances that can contain different types of files, such as data files, position delete files, or equality delete files. Let’s pull out the first file and inspect it.

import org.apache.iceberg.DataFile;

CombinedScanTask task = result.iterator().next();
DataFile dataFile = task.files().iterator().next().file();
System.out.println(dataFile);

output:

GenericDataFile{content=data, file_path=s3a://warehouse/wh/webapp/logs/data/event_time_hour=2023-01-31-00/level=info/00000-0-02ca362a-3536-4ccc-b7f0-ceec16121603-00001.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{event_time_hour=465312, level=info}, record_count=1, file_size_in_bytes=1535, column_sizes=null, value_counts=null, null_value_counts=null, nan_value_counts=null, lower_bounds=null, upper_bounds=null, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=null}

The DataFile instance has loads of information about the file, such as the:
– Path
– Format
– Size
– Partition it belongs to
– Number of records it contains

Additionally, if the file was committed using writers that include column-level stats, the DataFile would also include information such as lower/upper bounds for columns, as well as null counts!
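
You can also read these values off the DataFile programmatically instead of relying on its toString output. Here’s a minimal sketch using some of the accessors (the stats maps are keyed by column field id and come back null when the writer didn’t record them):

System.out.println("Path: " + dataFile.path());
System.out.println("Format: " + dataFile.format());
System.out.println("Size in bytes: " + dataFile.fileSizeInBytes());
System.out.println("Partition: " + dataFile.partition());
System.out.println("Record count: " + dataFile.recordCount());
System.out.println("Null value counts: " + dataFile.nullValueCounts()); // null when stats weren't collected
System.out.println("Lower bounds: " + dataFile.lowerBounds());          // field id -> serialized min value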

Closing Remarks

Table scans are a very important part of Iceberg’s functionality. If you need to build large-scale ETL pipelines or run high-performance queries, you’re almost always using Iceberg through a compute framework. Compute frameworks handle building Iceberg table scans for you and bundle them into the broader abstractions of the particular tool. Therefore, engineers who work on these compute frameworks are well versed in the scan API and know how to use scan tasks to significantly narrow down the files that need to be processed by the framework. On the other hand, the engineers who use these frameworks are empowered with performant and declarative commands, without needing to understand what’s happening behind the curtain.

What’s Next

So far in this series, we’ve covered catalogs and tables, as well as how you can perform table scans to either pull data out of an Iceberg table or produce a set of tasks to be further processed. Stay tuned for more posts in this series that unravel the Iceberg Java client, and as always, if you’d like to be a part of the Iceberg community, feel free to check out our community page to learn where to find us!