👋
Apache Airflowを活用したブロックチェーンデータの自動収集・保存
1. Apache Airflowとは?
Apache Airflow は、データパイプラインを自動化し、スケジュール実行やエラー処理を簡単に行うための ワークフロー管理ツール です。Pythonで定義されたワークフロー(DAG)を使い、複数のタスクを順番に実行できます。
1.1 なぜAirflowを使うのか?
- スケジューリングが容易 → 指定した間隔でブロックチェーンデータを取得可能
- エラー処理が自動化 → 失敗時のリトライやログ監視ができる
- タスクの依存関係を管理 → ブロックの取得、DB保存、データ解析を順番に実行
- Web UIでの可視化 → 実行状況を直感的に確認できる
2. Apache Airflowのセットアップ
2.1 必要なパッケージのインストール
まず、Airflowをインストールします。
pip install apache-airflow[postgres]
Airflowは 環境変数 AIRFLOW_HOME
を設定することで、ワークフローの保存先を指定できます。
export AIRFLOW_HOME=~/airflow
Airflowの初期化:
airflow db init
管理者ユーザーを作成:
airflow users create \
--username admin \
--firstname First \
--lastname Last \
--role Admin \
--email admin@example.com
AirflowのWebサーバーを起動:
airflow webserver --port 8080
スケジューラーの起動:
airflow scheduler
Web UIにアクセス(http://localhost:8080)
3. Airflow DAGの作成(Infuraデータ収集用)
Airflowで Ethereumのブロックデータを定期的に取得し、PostgreSQLに保存する ワークフロー(DAG)を作成します。
3.1 DAGの基本構造
DAGは、複数のタスクをスケジュール実行するPythonファイルで定義します。
AirflowのDAGファイルを ~/airflow/dags/infura_data_pipeline.py
に作成。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from web3 import Web3
import psycopg2
import os
# Infuraの設定
INFURA_URL = "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID"
w3 = Web3(Web3.HTTPProvider(INFURA_URL))
# PostgreSQLの接続設定
DB_PARAMS = {
"dbname": "blockchain_data",
"user": "your_username",
"password": "your_password",
"host": "localhost",
"port": "5432"
}
# Airflowのデフォルト設定
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'infura_data_pipeline',
default_args=default_args,
description='Fetch Ethereum block data and store in PostgreSQL',
schedule_interval=timedelta(minutes=10), # 10分ごとに実行
catchup=False,
)
# 最新ブロックを取得する関数
def fetch_latest_block():
latest_block_number = w3.eth.block_number
latest_block = w3.eth.get_block(latest_block_number, full_transactions=True)
conn = psycopg2.connect(**DB_PARAMS)
cursor = conn.cursor()
cursor.execute(
"INSERT INTO blocks (block_number, timestamp, miner, transactions_count) VALUES (%s, to_timestamp(%s), %s, %s) ON CONFLICT (block_number) DO NOTHING",
(latest_block.number, latest_block.timestamp, latest_block.miner, len(latest_block.transactions))
)
for tx in latest_block.transactions:
cursor.execute(
"INSERT INTO transactions (tx_hash, block_number, from_address, to_address, value, gas_price, gas_used) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (tx_hash) DO NOTHING",
(tx.hash.hex(), tx.blockNumber, tx['from'], tx['to'], w3.from_wei(tx.value, 'ether'), tx.gasPrice, tx.gas)
)
conn.commit()
cursor.close()
conn.close()
print(f"Block {latest_block_number} and {len(latest_block.transactions)} transactions stored successfully.")
# Airflowタスクの設定
fetch_block_task = PythonOperator(
task_id='fetch_latest_block',
python_callable=fetch_latest_block,
dag=dag,
)
fetch_block_task
4. AirflowでDAGを実行する
DAGファイルを保存した後、AirflowのWeb UI(http://localhost:8080)にアクセスし、DAGを有効化すると、10分ごとにEthereumの最新ブロックを取得し、データベースに保存できます。
手動で実行したい場合は、以下のコマンドを使います。
airflow dags trigger infura_data_pipeline
Airflowのログを確認するには、
airflow tasks logs -d infura_data_pipeline fetch_latest_block
5. まとめ
- Apache Airflowを使うことで、Ethereumのブロックデータ収集をスケジュール実行できる。
- DAG(ワークフロー)を作成し、データ取得→保存の流れを自動化できる。
- エラー発生時には自動リトライやログ監視を活用し、運用コストを削減できる。
- AirflowのWeb UIで直感的にワークフローの管理が可能。
この仕組みを活用すれば、定期的なデータ収集を自動化し、リアルタイム分析や機械学習への活用がしやすくなります!
Discussion