MinIO+Apache Spark+Apache Iceberg+PostgreSQLで作るなんちゃって日曜大工データレイク
本に感化されまして
最近上記の本を買いまして、感化されました。しかも私は誤字脱字を2か所見つけてしまいオライリーに投書までしてしまいました。ということでこの中で Amazon AWS S3と Apache Spark と Apache Iceberg を使ってデータレイクが比較的簡単にできると書いてあったので、じゃぁMinIO で実現できれば誰でも簡単に自分のマシン上でもデータレイクが構築できるんじゃないかと思いやってみました。
材料
材料はDockerがあればできます。Dockerで構築するのは次の3本柱です。
- MinIO
- Java17実行環境(Sparkが乗ります)
- PostgreSQL
作成
日曜大工とと言いながら私は土日をこれに費やしてしまうくらいは試行錯誤が発生していますが、そこは飛ばしてエッセンスだけ抽出してお知らせします。
Docker Composeファイルの作成(MinIO編)
まずはMinIO 用の Docker Composeのファイルを書きます。
services:
s3:
container_name: s3
hostname: s3
image: minio/minio:RELEASE.2024-05-10T01-41-38Z
environment:
- MINIO_ROOT_USER=${MINIO_ROOT_USER}
- MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}
- TZ=Asia/Tokyo
volumes:
- minio-volume:/data/
tty: true
ports:
- 9000:9000
- 9001:9001
command: ["server", "/data", "--console-address", ":9001"]
restart: always
volumes:
minio-volume:
もうおわかりだと思いますが、MinIOのパスワードとかのアカウント情報は.env
ファイルに記載します。
MINIO_ROOT_USER=admin
MINIO_ROOT_PASSWORD=password
なぜなら、gitで上げるときにパスワードがついたままで上げるのはセキュリティー上よろしくないからです。
ですのでgitに上げるときにはこれらのディレクトリ上に以下を書きます。
.env
では、立ち上げましょう。
$ docker compose up -d
Docker Composeファイルの作成(Apache Spark編)
まずは基本的なDocker Composeファイルを書きます。
services:
spark:
container_name: spark
hostname: spark
build: ./spark
networks:
- spark-iceberg-minio-network
ports:
- 8083:8080
- 4040:4040
tty: true
volumes:
- ./conf/spark-defaults.conf:/opt/spark-3.5.1-bin-hadoop3/conf/spark-defaults.conf:ro
- ./data/:/home/spark/data/:ro
- ./py/:/home/spark/py/:rw
- ./script/:/script/:rw
environment:
- AWS_ACCESS_KEY_ID=${MINIO_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${MINIO_SECRET_ACCESS_KEY}
- AWS_S3_ENDPOINT=minio:9000
- AWS_REGION=us-east-1
- TZ=Asia/Tokyo
restart: always
depends_on:
- db
db:
image: postgres:latest
container_name: db
hostname: db
networks:
- spark-iceberg-minio-network
volumes:
- db-store:/var/lib/postgresql/data
- ./psql/:/psql/
tty: true
restart: always
environment:
- TZ=Asia/Tokyo
- POSTGRES_USER=iceberg
- POSTGRES_PASSWORD=icebergpassword
- POSTGRES_DB=iceberg
networks:
spark-iceberg-minio-network:
volumes:
db-store:
Apache Sparkのイメージを取ってきても上手く動かなかったので今回は自分でビルドしました。
(本当は動くかもしれないけどこれも修行なので・・・。)
Dockerfile(ApacheSpark用)
Apache SparkはどうやらJava17でないと動かないみたいなのでOpenJDKのバージョン17を選んで動かします。
では最初にDockerfile全体をご覧に入れましょう。
FROM openjdk:17-slim-bullseye
USER root
RUN addgroup --gid 1000 spark && adduser --uid 1000 --gid 1000 spark
RUN mkdir -p /home/spark/spark-events
RUN chown -R spark:spark /home/spark
RUN apt-get update && apt-get -y upgrade && apt-get -y install vim less wget procps iputils-ping curl python3 python3-pip tini
USER spark
RUN curl https://dl.min.io/client/mc/release/linux-amd64/mc --create-dirs -o /home/spark/minio-binaries/mc && chmod +x /home/spark/minio-binaries/mc
ENV PATH=$PATH:/home/spark/minio-binaries/
USER root
RUN mkdir -p /opt/ && \
cd /opt/ && \
wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz && \
tar xzvf spark-3.5.1-bin-hadoop3.tgz && \
rm spark-3.5.1-bin-hadoop3.tgz && \
wget https://dlcdn.apache.org/maven/maven-3/3.9.6/binaries/apache-maven-3.9.6-bin.tar.gz && \
tar xzvf apache-maven-3.9.6-bin.tar.gz && \
rm apache-maven-3.9.6-bin.tar.gz
ENV PATH=$PATH:/opt/apache-maven-3.9.6/bin:/opt/spark-3.5.1-bin-hadoop3/bin
RUN mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 && \
mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-core:1.5.2 && \
mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-aws:1.5.2 && \
mvn dependency:get -Dartifact=org.apache.iceberg:iceberg-aws-bundle:1.5.2 && \
mvn dependency:get -Dartifact=org.postgresql:postgresql:42.7.3
RUN find /root/.m2/ -name \*.jar -exec mv {} /opt/spark-3.5.1-bin-hadoop3/jars/ \; && rm -rf /root/.m2/
ENV SPARK_HOME=/opt/spark-3.5.1-bin-hadoop3/
USER spark
WORKDIR /home/spark/
CMD ["bash"]
カスタマイズのポイント①:sparkユーザを作る
基本的にDockerfileの中ではrootで構築しますが、ユーザは自分の今のUserIDとGroupIDと同じIDのユーザspark
を作っておくと後々楽になると思います。でホームディレクトリをつくりその上に~/spark-events
というディレクトリを作って置いてください。
カスタマイズのポイント②:足りないDebパッケージのインストール
基本のapt-get update && apt-get -y upgrade
は良いとして、Pythonとwgetは必要になってくるのでこれだけ入れておいてください。それ以外のパッケージはあると便利ということです。
カスタマイズのポイント③:mcコマンドのインストール
これは好みですが、私はDockerコンテナ内でmcコマンドを使えれば良いなと思い入れました。
このときは一般ユーザspark
でインストールします。
カスタマイズのポイント④:mavenとsparkの導入
これはwgetで取ってくればいいので簡単です。
言っておきますが、sparkは普通のでも良いかもしれませんが、今回はspark-3.5.1-bin-hadoop3.tgz
を落としてきました。その後mavenのパッケージが必要になってくるので一緒に落として展開しておきます。
カスタマイズのポイント⑤:足りないJARファイルのダウンロードと配置
じつはSpark単体では今回の「Apache Icebergを使ったデータの格納」はできませんので、以下のパッケージをmavenで落としてきました。
- iceberg-spark-runtime
- iceberg-core
- iceberg-aws
- iceberg-aws-bundle
それからIcebergのメタデータ管理にPostgreSQLを使いますのでそのJARファイルも落としておきます。
どこにそれらは格納されているかというと、/root/.m2/
ディレクトリ以下ですので、そこからsparkのjar置き場になっている/opt/spark-3.5.1-bin-hadoop3/jars/
ディレクトリにJARファイル一切合切だけを移動させておきます。
最後にユーザをspark
に変更してDockerfileの構築は終了です。
これらを一旦起動させます。
$ docker compose up -d --build
MinIOとの接続
sparkとMinIOを接続するためにMinIOへ入ってアクセスキーとシークレットキーを作成してダウンロードします。
MinIOにはWebGUIがついていますのでそれを使います。
http://{MinIOのアドレス}:9001/
に接続するとログイン画面が出ます。
先ほどMinIOの.env
ファイルで指定したアカウントで入ります。
左ペインの中から「Access Keys」を押すとアクセスキーの一覧が出ますのでそこで新規作成(右上の「Create access key」)を押します。
適当に名前も入れても入れなくても良いです、「Create」を押してできあがった対のキーを落としておきます。絶対に落としてください。一回しかこの情報は出てきませんのでコピペだけで済ませないでください。
このアクセスキーとシークレットキーをspark
のDocker Composeファイルに適応させます。こんな感じに。
MINIO_ACCESS_KEY_ID=HeWHScaUjAQzlG12oEml
MINIO_SECRET_ACCESS_KEY=UaXOGuouNXMpkF9Ov5BxQK9uM3aqNKTJJ2IFmZ6k
そして、一旦sparkのDockerを落として再起動させます。
$ docker compose down
$ docker compose up -d
そうするとMinIOとSparkが関連付けられました、次にsparkからIcebergを使ったデータ格納を実現するためのspark-defaults.conf
を編集しましょう。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.defaultCatalog catalog01
spark.sql.catalog.catalog01 org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.catalog01.type jdbc
spark.sql.catalog.catalog01.uri jdbc:postgresql://db:5432/iceberg
spark.sql.catalog.catalog01.jdbc.useSSL false
spark.sql.catalog.catalog01.jdbc.user iceberg
spark.sql.catalog.catalog01.jdbc.password icebergpassword
spark.sql.catalog.catalog01.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.catalog01.warehouse s3://01/default
spark.sql.catalog.catalog01.s3.endpoint http://192.168.1.30:9000/warehouse-data/
spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
spark.eventLog.enabled true
spark.eventLog.dir /home/spark/spark-events
spark.history.fs.logDirectory /home/spark/spark-events
spark.sql.catalogImplementation in-memory
上から私なりに解釈した設定を説明していきます。
- 1行目はこのパッケージを使いますという宣言だと思います。
- 2行目はカタログ名です、それ以降の左辺に出てきます。
- 3行目はよくわかりませんがこのパッケージを使いますという宣言だと思います。
- 4行目はカタログを格納する方式ですが今回はJDBCを使います
- 5行目はそのJDBCを使うときの接続先です。今回はPostgreSQLであることと
db
というDockerコンテナを使い、データベース名はiceberg
と言うのを使うということです。言うのを忘れていましたが、何も指定しなければDocker Composeファイルで指定しているpostgresのユーザ名がそのままデータベース名になるそうです。 - 6行目はSSLを使うかだと思いますが今回は使いません。
- 7行目8行目のアカウント情報は
compose.yml
に記載されているとおりです。 - 9行目はカタログのIOインプリメンテーションはなにかと聞かれているのでS3と言っています。
- 10行目はカタログのwarehouse(倉庫ファイル)はどこかと聞かれているのでs3の01/defaultというディレクトリであると言っています。
- 11行名はそのs3のエンドポイントはどこだと問われているので先ほどのMinIOのアドレスとバケットを決めて置く必要があります。バケットの作り方は後述します。
- 12行目も何かのパッケージ宣言だと思います
- 13行目はログを取るかということなの
true
にしておきます。 - 14行目15行目はログのディレクトリをさしています。
- 最後の行はこの実行をインメモリでおこなうと言っているに違いないと思います。
このファイルをsparkの/opt/spark-3.5.1-bin-hadoop3/conf/spark-defaults.conf
になるようにマウントします。
これで実験ができます。
MinIOのバケットの作り方
バケットの作り方は次の通りです。
メイン画面の左ペインから「Backets」を選択します。
もう既にバケット作ってありますが、data
というバケットを作ってみましょう。
右上の「Create Backet」を押します。
「Backet Name」は半角英数字でとハイフンが使えますが最後がハイフンで終わることはできないようです。名前を決めたあと「Create Backet」ボタンを押せばバケットができます。
データを入れて見る。
データはダミーデータを使います。
このデータをダウンロードしてみます。そしたらヘッダを見てそれとない英語名に変更してデータの種類を決めます。おそらく年齢以外はString、年齢はIntegerになります。
ヘッダを入れる
まずはデータを入れる入れ物を作成します。今回はPythonを使うのでPythonコードで書きます。
例は以下の通りです。
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, DoubleType, FloatType, LongType, StructType,StructField, StringType
spark = SparkSession.builder.getOrCreate()
schema = StructType([
StructField("name", StringType(), True),
StructField("name_kana", StringType(), True),
StructField("age", IntegerType(), True),
StructField("birth_date", StringType(), True),
StructField("gender", StringType(), True),
StructField("email", StringType(), True),
StructField("phone", StringType(), True),
StructField("zip_code", StringType(), True),
StructField("address", StringType(), True)
])
df = spark.createDataFrame([], schema)
df.writeTo("catalog01.address.list_20240530_001").create()
これの解説は省略します、知りたい人は調べてみてください。このあたりの文献はゴロゴロ出てきます。
では実行して見ます。実行にはsparkコンテナ内にあるspark-submit
を使います。
$ docker compose exec -it spark spark-submit /home/spark/py/create_table.py
メッセージが大量に出るのでここではメッセージについては割愛します。
では、MinIOとPostgreSQLにきちんと入っているか確認します。
MinIOのGUIで「Object Browser」を選択して先ほどのwarehouse-dataをクリックして最下層まで行きます。
warehouse-data/01/default/address/list_20240530_001/metadata
にそれらしきデータが入っているのが確認できました。
では、データを入れていきましょう。
データの投入
データを投入していきます。今回はこんなPythonコードでいきます。
import csv
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
spark = SparkSession.builder.getOrCreate()
schema = spark.table("catalog01.address.list_20240530_001").schema
data = []
with open('/home/spark/data/dummy20240529.csv') as f:
reader = csv.reader(f)
for i,row in enumerate(reader):
if i != 0:
data.append([row[0],row[1],int(row[2]),row[3],row[4],row[5],row[6],row[7],row[8]])
df = spark.createDataFrame(data, schema)
df.writeTo("catalog01.address.list_20240530_001").append()
ここのあたりの書き方も詳しく書いているところが多いので調べてみてください。
ではこれを実行して見ます。
$ docker compose exec -it spark spark-submit /home/spark/py/insert.py
では、本当に入っているかPythonプログラムとMinIOとの両方から見てみましょう。
まずはPythonプログラムで。
from pyspark.sql import SparkSession, DataFrame, functions
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType,StructField, StringType
spark = SparkSession.builder.getOrCreate()
df = spark.table("catalog01.address.list_20240530_001").show()
実行します。
$ docker compose exec -it spark spark-submit /home/spark/py/show.py
-略-
+-----------+-------------------+---+--------------+------------+--------------------+-------------+--------+-----------------------------------+
| name| name_kana|age| birth_date| gender| email| phone|zip_code| address|
+-----------+-------------------+---+--------------+------------+--------------------+-------------+--------+-----------------------------------+
| 恩田 美恵| おんだ みえ| 52|1971年06月29日| 女|mieonda@example.n...|050-3681-7261|447-9200| 愛知県半田市港町3-3-104エン...|
-略-
良さそうですね。ではMinIOの方も見てみましょう。
warehouse-data/01/default/address/list_20240530_001/data
を見てみると。
格納されているみたいですね、今回はparquet(パーケイと呼ぶらしい)ファイルに分割されて入った模様です。
おわりに
これでデータレイクの初期設定ができました。これを駆使して皆さん遊んでみてください。
では。
Discussion