January 2, 2024

Lakehouse Example with Apache Spark, Minio, Nessie Catalog, Iceberg and Docker


Lakehouse solutions, which combine the best aspects of the data warehouse and the data lake to offer the comfort of a relational database on top of big data, are becoming part of our daily work. Today we will build a simple lakehouse example on Docker using entirely open-source components.

1. Components That Make Up the Infrastructure

1.1. Nessie

Nessie is an open-source project that serves the same role as the Apache Hive Metastore, providing a transactional catalog for data lakes. Its biggest difference from the Hive Metastore is its data versioning feature: Git-inspired version control with branches and tags over the catalog.

1.2. MinIO

MinIO is a popular, Amazon S3-compatible object storage solution. It covers the need for object storage especially in closed environments where S3 cannot be used, and it works well with Kubernetes.

1.3. Apache Spark

Apache Spark is an open-source distributed data processing engine. It is widely used for data engineering, data science, data analysis, graph analysis, machine learning, and real-time data processing.

1.4. Apache Iceberg

Iceberg is a high-performance table format for large analytical tables. Iceberg brings the reliability and simplicity of SQL tables to big data, making it possible for engines such as Spark, Trino, Flink, Presto, Hive and Impala to work safely with the same tables at the same time.

2. Standing Up Infrastructure with Docker Compose

We will set up the infrastructure for our example using the docker-compose.yaml file below.

version: "3"

services:
  # Nessie Catalog Server
  nessie:
    image: projectnessie/nessie:0.67.0
    container_name: nessie
    networks:
      vbo:
    ports:
      - 19120:19120
  # Minio
  minio:
    image: "minio/minio:RELEASE.2023-05-04T21-44-30Z"
    container_name: minio
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    networks:
      vbo:
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  # Spark
  spark:
    container_name: spark
    image: veribilimiokulu/pyspark-3.4.1_python-3.8:1.0
    ports:
      - "8888:8888"
      - "4041:4040"
    networks:
      - vbo
    volumes:
      - ./spark/examples:/opt/examples
    command: sleep infinity
networks:
  vbo:

Let’s stand up our Docker compose containers.

docker-compose up -d

3. MinIO Web Interface

Let’s log in at http://localhost:9001/login with user: minioadmin and password: minioadmin.

Let’s create a bucket called warehouse.
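
If you prefer to script this step rather than click through the web console, the bucket can also be created programmatically. Below is a minimal sketch using boto3 against the MinIO endpoint; boto3 is not part of the original setup and has to be installed separately (pip install boto3).

import boto3

# Hypothetical alternative to the web console: talk to MinIO through its S3-compatible API
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",        # MinIO, not AWS
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)

# Create the warehouse bucket if it does not exist yet
existing = [b["Name"] for b in s3.list_buckets().get("Buckets", [])]
if "warehouse" not in existing:
    s3.create_bucket(Bucket="warehouse")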

4. Spark

Let’s connect to the spark container.

docker exec -it spark bash

4.1. Start Jupyter Lab

I want to use Jupyter Lab. For this, I will install the jupyterlab package, plus findspark to expose the existing Spark installation to Jupyter.

pip install jupyterlab findspark

Then start Jupyter Lab:

jupyter lab --ip 0.0.0.0 --port 8888 --allow-root

When we copy the link from the startup logs and paste it into our browser, Jupyter Lab opens.

4.2. Spark Application

Let’s import the necessary libraries in the first cell and point findspark at the Spark installation.

import findspark
findspark.init("/opt/spark/")
from pyspark.sql import SparkSession, functions as F

Let’s define the important variables; the necessary details are given in the comments.

# Nessie API endpoint that Spark will talk to
url = "http://nessie:19120/api/v1"

# Bucket where the Nessie/Iceberg table data will live
full_path_to_warehouse = 's3a://warehouse'

# Nessie branch to work on
ref = "main"

# Nessie authentication type (options: NONE, BEARER, OAUTH2, AWS)
auth_type = "NONE"

# We use MinIO instead of AWS S3, so point the S3A endpoint at MinIO rather than Amazon
s3_endpoint = "http://minio:9000"

# MinIO credentials. These are the root credentials from docker-compose;
# they should not be used like this in production environments.
accessKeyId = 'minioadmin'
secretAccessKey = 'minioadmin'

4.2.1. Spark Configuration

spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Spark Nessie Iceberg Demo")
    .config("spark.driver.memory", "2g")
    # Dependencies: hadoop-aws for S3A, the Iceberg Spark runtime and the Nessie Spark SQL extensions
    # (delta-core is also listed but is not actually used in this demo)
    .config('spark.jars.packages',
            'org.apache.hadoop:hadoop-aws:3.3.0,io.delta:delta-core_2.12:2.4.0,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.4_2.12:0.75.0')
    # S3A settings so that Spark reads and writes MinIO instead of AWS S3
    .config("spark.hadoop.fs.s3a.access.key", accessKeyId)
    .config("spark.hadoop.fs.s3a.secret.key", secretAccessKey)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    # Enable the Iceberg and Nessie SQL extensions
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    # Register the "nessie" catalog: an Iceberg SparkCatalog backed by the Nessie catalog implementation
    .config("spark.sql.catalog.nessie.uri", url)
    .config("spark.sql.catalog.nessie.ref", ref)
    .config("spark.sql.catalog.nessie.authentication.type", auth_type)
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.warehouse", full_path_to_warehouse)
    .getOrCreate()
)
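
Before going further, it is worth checking that Spark can actually reach the Nessie server. A minimal sanity check, assuming the Nessie SQL extensions loaded correctly, is to list the catalog's references; on a fresh server only main should appear.

# List Nessie references (branches/tags); a fresh server shows only the main branch
spark.sql("LIST REFERENCES IN nessie").show()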

4.2.2. Spark DataFrame

Let’s read a dataset from GitHub and create a Spark DataFrame.

from pyspark import SparkFiles

sc = spark.sparkContext
github_url = "https://raw.githubusercontent.com/erkansirin78/datasets/master/Churn_Modelling.csv"
sc.addFile(github_url)

df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"), header=True, inferSchema=True)
df.show(3)
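
Since the table in the next step is created from df.schema, a quick look at what inferSchema produced does not hurt; this check is my own addition, not part of the original flow.

# Sanity check: inspect the inferred schema and the number of rows read
df.printSchema()
df.count()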

4.2.3. Nessie Namespace and Table Creation

spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.demo;")

spark.sql("DROP TABLE IF EXISTS nessie.demo.churn;")

spark.createDataFrame([], df.schema).writeTo("nessie.demo.churn").create()
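
To confirm that the empty table has been registered in the catalog, we can list the namespace's tables; this is a small check of my own, not part of the original example.

# The namespace should now contain the (still empty) churn table
spark.sql("SHOW TABLES IN nessie.demo").show()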

4.2.4. Writing the Spark DataFrame to the Nessie Table

# Overwrite the (empty) Iceberg table with the dataframe's contents
df.write.format("iceberg").mode("overwrite") \
    .save("nessie.demo.churn")

4.2.5. Reading the Nessie Table Back into a Spark DataFrame

Let’s read back the table we just wrote and create a DataFrame from it again.

df_from_iceberg = spark.table("nessie.demo.churn")
df_from_iceberg.show()
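
Finally, to see the Git-inspired versioning that sets Nessie apart (section 1.1) in action, here is a hedged sketch that branches the catalog, changes the table on the branch only, and merges it back into main. The branch name dev and the Geography filter are my own illustration, not part of the original example.

# Create a development branch in the Nessie catalog (Nessie SQL extensions)
spark.sql("CREATE BRANCH IF NOT EXISTS dev IN nessie")

# Point the session at dev; changes made now do not affect main
spark.sql("USE REFERENCE dev IN nessie")
spark.sql("DELETE FROM nessie.demo.churn WHERE Geography = 'Spain'")
spark.table("nessie.demo.churn").count()   # fewer rows on dev

# main still has the full data until the branch is merged
spark.sql("USE REFERENCE main IN nessie")
spark.table("nessie.demo.churn").count()   # original row count
spark.sql("MERGE BRANCH dev INTO main IN nessie")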

In this article, we built a simple lakehouse with open-source tools, created an Iceberg table in the Nessie catalog, and read from and wrote to it with Spark. Goodbye until we meet in another article.

Notebook and compose file are here.

Note: I benefited greatly from articles [1, 2] while preparing this article; thank you to their authors.

Photo by Alexander Hafemann on Unsplash

5. References

  1. https://www.dremio.com/a-notebook-for-getting-started-with-project-nessie-apache-iceberg-and-apache-spark/
  2. https://medium.com/@khurrammeraj17/creating-a-lakehouse-by-using-apache-spark-minio-nessie-catalog-and-dremio-67c23a335616
