
An Introduction to the Iceberg Java API Part 3 – Appending Data Files

In Part 1 and Part 2, we covered the catalog interface and how to read your table through table scans. In this third part of the Java API series, we’ll
cover how to append data files to an existing Iceberg table. We’ll also look at the Iceberg data module, which provides convenience classes for generating records and writing the actual files.

Just like in Parts 1 and 2, we’re going to use the docker environment from one of our earlier posts, Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!.
If you already have the tabulario/spark-iceberg image cached locally, make sure you pick up the latest changes by running the following commands.

docker-compose pull
docker-compose up -d

You can find a fully runnable notebook named Iceberg - An Introduction to the Iceberg Java API.ipynb. Scroll down to the “Part 3” section to find all of the code snippets from this post.
Now let’s get started!

Initializing the Catalog

Just like in Parts 1 and 2, we’ll load the REST catalog that’s already configured in the docker environment.

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.aws.AwsProperties;

Map<String, String> properties = new HashMap<>();

properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
properties.put(CatalogProperties.URI, "http://rest:8181");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://warehouse/wh");
properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
properties.put(AwsProperties.S3FILEIO_ENDPOINT, "http://minio:9000");

RESTCatalog catalog = new RESTCatalog();
Configuration conf = new Configuration();
catalog.setConf(conf);
catalog.initialize("demo", properties);

Creating a Table

For this post, let’s create a table webapp.user_events with a simple schema for user events.

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.PartitionSpec;

Schema schema = new Schema(
      Types.NestedField.optional(1, "event_id", Types.StringType.get()),
      Types.NestedField.optional(2, "username", Types.StringType.get()),
      Types.NestedField.optional(3, "userid", Types.IntegerType.get()),
      Types.NestedField.optional(4, "api_version", Types.StringType.get()),
      Types.NestedField.optional(5, "command", Types.StringType.get())
    );

Namespace webapp = Namespace.of("webapp");
TableIdentifier name = TableIdentifier.of(webapp, "user_events");
catalog.createTable(name, schema, PartitionSpec.unpartitioned());

In Part 2, we primed our table by inserting a few records using Spark. In this post we’ll use the Iceberg Java client to insert records directly into the table,
so we won’t need Spark or any other compute engine.

Creating Records

The Iceberg data module has a GenericRecord class that makes it easy to create records that match an Iceberg schema. Let’s use that class and create a list of a few records.

import java.util.UUID;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;

GenericRecord record = GenericRecord.create(schema);
ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Bruce", "userid", 1, "api_version", "1.0", "command", "grapple")));
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Wayne", "userid", 1, "api_version", "1.0", "command", "glide")));
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Clark", "userid", 1, "api_version", "2.0", "command", "fly")));
builder.add(record.copy(ImmutableMap.of("event_id", UUID.randomUUID().toString(), "username", "Kent", "userid", 1, "api_version", "1.0", "command", "land")));
ImmutableList<GenericRecord> records = builder.build();
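
The four builder.add(...) lines above repeat the same five field names. As a minimal sketch of one way to cut that repetition, the plain-Java helper below builds the field map for a single event. UserEvents and fields are hypothetical names introduced here for illustration; they are not part of Iceberg. The helper only assumes the field names from the user_events schema defined earlier.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper (not part of Iceberg): builds the field map for one user event.
class UserEvents {
    // Returns field name -> value for the user_events schema, with a fresh random event_id.
    static Map<String, Object> fields(String username, int userid, String apiVersion, String command) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("event_id", UUID.randomUUID().toString());
        values.put("username", username);
        values.put("userid", userid);
        values.put("api_version", apiVersion);
        values.put("command", command);
        return values;
    }
}
```

With this in place, each line should be able to shrink to something like builder.add(record.copy(UserEvents.fields("Bruce", 1, "1.0", "grapple"))), since record.copy accepts a map of field names to values.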

Now that we have a list of records, we’re ready to write them to a file.

Writing Records to a File

import org.apache.iceberg.Table;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.data.parquet.GenericParquetWriter;

// Load the table we just created so we can use its location and FileIO
Table table = catalog.loadTable(name);
String filepath = table.location() + "/" + UUID.randomUUID().toString();
OutputFile file = table.io().newOutputFile(filepath);
DataWriter<GenericRecord> dataWriter =
    Parquet.writeData(file)
        .schema(schema)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .overwrite()
        .withSpec(PartitionSpec.unpartitioned())
        .build();

// Write every record, then close the writer so the file is finalized
try {
    for (GenericRecord rec : records) {
        dataWriter.write(rec);
    }
} finally {
    dataWriter.close();
}
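
The snippet above names the file with a bare UUID directly under the table location. That works because Iceberg tracks each data file by its full path in table metadata. If you’d rather keep data files in their own directory with a recognizable extension, a small helper like the sketch below is one option. DataFilePaths and newDataFilePath are hypothetical names, and the data/ subdirectory is an assumption made here for tidiness, not something this post’s setup requires.

```java
import java.util.UUID;

// Hypothetical helper (not part of Iceberg): builds a unique data file path.
class DataFilePaths {
    // Returns <tableLocation>/data/<random-uuid>.<extension>, tolerating a trailing slash.
    static String newDataFilePath(String tableLocation, String extension) {
        String base = tableLocation.endsWith("/")
            ? tableLocation.substring(0, tableLocation.length() - 1)
            : tableLocation;
        return base + "/data/" + UUID.randomUUID() + "." + extension;
    }
}
```

In the notebook, the result could be used in place of the hand-built string, for example String filepath = DataFilePaths.newDataFilePath(table.location(), "parquet").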

Appending a File to an Iceberg Table

Once the data writer is closed, it can be converted to a data file. The data file holds all of the file metadata that the Iceberg table requires, such as the path,
size, record count, and column-level statistics.

import org.apache.iceberg.DataFile;

DataFile dataFile = dataWriter.toDataFile();
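
To make “column-level statistics” more concrete, here is a simplified plain-Java model of the kind of per-column summary a data file carries: a value count (nulls included), a null count, and lower and upper bounds. This is an illustrative sketch only and not how Iceberg computes or stores its metrics; ColumnStatsSketch, Stats, and summarize are hypothetical names, and rows are modeled as simple maps.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not Iceberg code) of per-column statistics for a batch of rows.
class ColumnStatsSketch {
    static final class Stats {
        long valueCount;               // number of values, including nulls
        long nullCount;                // number of null values
        Comparable<Object> lowerBound; // smallest non-null value seen
        Comparable<Object> upperBound; // largest non-null value seen
    }

    // Scans the rows once and summarizes a single column.
    @SuppressWarnings("unchecked")
    static Stats summarize(List<Map<String, Object>> rows, String column) {
        Stats stats = new Stats();
        for (Map<String, Object> row : rows) {
            stats.valueCount++;
            Object value = row.get(column);
            if (value == null) {
                stats.nullCount++;
                continue;
            }
            Comparable<Object> current = (Comparable<Object>) value;
            if (stats.lowerBound == null || current.compareTo(stats.lowerBound) < 0) {
                stats.lowerBound = current;
            }
            if (stats.upperBound == null || current.compareTo(stats.upperBound) > 0) {
                stats.upperBound = current;
            }
        }
        return stats;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<>();
        first.put("api_version", "1.0");
        Map<String, Object> second = new HashMap<>();
        second.put("api_version", "2.0");
        Map<String, Object> third = new HashMap<>();
        third.put("api_version", null);

        Stats stats = summarize(Arrays.asList(first, second, third), "api_version");
        // prints "3 1 1.0 2.0"
        System.out.println(stats.valueCount + " " + stats.nullCount + " "
            + stats.lowerBound + " " + stats.upperBound);
    }
}
```

On the real DataFile, the corresponding information is exposed through methods such as recordCount(), valueCounts(), nullValueCounts(), lowerBounds(), and upperBounds(), which key their results by field ID rather than by column name.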

The data file can then be appended directly to the Iceberg table and committed.

import org.apache.iceberg.Table;

// Reuse the TableIdentifier `name` defined when the table was created
Table tbl = catalog.loadTable(name);
tbl.newAppend().appendFile(dataFile).commit();

Reading the New Records

The table can be read using a table scan to confirm that the data has been committed.

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.IcebergGenerics;

// CloseableIterable holds open resources, so close it once the scan is done
try (CloseableIterable<Record> result = IcebergGenerics.read(tbl).build()) {
    for (Record r : result) {
        System.out.println(r);
    }
}

Output:

Record(002362eb-03cd-486e-8017-69426f0e1e1d, Bruce, 1, 1.0, grapple)
Record(73e32746-f729-4147-b532-cd7e9cf5d0fc, Wayne, 1, 1.0, glide)
Record(0483535d-c680-4b49-9ccf-0965fa94b99d, Clark, 1, 2.0, fly)
Record(c82fb1ab-8ba4-4af2-8be8-a1ab0734da76, Kent, 1, 1.0, land)

Closing Remarks

The Iceberg data module provides a convenient way to generate records in memory, write them out to data files, and append them to an existing Iceberg table.
This is great for cases that only require appending relatively small amounts of data and don’t face problems of scale or concurrency. The more typical
mechanism for working with Iceberg tables is to use one of the many available compute engines that include Iceberg support. Each compute engine has its
own set of strengths, and matching engine features to a workflow’s requirements is the best way to get the most out of Iceberg.

What’s Next

So far in this series, we’ve covered catalogs and tables, as well as how you can perform table scans to either pull data out of an Iceberg table or produce a set of tasks to be further processed. In this post we illustrated how you can insert records directly into an Iceberg table using just the Iceberg client. If you have ideas for the next areas of the Iceberg client you’d like us to explore, or are simply looking for how to become a part of the Iceberg community, feel free to check out our community page to learn how to reach out!