Configuring Apache Spark

GETTING STARTED

Apache Spark provides comprehensive support for Apache Iceberg, including extended SQL syntax and stored procedures for managing tables and interacting with datasets. This recipe shows you how to configure Spark to connect to Iceberg catalogs.

Defining Spark catalogs

In Spark, catalogs are used to connect to data sources that can load tables for queries.

A catalog is a named repository of tables that are organized into namespaces. Most catalogs are like Apache Hive and support a single-level namespace, which is called a database. In Spark, database refers to the same grouping that Trino calls a schema.
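
For example, a table is addressed by catalog name, namespace, and table name. Using the sandbox catalog and examples namespace configured later in this recipe:

SELECT * FROM sandbox.examples.nyc_taxi_yellow LIMIT 10;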

Spark loads catalogs from configuration properties under spark.sql.catalog. The most basic configuration creates a catalog from a name property where the value is a JVM class to instantiate. For example, this property creates an Iceberg catalog named sandbox:

spark.sql.catalog.sandbox=org.apache.iceberg.spark.SparkCatalog

Additional properties, starting with the catalog’s name, will be passed to the catalog when it is initialized, with the property prefix removed. For Iceberg, you pass the standard catalog configuration settings from the start of this chapter. For example, to configure a catalog to use the REST protocol, set type=rest.

spark.sql.catalog.sandbox=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.sandbox.type=rest

Note: You can reference environment variables when configuring the REST catalog using the value prefix env: followed by the variable name. For example, to load REST_CREDENTIAL from the environment, use env:REST_CREDENTIAL.

In addition to the common settings, you can pass options that are specific to a catalog (the REST credential, for instance) and settings that are specific to Spark. One particularly useful Spark setting configures the default namespace: default-namespace=examples

Here is a full example configuration for the sandbox catalog:

spark.sql.catalog.sandbox=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.sandbox.type=rest
spark.sql.catalog.sandbox.uri=https://api.www.tabular.io/ws
spark.sql.catalog.sandbox.default-namespace=examples
spark.sql.catalog.sandbox.credential=env:REST_CREDENTIAL
spark.sql.catalog.sandbox.warehouse=sandbox
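
The same settings can also be applied programmatically when a SparkSession is built, which is handy in notebooks or applications that don’t read a properties file. Here is a minimal PySpark sketch, assuming the Iceberg runtime jars are already available to Spark and that the credential is read from the REST_CREDENTIAL environment variable:

import os
from pyspark.sql import SparkSession

# Build a session with the same sandbox catalog settings as the properties above
spark = (
    SparkSession.builder
    .appName("iceberg-sandbox")
    .config("spark.sql.catalog.sandbox", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.sandbox.type", "rest")
    .config("spark.sql.catalog.sandbox.uri", "https://api.www.tabular.io/ws")
    .config("spark.sql.catalog.sandbox.default-namespace", "examples")
    .config("spark.sql.catalog.sandbox.credential", os.environ["REST_CREDENTIAL"])
    .config("spark.sql.catalog.sandbox.warehouse", "sandbox")
    .getOrCreate()
)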

The Spark session catalog

Before Spark added support for multiple catalogs, each session used a global metastore as the catalog. Spark still maintains compatibility with this built-in session catalog and its v1 data source API.

Many Hive deployments still use the session catalog, so it’s important to be able to configure it and add Iceberg support. The session catalog is configured using a Hive Thrift URI, like the following:

# connect to HMS (hive vs. in-memory)
spark.sql.catalogImplementation=hive
spark.hadoop.hive.metastore.uris=thrift://hive-metastore-host:9083

Iceberg uses the v2 data source API, so the global session catalog doesn’t have Iceberg support by default.

You can add Iceberg support by configuring a v2 implementation for the session catalog, using the special name spark_catalog and Iceberg’s SparkSessionCatalog class.

# connect to HMS (hive vs. in-memory)
spark.sql.catalogImplementation=hive
spark.hadoop.hive.metastore.uris=thrift://hive-metastore-host:9083
# Wrap the session to add Iceberg support
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
spark.sql.catalog.spark_catalog.default-namespace=default

If no uri option is passed, the Hive catalog will use the same URI as the session catalog (spark.hadoop.hive.metastore.uris).
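
Once wrapped, tables tracked in the Hive Metastore remain accessible through the session catalog under the spark_catalog name; for example (the table name here is hypothetical):

SELECT * FROM spark_catalog.default.web_logs LIMIT 10;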

Additional configuration

In addition to defining Iceberg catalogs, you should ensure the following configurations are set:

  • spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    This configures Spark to use Iceberg’s extended SQL syntax, which includes commands to run stored procedures and make Iceberg-specific table changes, like setting a table’s write order (see the example after this list).
  • spark.sql.defaultCatalog=sandbox
    This sets the default catalog for new Spark SQL sessions.
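
For instance, with the extensions enabled you can call stored procedures and set a table’s write order. A short sketch using the sandbox catalog and examples namespace from this recipe (the sort column is illustrative):

-- Compact small data files with a stored procedure
CALL sandbox.system.rewrite_data_files(table => 'examples.nyc_taxi_yellow');
-- Set the table's write order (the column name is illustrative)
ALTER TABLE sandbox.examples.nyc_taxi_yellow WRITE ORDERED BY pickup_time;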

Configuring your Spark environment

Spark configuration is typically passed in one of two places: a configuration properties file or command-line arguments.

Configuration via spark-defaults.conf

Defining a default configuration for Spark is an effective way to reproducibly initialize your Spark environment across multiple jobs or sessions without a complicated series of commands.

In Spark, you can use conf/spark-defaults.conf to set common configuration across runs. This approach is typically used for configuration that doesn’t change often, like defining catalogs. Each invocation will include the defaults, which can be overridden by settings passed on the command line. The file format is a Java properties file.

When configuring an environment for production, it is best to include the Iceberg dependencies in the jars directory in your Spark distribution so they are not fetched each time a session is started.

# conf/spark-defaults.conf
# Instead of using spark.jars.packages, add Iceberg Jars to jars/
# Add Iceberg's SQL extensions
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# Select the default catalog, sandbox
spark.sql.defaultCatalog=sandbox
# Add a REST catalog named sandbox
spark.sql.catalog.sandbox=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.sandbox.type=rest
spark.sql.catalog.sandbox.uri=https://api.www.tabular.io/ws
spark.sql.catalog.sandbox.default-namespace=examples
spark.sql.catalog.sandbox.credential=env:REST_CREDENTIAL
spark.sql.catalog.sandbox.warehouse=sandbox
# Add a Hive catalog
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type=hive
spark.sql.catalog.spark_catalog.default-namespace=default
spark.hadoop.hive.metastore.uris=thrift://localhost:9083
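
As a sketch of the production setup mentioned above, the Iceberg jars can be copied directly into the Spark distribution’s jars directory instead of being fetched with spark.jars.packages (the filenames assume Spark 3.5, Scala 2.12, and Iceberg 1.4.2; adjust them to match your environment):

# Make the Iceberg dependencies part of the Spark distribution
cp iceberg-spark-runtime-3.5_2.12-1.4.2.jar $SPARK_HOME/jars/
cp iceberg-aws-bundle-1.4.2.jar $SPARK_HOME/jars/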

Configuration via CLI arguments

When running a Spark executable (spark-sql, spark-shell, pyspark), you can pass configuration on the command line using the --conf option. Stringing together multiple --conf arguments allows you to configure Iceberg catalogs and default behaviors, and the --packages argument pulls in the required dependencies as well.

Here’s an example configuration for an Iceberg REST catalog:

# Spark 3.5.0 with an Iceberg REST catalog named sandbox
./bin/spark-sql \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,org.apache.iceberg:iceberg-aws-bundle:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.defaultCatalog=sandbox \
  --conf spark.sql.catalog.sandbox=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.sandbox.type=rest \
  --conf spark.sql.catalog.sandbox.uri=https://api.www.tabular.io/ws \
  --conf spark.sql.catalog.sandbox.default-namespace=examples \
  --conf spark.sql.catalog.sandbox.credential=${REST_CREDENTIAL} \
  --conf spark.sql.catalog.sandbox.warehouse=sandbox

Here’s an example configuration to connect to a Hive Metastore:

# Spark 3.5.0
./bin/spark-sql \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.hadoop.hive.metastore.uris=thrift://localhost:9083

Running Spark for the first time

Once you’ve configured Spark, you can check connectivity to Iceberg catalogs using a Spark SQL or PySpark REPL.

# Spark 3.5.0 with catalogs defined in conf/spark-defaults.conf 
./bin/spark-sql

Next, run SQL commands to show the current catalog and namespace, then list tables:

SHOW CURRENT NAMESPACE;
-- sandbox  examples

SHOW TABLES;

-- backblaze_drive_stats
-- nyc_taxi_locations
-- nyc_taxi_yellow

SELECT * FROM nyc_taxi_yellow LIMIT 10;

-- ...
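
The same connectivity check works from a PySpark REPL; here is a minimal sketch using the catalogs defined in conf/spark-defaults.conf:

# Spark 3.5.0 with catalogs defined in conf/spark-defaults.conf
./bin/pyspark

# Inside the REPL, a SparkSession named spark is already available
spark.sql("SHOW CURRENT NAMESPACE").show()   # current catalog and namespace
spark.sql("SHOW TABLES").show()              # tables in the default namespace
spark.sql("SELECT * FROM nyc_taxi_yellow LIMIT 10").show()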

Helpful commands

Spark SQL sessions have a default catalog and namespace that are used to load tables without specifying the full table name. You can set the defaults in Spark configuration, but it’s not always easy to remember what a session currently uses. To display the current settings, use SHOW CURRENT NAMESPACE:

SHOW CURRENT NAMESPACE;

-- sandbox  examples

You can also set the current catalog and namespace with the USE command:

USE sandbox.examples;

To see the catalogs that are defined in your environment, use SHOW CATALOGS:

SHOW CATALOGS;

-- sandbox
-- spark_catalog