📘

lakeFSシリーズ: Qucikstart入門編

に公開1

今回からlakeFS紹介シリーズも始めようと思います(一体いくつシリーズ始めるんだw)。lakeFSを利用することでデータのバージョンをGitで管理することができるようになります。今回は公式で提供されているQiuckstartを通して入門してみます。

lakeFSとは?

lakeFSは、オブジェクトストレージをGitライクなリポジトリに変換するオープンソースのツールであり、コードを管理するようにデータレイクを管理できるもののようです。lakeFSを利用することで、複雑なETLジョブからデータサイエンスやアナリティクスまで、反復可能でアトミックかつバージョン管理されたデータレイクオペレーションを構築できるとのことで、MLライフサイクルにおけるデータ周りの一連の作業を担えるツールになりそうです。似たツールとして以前にDVCを紹介していましたが、公式のブログによると、RDBを用いたりデータの発見に高度なパフォーマンスを求められるような場合はlakeFSが向いているとのことです。

https://github.com/treeverse/lakeFS
https://lakefs.io/blog/dvc-vs-git-vs-dolt-vs-lakefs/#elementor-toc__heading-anchor-15
https://zenn.dev/akasan/articles/7efdcc1c2d4141

実際に使ってみる

それでは実際に使ってみましょう。今回は以下のQuickstartを元にどのように使えるか試してみました。

https://docs.lakefs.io/latest/quickstart/index.html

環境構築

uvを利用して環境構築をします。

uv init lakefs_quickstart -p 3.12
cd lakefs_quickstart
uv add lakefs

環境を構築した後に以下のコマンドを実行すると、処理が完了したら結果に示すようなログが表示されるので、そうなったら127.0.0.1:8000にアクセスしてください。

uv run python -m lakefs.quickstart

# 結果
│
│ lakeFS running in quickstart mode.
│     Login at http://127.0.0.1:8000/
│
│     Access Key ID    : ...
│     Secret Access Key: ...

するとユーザ登録画面が表示されるのでメールアドレスを入力した後に、先ほどのアクセスキーとシークレットアクセスキーを入力すると以下の画面が表示されます。


https://docs.lakefs.io/latest/quickstart/launch/ より引用

表示されたらCreate sample repositoryを実行してください。するとレポジトリが作成されます。


https://docs.lakefs.io/latest/quickstart/launch/ より引用

データのクエリを実行する

lakeFS上でデータのクエリを実行することができます。試しにレポジトリ上のlakes.parquetを選択してください。すると以下のような画面が表示されます。


lake.parquet選択画面

最初の画面では20個のアイテムが表示されますが、試しに以下のクエリを実行してみましょう。このクエリは国ごとにグループ化してその件数を取得します。

SELECT   country, COUNT(*)
FROM     READ_PARQUET('lakefs://quickstart/main/lakes.parquet')
GROUP BY country
ORDER BY COUNT(*) 
DESC LIMIT 5;

実行すると以下のように想定していた結果になっていることが確認できます。


国ごとの集計結果

他にもDepth_mカラムの降順でソートするようなこともできます。


Depth_m降順

ブランチを作成して作業をする

次は、lakes.parquetにあるデータをデンマークのデータだけが含まれたデータに編集する過程を実行してみます。

まずはブランチを作成するための作業をします。コマンドラインでlakectlというコマンドを利用できます。まずは認証を行うために以下を実行してください。

uv run lakectl config

するとアクセスキーとシークレットアクセスキーの入力が求められるので入力してください。それが成功したら以下のコマンドを入力するとmainブランチをベースにしてdenmark-lakesブランチが作成されます。

lakectl branch create lakefs://quickstart/denmark-lakes --source lakefs://quickstart/main

この状態で先ほどのページを見ると確かに新しいブランチが作成されていることが確認できます。


denmark-lakesブランチ作成の確認

次にlakes.parquetからデンマークだけのデータを抽出してファイルを上書きます。以下のクエリを順番に実行してください。

CREATE OR REPLACE TABLE lakes AS SELECT * FROM READ_PARQUET('lakefs://quickstart/denmark-lakes/lakes.parquet'); /* lakesテーブルの作成 */
DELETE FROM lakes WHERE Country != 'Denmark'; /* lakesテーブルからデンマーク以外のデータを削除 */
COPY lakes TO 'lakefs://quickstart/denmark-lakes/lakes.parquet'; /* lakesテーブルの情報でlakes.parquetを上書きする */
DROP TABLE lakes; /* lakesテーブルの削除 */

/* lakes.parquetがデンマークの情報だけになっているか確認 */
SELECT   country, COUNT(*)
FROM     READ_PARQUET('lakefs://quickstart/denmark-lakes/lakes.parquet')
GROUP BY country
ORDER BY COUNT(*) 
DESC LIMIT 5;

ここまで実行すると、デンマークの情報だけになっていることが確認できます。なお、mainブランチは何も触っていないので、以下のクエリを実行すると問題なく元々のデータが表示されます。

SELECT   country, COUNT(*)
FROM     READ_PARQUET('lakefs://quickstart/main/lakes.parquet')
GROUP BY country
ORDER BY COUNT(*) 
DESC LIMIT 5;


デンマークのみのデータになっていることの確認

変更をmainにマージ

それでは先ほどのdenmark-lakesブランチをmainにマージしましょう。ブランチ作成と同様にlakectlを利用することで実現できます。

まずは先ほどテーブルに変更を加えたのでコミットを実行します。

uv run lakectl commit lakefs://quickstart/denmark-lakes -m "Create a dataset of just the lakes in Denmark"

UIからもコミットが作成されていることが確認できます。


テーブル変更のコミットの確認

それではマージ作業をしましょう。今回はコマンドで実行しますがPRをUIからつくることもできます(その結果を添付しておきます)。

uv run lakectl merge lakefs://quickstart/denmark-lakes lakefs://quickstart/main


テーブル変更のコミットの確認

マージした後に試しにlakes.parquetに国のカウントをさせてみましょう。予想通り、デンマーク以外のデータはなくなっています。

SELECT   country, COUNT(*)
FROM     READ_PARQUET('lakefs://quickstart/denmark-lakes/lakes.parquet')
GROUP BY country
ORDER BY COUNT(*) 
DESC LIMIT 5;


マージ後の国別カウント結果

ロールバックを実施

ここで、例えば先ほどデンマークだけのデータにしましたがこれは誤ったデータ変換だったとします。その場合一つ前の状態に戻したいことになります。実際このようなユースケースもあるでしょう。その場合、Gitと同様にrevertの機能が提供されています。今回のサンプルでは以下のように実行することで元に戻すことができます。

uv run lakectl branch revert lakefs: //quickstart/main main --parent-number 1 --yes

これを実行すると、以下のようにrevertのためのコミットが作成されます。

revert前 revert後

また国別カウントを実行すると元の結果が得られていることが確認できます。


revert後のクエリ結果

ActionsとHooksの作成

lakeFSではActionsとHooksに対応しているとのことです。今回はQuickstartの内容をそのまま試してみます。

  1. まずはブランチを作成します。
uv run lakectl branch create lakefs://quickstart/add_action --source lakefs://quickstart/main
  1. check_commit_metadata.ymlというファイル名で以下のワークフローを作成する
name: Check Commit Message and Metadata
on:
  pre-commit:
    branches:
    - etl**
hooks:
- id: check_metadata
  type: lua
  properties:
    script: |
      commit_message=action.commit.message
      if commit_message and #commit_message>0 then
          print("✅ The commit message exists and is not empty: " .. commit_message)
      else
          error("\n\n❌ A commit message must be provided")
      end

      job_name=action.commit.metadata["job_name"]
      if job_name == nil then
          error("\n❌ Commit metadata must include job_name")
      else
          print("✅ Commit metadata includes job_name: " .. job_name)
      end

      version=action.commit.metadata["version"]
      if version == nil then
          error("\n❌ Commit metadata must include version")
      else
          print("✅ Commit metadata includes version: " .. version)
          if tonumber(version) then
              print("✅ Commit metadata version is numeric")
          else
              error("\n❌ Version metadata must be numeric: " .. version)
          end
      end
  1. ワークフローをアップロードする
uv run lakectl fs upload lakefs://quickstart/add_action/_lakefs_actions/check_commit_metadata.yml --source check_commit_metadata.yml
  1. Uncomitted changesに移動し、Commit Changesでコミットする

    Uncommitted Changes

  2. Compareタブからマージする


Compareタブからのマージ

それでは今作ったワークフローを実行してみましょう。まずはetl_sampleというブランチを作ります。

uv run lakectl branch create lakefs://quickstart/etl_sample --source lakefs://quickstart/main

次にlakes.parquetに対して以下のクエリを実行します。

COPY (
    WITH src AS (
        SELECT lake_name, country, depth_m,
            RANK() OVER ( ORDER BY depth_m DESC) AS lake_rank
        FROM READ_PARQUET('lakefs://quickstart/etl_20230504/lakes.parquet'))
    SELECT * FROM SRC WHERE lake_rank <= 10
) TO 'lakefs://quickstart/etl_20230504/top10_lakes.parquet'

そして再びUncommitteed Changesをコミットします。Hooksではコミット文、job_nameの指定、versionの数値指定が義務付けられているため、それぞれが不足していると以下のようにエラーになります。

コミット文が不足 job_nameが不足 versionが不足

成功すると以下のようにジョブの結果が表示されます。


Hook結果

ローカルにファイルをコピーする

lakeFS上で管理されているデータをローカルにコピーすることができます。例えば今回のサンプルリポジトリではimagesフォルダに画像データが入っていますが、以下のコマンドを実行するとローカルのmu_local_dirにその中のデータがコピーされます。

lakectl local clone lakefs://quickstart/main/images my_local_dir

まとめ

今回はlakeFSのQuickstartを通してデータをGitで扱うための入門をしてみました、私自身lakeFSを知るまではdvcしか使ってこなかったですが、lakeFSならではの良さもありそうなのでこれから色々検証していこうと思います。

Discussion

AkasanAkasan

執筆時点で4. Uncomitted changesに移動し、Commit Changesでコミットする
の画像が表示されていませんが不具合です