🔥

データパイプラインを実装したい?それならprefectに入門あるのみ!

に公開

今回は、prefectを紹介します!prefectを利用するとPythonでデータのパイプラインを製品レベルで実装することができます!

prefectとは?

公式ドキュメントの説明によると、

Prefectは、Python関数を最小限の摩擦で本番環境レベルのデータパイプラインに変換するオープンソースのオーケストレーションエンジンです。DSLや複雑な設定ファイルを必要とせず、純粋なPythonでワークフローを構築・スケジュールし、Pythonが実行できる環境であればどこでも実行できます。Prefectは、自動状態追跡、障害処理、リアルタイム監視など、面倒な処理をすぐに実行できます。

とのことです。prefectを利用することで、ハイレベルなパイプラインを構築でき、スケジューリングをはじめとした多くの起動方法を提供してくれます。また、クラウド上のUIを利用することで、実行結果の確認であったり監視を行うこともできます。

なお、重要な特徴として以下が挙げられています。

  • Pythonic:
    • ワークフローはネイティブPythonで記述でき、DSLやYAMLなど特殊な構文は不要
    • 型ヒントやasync/await、最新のPythonパターンをフルサポートしており、既存のIDE、デバッガー、テストツールを使用可能
  • 状態と回復: 成功、失敗、再試行の状態を追跡する堅牢な状態管理。中断された実行を最後の成功した時点から再開し、高負荷の計算をキャッシュすることで不要なやり直しを回避
  • 柔軟でポータブルな実行:
    • フローをローカルで開始して開発を容易にし、単一プロセスからコンテナ、Kubernetes、クラウドサービスまで、ベンダーロックインなしでどこにでもデプロイ可能
    • インフラストラクチャは構成だけでなくコードによって定義されるため、環境の拡張や変更が容易である
  • イベントドリブン:
    • スケジュールや外部イベント、またはAPI経由でフローをトリガー可能
    • フローを一時停止して、人間の介入や承認を求めることが可能
    • 状態、条件、または任意のカスタムロジックに基づいてフローを連結できる
  • 動的ランタイム:
    • 実際のデータまたは条件に基づいて、実行時にタスクを動的に作成
    • 実行中に新しいタスクやブランチを簡単に生成できるため、真のデータドリブンワークフローを実現
  • モダンなUI:
    • 直感的なインターフェースで、フロー実行のリアルタイム監視、ログ記録、状態追跡が可能
    • フローを実行してUIを開くだけで、依存関係グラフとDAGを自動的に表示
  • CI/CDファースト:
    • 通常のPythonコードのようにフローをテストおよびシミュレートし、開発中に迅速なフィードバックを提供
    • 既存のCI/CDパイプラインにシームレスに統合し、テストとデプロイメントを自動化

https://docs.prefect.io/v3/get-started

早速使ってみる

それでは早速使ってみます!今回は公式が提供しているQuickstartを通して利用してみます。

https://docs.prefect.io/v3/get-started/quickstart

prefectへの登録とログイン

まずはprefectのアカウント作成とログインを行います。uvを通して以下のコマンドを実行すると自動的にブラウザが開かれ、アカウント作成およびログイン処理につながりますので、対応してください。

curl -LsSf https://astral.sh/uv/install.sh | sh # uvのインストール
uvx prefect-cloud login # prefect-cloudへログイン

顧客IDを扱うシンプルなパイプラインの実装

それではドキュメントに載っているサンプルコードを見てみます。このサンプルコードでは顧客IDを生成し、そのIDを処理するためのフローが実装されています。

process_customer_id.py
from prefect import flow, task
import random

@task
def get_customer_ids() -> list[str]:
    # Fetch customer IDs from a database or API
    return [f"customer{n}" for n in random.choices(range(100), k=10)]

@task
def process_customer(customer_id: str) -> str:
    # Process a single customer
    return f"Processed {customer_id}"

@flow
def main() -> list[str]:
    customer_ids = get_customer_ids()
    # Map the process_customer task across all customer IDs
    results = process_customer.map(customer_ids)
    return results


if __name__ == "__main__":
    main()

実装を見ると、@task@flowのデコレータを用いて実装されていることがわかります。それぞれのデコレータについては以下のように使い分けます。

  • @task: パイプラインで利用される個々のコンポーネントの実装
  • @flow: スクリプトのエントリーポイントとなる

つまり、@taskで実装されたコンポーネントを@flowが組み合わせながら実行すると言うことになります。

今回の実装を読み解くと、以下のようなパイプラインが構築されています。

  1. get_customer_idsを実行し、ランダムなIDを持つ顧客IDを10こ生成する
  2. get_customer_idsの結果それぞれに対してprocess_customerを適用し、文字列を生成する
  3. process_customerの結果をリストとして返す

コードを実行してみる!

それでは先ほど実装したコードを早速実行してみましょう。実行すると、以下のようなログが表示されます。

uv run process_customer_id.py

# 結果
12:32:32.376 | INFO    | Flow run 'vivacious-swan' - Beginning flow run 'vivacious-swan' for flow 'main'
12:32:32.379 | INFO    | Flow run 'vivacious-swan' - View at https://...
12:32:32.426 | INFO    | Task run 'get_customer_ids-d19' - Finished in state Completed()
12:32:32.597 | INFO    | Task run 'process_customer-10d' - Finished in state Completed()
12:32:32.599 | INFO    | Task run 'process_customer-dd7' - Finished in state Completed()
12:32:32.599 | INFO    | Task run 'process_customer-cf9' - Finished in state Completed()
12:32:32.600 | INFO    | Task run 'process_customer-f59' - Finished in state Completed()
12:32:32.601 | INFO    | Task run 'process_customer-ba2' - Finished in state Completed()
12:32:32.602 | INFO    | Task run 'process_customer-54c' - Finished in state Completed()
12:32:32.602 | INFO    | Task run 'process_customer-e25' - Finished in state Completed()
12:32:32.602 | INFO    | Task run 'process_customer-417' - Finished in state Completed()
12:32:32.602 | INFO    | Task run 'process_customer-4f7' - Finished in state Completed()
12:32:32.603 | INFO    | Task run 'process_customer-4fe' - Finished in state Completed()
12:32:33.063 | INFO    | Flow run 'vivacious-swan' - Finished in state Completed('All states completed.')
12:32:33.246 | WARNING | EventsWorker - Still processing items: 32 items remaining...

ログを見ると、vivacious-swanという名前でフローが作成された後に、@flowで実装したフローが実行されていますね。フローの名前についてはデコレータで就職している関数名であるmainが指定されています。

prefect cloud上で確認してみる

最初にprefectにログインしているので、先ほど実行した結果はprefect cloud上に展開されています。早速アクセスしてみましょう。

サイドバーでFlowsを選択すると、以下のように先ほど実行したフローmainが一覧に追加されます。


フロー一覧画面

mainフローを選択すると、個々のフロー実行単位であるRunsが表示されます。今回はvivacious-swanという名前で実行されています。


フロー実行単位画面

vivacious-swanを選択すると、個々のタスク実行結果が表示されます。最初の画面では、左側に@taskで実装されたコンポーネントの実行順が、右側には実行ログが表示されます。


実行ログ画面

また、ログの上にGraphとあるので選択すると、DAGが表示されどのように処理が進んでいるかをグラフで表示できます。グラフを見ても、get_customer_idsの結果をprocess_customerに渡すという流れがわかります。


実行グラフ画面

まとめ

今回はprefectのQuickstartを通して、prefectの最もシンプルな使い方を調べてみました。公式ドキュメントでは次にスケジューリング実装であったりGitHub等からのデプロイなどもあったりするので、次回はそちらをご紹介しようと思います。

Discussion