🏊

MinIO+Apache Spark+Apache Iceberg+PostgreSQLで作るなんちゃって日曜大工データレイク

2024/06/01に公開

本に感化されまして

https://www.oreilly.co.jp/books/9784814400676/

最近上記の本を買いまして、感化されました。しかも私は誤字脱字を2か所見つけてしまいオライリーに投書までしてしまいました。ということでこの中で Amazon AWS S3と Apache Spark と Apache Iceberg を使ってデータレイクが比較的簡単にできると書いてあったので、じゃぁMinIO で実現できれば誰でも簡単に自分のマシン上でもデータレイクが構築できるんじゃないかと思いやってみました。

材料

材料はDockerがあればできます。Dockerで構築するのは次の3本柱です。

  • MinIO
  • Java17実行環境(Sparkが乗ります)
  • PostgreSQL

作成

日曜大工とと言いながら私は土日をこれに費やしてしまうくらいは試行錯誤が発生していますが、そこは飛ばしてエッセンスだけ抽出してお知らせします。

Docker Composeファイルの作成(MinIO編)

まずはMinIO 用の Docker Composeのファイルを書きます。

compose.yml
services:
  s3:
    container_name: s3
    hostname: s3
    image: minio/minio:RELEASE.2024-05-10T01-41-38Z
    environment:
      - MINIO_ROOT_USER=${MINIO_ROOT_USER}
      - MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}
      - TZ=Asia/Tokyo
    volumes:
      - minio-volume:/data/
    tty: true
    ports:
      - 9000:9000
      - 9001:9001
    command: ["server", "/data", "--console-address", ":9001"]
    restart: always

volumes:
  minio-volume:

もうおわかりだと思いますが、MinIOのパスワードとかのアカウント情報は.envファイルに記載します。

.env
MINIO_ROOT_USER=admin
MINIO_ROOT_PASSWORD=password

なぜなら、gitで上げるときにパスワードがついたままで上げるのはセキュリティー上よろしくないからです。
ですのでgitに上げるときにはこれらのディレクトリ上に以下を書きます。

.gitignore
.env

では、立ち上げましょう。

$ docker compose up -d

Docker Composeファイルの作成(Apache Spark編)

まずは基本的なDocker Composeファイルを書きます。

compose.yml
services:
  spark:
    container_name: spark
    hostname: spark
    build: ./spark
    networks:
      - spark-iceberg-minio-network
    ports:
      - 8083:8080
      - 4040:4040
    tty: true
    volumes:
      - ./conf/spark-defaults.conf:/opt/spark-3.5.1-bin-hadoop3/conf/spark-defaults.conf:ro
      - ./data/:/home/spark/data/:ro
      - ./py/:/home/spark/py/:rw
      - ./script/:/script/:rw
    environment:
      - AWS_ACCESS_KEY_ID=${MINIO_ACCESS_KEY_ID}
      - AWS_SECRET_ACCESS_KEY=${MINIO_SECRET_ACCESS_KEY}
      - AWS_S3_ENDPOINT=minio:9000
      - AWS_REGION=us-east-1
      - TZ=Asia/Tokyo
    restart: always
    depends_on:
      - db

  db:
    image: postgres:latest
    container_name: db
    hostname: db
    networks:
      - spark-iceberg-minio-network
    volumes:
      - db-store:/var/lib/postgresql/data
      - ./psql/:/psql/
    tty: true
    restart: always
    environment:
      - TZ=Asia/Tokyo
      - POSTGRES_USER=iceberg
      - POSTGRES_PASSWORD=icebergpassword
      - POSTGRES_DB=iceberg


networks:
  spark-iceberg-minio-network:

volumes:
  db-store:

Apache Sparkのイメージを取ってきても上手く動かなかったので今回は自分でビルドしました。
(本当は動くかもしれないけどこれも修行なので・・・。)

Dockerfile(ApacheSpark用)

Apache SparkはどうやらJava17でないと動かないみたいなのでOpenJDKのバージョン17を選んで動かします。
では最初にDockerfile全体をご覧に入れましょう。

Dockerfile
FROM openjdk:17-slim-bullseye

USER root

RUN addgroup --gid 1000 spark && adduser --uid 1000 --gid 1000 spark
RUN mkdir -p /home/spark/spark-events
RUN chown -R spark:spark /home/spark

RUN apt-get update && apt-get -y upgrade && apt-get -y install vim less wget procps iputils-ping curl python3 python3-pip tini

USER spark
RUN curl https://dl.min.io/client/mc/release/linux-amd64/mc  --create-dirs -o /home/spark/minio-binaries/mc && chmod +x /home/spark/minio-binaries/mc
ENV PATH=$PATH:/home/spark/minio-binaries/
USER root

RUN mkdir -p /opt/ && \
    cd /opt/ && \
    wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
    tar xzvf spark-3.5.1-bin-hadoop3.tgz && \
    rm spark-3.5.1-bin-hadoop3.tgz && \
    wget https://dlcdn.apache.org/maven/maven-3/3.9.6/binaries/apache-maven-3.9.6-bin.tar.gz && \
    tar xzvf apache-maven-3.9.6-bin.tar.gz && \
    rm apache-maven-3.9.6-bin.tar.gz

ENV PATH=$PATH:/opt/apache-maven-3.9.6/bin:/opt/spark-3.5.1-bin-hadoop3/bin

RUN mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 && \
    mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-core:1.5.2 && \
    mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-aws:1.5.2 && \
    mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-aws-bundle:1.5.2 && \
    mvn dependency:get -Dartifact=org.postgresql:postgresql:42.7.3

RUN find /root/.m2/ -name \*.jar -exec mv {} /opt/spark-3.5.1-bin-hadoop3/jars/ \; && rm -rf /root/.m2/

ENV SPARK_HOME=/opt/spark-3.5.1-bin-hadoop3/
USER spark
WORKDIR /home/spark/

CMD ["bash"]

カスタマイズのポイント①:sparkユーザを作る

基本的にDockerfileの中ではrootで構築しますが、ユーザは自分の今のUserIDとGroupIDと同じIDのユーザsparkを作っておくと後々楽になると思います。でホームディレクトリをつくりその上に~/spark-eventsというディレクトリを作って置いてください。

カスタマイズのポイント②:足りないDebパッケージのインストール

基本のapt-get update && apt-get -y upgradeは良いとして、Pythonとwgetは必要になってくるのでこれだけ入れておいてください。それ以外のパッケージはあると便利ということです。

カスタマイズのポイント③:mcコマンドのインストール

これは好みですが、私はDockerコンテナ内でmcコマンドを使えれば良いなと思い入れました。
このときは一般ユーザsparkでインストールします。

カスタマイズのポイント④:mavenとsparkの導入

これはwgetで取ってくればいいので簡単です。
言っておきますが、sparkは普通のでも良いかもしれませんが、今回はspark-3.5.1-bin-hadoop3.tgzを落としてきました。その後mavenのパッケージが必要になってくるので一緒に落として展開しておきます。

カスタマイズのポイント⑤:足りないJARファイルのダウンロードと配置

じつはSpark単体では今回の「Apache Icebergを使ったデータの格納」はできませんので、以下のパッケージをmavenで落としてきました。

  • iceberg-spark-runtime
  • iceberg-core
  • iceberg-aws
  • iceberg-aws-bundle

それからIcebergのメタデータ管理にPostgreSQLを使いますのでそのJARファイルも落としておきます。
どこにそれらは格納されているかというと、/root/.m2/ディレクトリ以下ですので、そこからsparkのjar置き場になっている/opt/spark-3.5.1-bin-hadoop3/jars/ディレクトリにJARファイル一切合切だけを移動させておきます。

最後にユーザをsparkに変更してDockerfileの構築は終了です。
これらを一旦起動させます。

$ docker compose up -d --build

MinIOとの接続

sparkとMinIOを接続するためにMinIOへ入ってアクセスキーとシークレットキーを作成してダウンロードします。
MinIOにはWebGUIがついていますのでそれを使います。
http://{MinIOのアドレス}:9001/に接続するとログイン画面が出ます。

先ほどMinIOの.envファイルで指定したアカウントで入ります。

左ペインの中から「Access Keys」を押すとアクセスキーの一覧が出ますのでそこで新規作成(右上の「Create access key」)を押します。

適当に名前も入れても入れなくても良いです、「Create」を押してできあがった対のキーを落としておきます。絶対に落としてください。一回しかこの情報は出てきませんのでコピペだけで済ませないでください。

このアクセスキーとシークレットキーをsparkのDocker Composeファイルに適応させます。こんな感じに。

.env
MINIO_ACCESS_KEY_ID=HeWHScaUjAQzlG12oEml
MINIO_SECRET_ACCESS_KEY=UaXOGuouNXMpkF9Ov5BxQK9uM3aqNKTJJ2IFmZ6k

そして、一旦sparkのDockerを落として再起動させます。

$ docker compose down
$ docker compose up -d

そうするとMinIOとSparkが関連付けられました、次にsparkからIcebergを使ったデータ格納を実現するためのspark-defaults.confを編集しましょう。

spark-defaults.conf
spark.sql.extensions                          org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.defaultCatalog                      catalog01
spark.sql.catalog.catalog01                   org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.catalog01.type              jdbc
spark.sql.catalog.catalog01.uri               jdbc:postgresql://db:5432/iceberg
spark.sql.catalog.catalog01.jdbc.useSSL       false
spark.sql.catalog.catalog01.jdbc.user         iceberg
spark.sql.catalog.catalog01.jdbc.password     icebergpassword
spark.sql.catalog.catalog01.io-impl           org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.catalog01.warehouse         s3://01/default
spark.sql.catalog.catalog01.s3.endpoint       http://192.168.1.30:9000/warehouse-data/
spark.sql.catalog.spark_catalog               org.apache.iceberg.spark.SparkSessionCatalog
spark.eventLog.enabled                        true
spark.eventLog.dir                            /home/spark/spark-events
spark.history.fs.logDirectory                 /home/spark/spark-events
spark.sql.catalogImplementation               in-memory

上から私なりに解釈した設定を説明していきます。

  • 1行目はこのパッケージを使いますという宣言だと思います。
  • 2行目はカタログ名です、それ以降の左辺に出てきます。
  • 3行目はよくわかりませんがこのパッケージを使いますという宣言だと思います。
  • 4行目はカタログを格納する方式ですが今回はJDBCを使います
  • 5行目はそのJDBCを使うときの接続先です。今回はPostgreSQLであることとdbというDockerコンテナを使い、データベース名はicebergと言うのを使うということです。言うのを忘れていましたが、何も指定しなければDocker Composeファイルで指定しているpostgresのユーザ名がそのままデータベース名になるそうです。
  • 6行目はSSLを使うかだと思いますが今回は使いません。
  • 7行目8行目のアカウント情報はcompose.ymlに記載されているとおりです。
  • 9行目はカタログのIOインプリメンテーションはなにかと聞かれているのでS3と言っています。
  • 10行目はカタログのwarehouse(倉庫ファイル)はどこかと聞かれているのでs3の01/defaultというディレクトリであると言っています。
  • 11行名はそのs3のエンドポイントはどこだと問われているので先ほどのMinIOのアドレスとバケットを決めて置く必要があります。バケットの作り方は後述します。
  • 12行目も何かのパッケージ宣言だと思います
  • 13行目はログを取るかということなのtrueにしておきます。
  • 14行目15行目はログのディレクトリをさしています。
  • 最後の行はこの実行をインメモリでおこなうと言っているに違いないと思います。

このファイルをsparkの/opt/spark-3.5.1-bin-hadoop3/conf/spark-defaults.confになるようにマウントします。

これで実験ができます。

MinIOのバケットの作り方

バケットの作り方は次の通りです。
メイン画面の左ペインから「Backets」を選択します。
もう既にバケット作ってありますが、dataというバケットを作ってみましょう。

右上の「Create Backet」を押します。


「Backet Name」は半角英数字でとハイフンが使えますが最後がハイフンで終わることはできないようです。名前を決めたあと「Create Backet」ボタンを押せばバケットができます。

データを入れて見る。

データはダミーデータを使います。

https://testdata.userlocal.jp/

このデータをダウンロードしてみます。そしたらヘッダを見てそれとない英語名に変更してデータの種類を決めます。おそらく年齢以外はString、年齢はIntegerになります。

ヘッダを入れる

まずはデータを入れる入れ物を作成します。今回はPythonを使うのでPythonコードで書きます。
例は以下の通りです。

create_table.py
from  pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, DoubleType, FloatType, LongType, StructType,StructField, StringType
spark = SparkSession.builder.getOrCreate()
schema = StructType([
  StructField("name", StringType(), True),
  StructField("name_kana", StringType(), True),
  StructField("age", IntegerType(), True),
  StructField("birth_date", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("email", StringType(), True),
  StructField("phone", StringType(), True),
  StructField("zip_code", StringType(), True),
  StructField("address", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("catalog01.address.list_20240530_001").create()

これの解説は省略します、知りたい人は調べてみてください。このあたりの文献はゴロゴロ出てきます。
では実行して見ます。実行にはsparkコンテナ内にあるspark-submitを使います。

$ docker compose exec -it spark spark-submit /home/spark/py/create_table.py

メッセージが大量に出るのでここではメッセージについては割愛します。
では、MinIOとPostgreSQLにきちんと入っているか確認します。

MinIOのGUIで「Object Browser」を選択して先ほどのwarehouse-dataをクリックして最下層まで行きます。
warehouse-data/01/default/address/list_20240530_001/metadataにそれらしきデータが入っているのが確認できました。

では、データを入れていきましょう。

データの投入

データを投入していきます。今回はこんなPythonコードでいきます。

insert.py
import csv
from  pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
spark = SparkSession.builder.getOrCreate()

schema = spark.table("catalog01.address.list_20240530_001").schema

data = []
with open('/home/spark/data/dummy20240529.csv') as f:
    reader = csv.reader(f)
    for i,row in enumerate(reader):
        if i != 0:
            data.append([row[0],row[1],int(row[2]),row[3],row[4],row[5],row[6],row[7],row[8]])
df = spark.createDataFrame(data, schema)
df.writeTo("catalog01.address.list_20240530_001").append()

ここのあたりの書き方も詳しく書いているところが多いので調べてみてください。
ではこれを実行して見ます。

$ docker compose exec -it spark spark-submit /home/spark/py/insert.py

では、本当に入っているかPythonプログラムとMinIOとの両方から見てみましょう。
まずはPythonプログラムで。

view.py
from  pyspark.sql import SparkSession, DataFrame, functions
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
spark = SparkSession.builder.getOrCreate()
df = spark.table("catalog01.address.list_20240530_001").show()

実行します。

$ docker compose exec -it spark spark-submit /home/spark/py/show.py
-略-
+-----------+-------------------+---+--------------+------------+--------------------+-------------+--------+-----------------------------------+
|       name|          name_kana|age|    birth_date|      gender|               email|        phone|zip_code|                            address|
+-----------+-------------------+---+--------------+------------+--------------------+-------------+--------+-----------------------------------+
|  恩田 美恵|        おんだ みえ| 52|1971年06月29日||mieonda@example.n...|050-3681-7261|447-9200|     愛知県半田市港町3-3-104エン...|
-略-

良さそうですね。ではMinIOの方も見てみましょう。
warehouse-data/01/default/address/list_20240530_001/dataを見てみると。

格納されているみたいですね、今回はparquet(パーケイと呼ぶらしい)ファイルに分割されて入った模様です。

おわりに

これでデータレイクの初期設定ができました。これを駆使して皆さん遊んでみてください。
では。

Discussion