👋

Apache Airflowを活用したブロックチェーンデータの自動収集・保存

2025/02/27に公開

1. Apache Airflowとは?

Apache Airflow は、データパイプラインを自動化し、スケジュール実行やエラー処理を簡単に行うための ワークフロー管理ツール です。Pythonで定義されたワークフロー(DAG)を使い、複数のタスクを順番に実行できます。

1.1 なぜAirflowを使うのか?

  • スケジューリングが容易 → 指定した間隔でブロックチェーンデータを取得可能
  • エラー処理が自動化 → 失敗時のリトライやログ監視ができる
  • タスクの依存関係を管理 → ブロックの取得、DB保存、データ解析を順番に実行
  • Web UIでの可視化 → 実行状況を直感的に確認できる

2. Apache Airflowのセットアップ

2.1 必要なパッケージのインストール

まず、Airflowをインストールします。

pip install apache-airflow[postgres]

Airflowは 環境変数 AIRFLOW_HOME を設定することで、ワークフローの保存先を指定できます。

export AIRFLOW_HOME=~/airflow

Airflowの初期化:

airflow db init

管理者ユーザーを作成:

airflow users create \
    --username admin \
    --firstname First \
    --lastname Last \
    --role Admin \
    --email admin@example.com

AirflowのWebサーバーを起動:

airflow webserver --port 8080

スケジューラーの起動:

airflow scheduler

Web UIにアクセス(http://localhost:8080


3. Airflow DAGの作成(Infuraデータ収集用)

Airflowで Ethereumのブロックデータを定期的に取得し、PostgreSQLに保存する ワークフロー(DAG)を作成します。

3.1 DAGの基本構造

DAGは、複数のタスクをスケジュール実行するPythonファイルで定義します。

AirflowのDAGファイルを ~/airflow/dags/infura_data_pipeline.py に作成。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from web3 import Web3
import psycopg2
import os

# Infuraの設定
INFURA_URL = "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID"
w3 = Web3(Web3.HTTPProvider(INFURA_URL))

# PostgreSQLの接続設定
DB_PARAMS = {
    "dbname": "blockchain_data",
    "user": "your_username",
    "password": "your_password",
    "host": "localhost",
    "port": "5432"
}

# Airflowのデフォルト設定
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'infura_data_pipeline',
    default_args=default_args,
    description='Fetch Ethereum block data and store in PostgreSQL',
    schedule_interval=timedelta(minutes=10),  # 10分ごとに実行
    catchup=False,
)

# 最新ブロックを取得する関数
def fetch_latest_block():
    latest_block_number = w3.eth.block_number
    latest_block = w3.eth.get_block(latest_block_number, full_transactions=True)
    
    conn = psycopg2.connect(**DB_PARAMS)
    cursor = conn.cursor()
    
    cursor.execute(
        "INSERT INTO blocks (block_number, timestamp, miner, transactions_count) VALUES (%s, to_timestamp(%s), %s, %s) ON CONFLICT (block_number) DO NOTHING",
        (latest_block.number, latest_block.timestamp, latest_block.miner, len(latest_block.transactions))
    )
    
    for tx in latest_block.transactions:
        cursor.execute(
            "INSERT INTO transactions (tx_hash, block_number, from_address, to_address, value, gas_price, gas_used) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (tx_hash) DO NOTHING",
            (tx.hash.hex(), tx.blockNumber, tx['from'], tx['to'], w3.from_wei(tx.value, 'ether'), tx.gasPrice, tx.gas)
        )
    
    conn.commit()
    cursor.close()
    conn.close()
    print(f"Block {latest_block_number} and {len(latest_block.transactions)} transactions stored successfully.")

# Airflowタスクの設定
fetch_block_task = PythonOperator(
    task_id='fetch_latest_block',
    python_callable=fetch_latest_block,
    dag=dag,
)

fetch_block_task

4. AirflowでDAGを実行する

DAGファイルを保存した後、AirflowのWeb UI(http://localhost:8080)にアクセスし、DAGを有効化すると、10分ごとにEthereumの最新ブロックを取得し、データベースに保存できます。

手動で実行したい場合は、以下のコマンドを使います。

airflow dags trigger infura_data_pipeline

Airflowのログを確認するには、

airflow tasks logs -d infura_data_pipeline fetch_latest_block

5. まとめ

  • Apache Airflowを使うことで、Ethereumのブロックデータ収集をスケジュール実行できる。
  • DAG(ワークフロー)を作成し、データ取得→保存の流れを自動化できる。
  • エラー発生時には自動リトライやログ監視を活用し、運用コストを削減できる。
  • AirflowのWeb UIで直感的にワークフローの管理が可能。

この仕組みを活用すれば、定期的なデータ収集を自動化し、リアルタイム分析や機械学習への活用がしやすくなります!

Discussion