Cloud Composerのローカル開発環境をカスタマイズして開発サイクルを大幅短縮した話
はじめに
WED株式会社データエンジニアのmomokun7です。
Cloud Composer (Managed Airflow) でDAGを開発していると、こんな経験はありませんか?
「コード修正 → GCSアップロード → 反映待ち(3-5分) → エラー → また修正...」
この繰り返し、本当に時間がかかりますよね。ローカルで即座に動作確認できれば、開発効率は劇的に向上するはずです。Googleはcomposer-local-devというローカル開発ツールを提供しています。ただし、実務で使おうとすると、環境構築に手間取ったり、ドキュメントが英語のみだったりと、そのままでは使いづらい部分がありました。
本記事では、このcomposer-local-devをベースに実務で必要な機能を追加・カスタマイズすることで、開発サイクルを大幅に短縮できたローカル開発環境の実装について解説します。
課題:開発サイクルが遅すぎる
Cloud Composerでの開発には、いくつかの大きな課題があります。まず、デプロイに非常に時間がかかります。DAGファイルをGCSにアップロードし、Composerが変更を検知するまで数分待つ必要があります。エラーがあれば、また修正してアップロードという作業を繰り返すことになり、フィードバックループが非常に長くなってしまいます。
次に、環境構築の煩雑さがあります。開発者ごとに異なるローカル環境を使っていると、依存パッケージのバージョン不一致が発生したり、staging/productionとの設定差異に悩まされたりします。
これらの課題を解決するため、3つの目標を設定しました。第一に、開発サイクルの短縮です。コード修正から動作確認まで数秒で完了する環境を目指しました。第二に、環境の一貫性です。stagingと完全に同じ環境をローカルに構築することで、「ローカルでは動くけど本番で動かない」問題を排除します。第三に、開発者体験の向上です。簡単にセットアップでき、分かりやすいCLIで操作できる環境を作ることを目指しました。
解決策:composer-local-devをベースにカスタマイズ
本記事で紹介する環境は、Googleが提供する composer-local-dev をベースにしています。このツールはCloud Composerのローカル実行を可能にする公式実装ですが、より扱いやすくするためには以下の点で工夫が必要だと感じました:
- Variables管理の自動化: 本番環境のVariablesを手動でコピーする必要があった
- 操作の簡易化: 複数のコマンドを覚える必要があった
- 現在の状況確認: 現在認証情報が適用されているのか、どんな環境を使っているのかなどすぐにわかるようにしたい
そこで、composer-local-devの基盤実装はそのまま活用しつつ、以下の機能を追加・カスタマイズしました:
- Secret Manager統合による Variables の自動同期
- 日本語ドキュメントと日本語CLI出力
- Makefileによる簡易操作
- 現在のステータス確認
- uvによる高速パッケージ管理
- Richライブラリによる視覚的なCLI表示
システム構成

このシステムは4つの主要コンポーネントで構成されています。Pythonで実装された環境管理ツール「composer-local CLI」(composer-local-devのものを使用)、Airflow + PostgreSQLをコンテナ化するDocker、Variablesを安全に保管・同期するSecret Manager、そして開発者向けの簡易コマンド集としてのMakefileです。
実装の詳細
まずTerraformでSecret Managerの作成や特定のユーザーグループがComposerのサービスアカウントの権限を借用できるように設定していきます。
1. インフラ構築(Terraform)【新規実装】
Secret Manager の作成
resource "google_secret_manager_secret" "composer_airflow_variables" {
project = var.project_id
secret_id = "composer-airflow-variables"
labels = {
"app" : "composer"
}
replication {
auto {}
}
}
IAM権限の設定
resource "google_service_account_iam_member" "composer_sa_impersonation" {
service_account_id = google_service_account.composer_sa.name
role = "roles/iam.serviceAccountTokenCreator"
member = "group:dev@example.com"
}
2. composer-local CLIツール【カスタマイズ】
既存のCLIツール(composer-local-devベース)の機能をさらに充実させ、画面に表示されるメッセージをすべて自然な日本語にし、処理に時間がかかる際に「作業中」であることが一目でわかるアニメーション(スピナー)を追加します。
プロジェクト構成
composer_local/
├── cli.py # CLIエントリーポイント
├── composer_settings.py # 設定ファイル
├── environment.py # Docker環境管理
├── secret_manager_sync.py # Secret Manager連携
├── export_composer_variables.py # Variables エクスポート
└── import_variables_to_local.py # Variables インポート
設定ファイル
# プロジェクト設定
PROJECT_ID = "your-gcp-project-id"
COMPOSER_ENV_NAME = "your-composer-environment"
COMPOSER_LOCATION = "your-location"
# Composerイメージ設定
COMPOSER_IMAGE_VERSION = "composer-3-airflow-2.10.5-build.13"
# サービスアカウント
SERVICE_ACCOUNT = "your-composer-sa@your-project-id.iam.gserviceaccount.com"
# Secret Manager
SECRET_ID = "your-composer-airflow-variables"
# ローカル環境設定
LOCAL_AIRFLOW_PORT = 8080
LOCAL_POSTGRES_PORT = 25432
LOCAL_MEMORY_LIMIT = "4g"
主要コマンド【日本語対応カスタマイズ】
import typer
from rich.console import Console
app = typer.Typer()
console = Console()
@app.command()
def create():
"""ローカルComposer環境を作成"""
console.print("[bold green]環境を作成しています...[/bold green]")
# Docker環境構築、PostgreSQL初期化、Airflow設定
# ※基盤処理はcomposer-local-devを活用
@app.command()
def start():
"""環境を起動"""
console.print("[bold blue]環境を起動しています...[/bold blue]")
# Dockerコンテナ起動、Webサーバー起動(port 8080)
# ※基盤処理はcomposer-local-devを活用
@app.command()
def sync_vars():
"""Variablesを同期"""
console.print("[bold cyan]Variables を同期しています...[/bold cyan]")
# Secret Manager → ローカルAirflow(新規実装)
Docker環境管理【composer-local-devの基盤を活用】
import docker
from pathlib import Path
class ComposerEnvironment:
def __init__(self):
self.client = docker.from_env()
self.project_root = Path(__file__).parent.parent
def create_container(self):
"""Dockerコンテナを作成"""
# ※この処理はcomposer-local-devをベースにしています
container = self.client.containers.run(
image=f"gcr.io/cloud-airflow-releaser/{COMPOSER_IMAGE_VERSION}",
name="composer-local",
ports={
"8080/tcp": LOCAL_AIRFLOW_PORT,
"5432/tcp": LOCAL_POSTGRES_PORT,
},
volumes={
str(self.project_root / "dags"): {
"bind": "/home/airflow/dags",
"mode": "rw",
},
},
environment={
"AIRFLOW__CORE__LOAD_EXAMPLES": "False",
"AIRFLOW__CORE__DAGS_FOLDER": "/home/airflow/dags",
},
mem_limit=LOCAL_MEMORY_LIMIT,
detach=True,
)
return container
Cloud Composerの公式Dockerイメージを使用することで、stagingと完全に同じ環境を実現しています。PostgreSQLのポートは競合を避けるため25432に設定し、DAGファイルの編集は即座に反映されます。
Secret Manager連携【新規実装】
Variables同期は2段階で実現:
- export: Cloud Composer → Secret Manager
- import: Secret Manager → ローカルAirflow
from google.cloud import composer_v1
from google.cloud import secretmanager
import json
def export_variables_to_secret_manager():
"""Cloud ComposerのVariablesをSecret Managerに保存"""
# Composer環境からVariables取得
# 注: 実際にはAirflow REST APIまたはAirflow CLIを使用してVariablesを取得します
composer_client = composer_v1.EnvironmentsClient()
env_name = f"your/env/path"
# Airflow Variables取得の実装例
# variables = get_airflow_variables_via_api()
# Secret Managerに保存
secret_client = secretmanager.SecretManagerServiceClient()
parent = f"your/secret/path"
payload = json.dumps(variables, ensure_ascii=False).encode("utf-8")
response = secret_client.add_secret_version(
request={"parent": parent, "payload": {"data": payload}}
)
print(f"✓ Variables を保存: {response.name}")
from google.cloud import secretmanager
from airflow.models import Variable
import json
def import_variables_from_secret_manager():
"""Secret ManagerからVariablesを取得してローカルAirflowに設定"""
# Secret Managerから取得
client = secretmanager.SecretManagerServiceClient()
name = f"your/secret/path"
response = client.access_secret_version(name=name)
variables = json.loads(response.payload.data.decode("utf-8"))
# ローカルAirflowに設定
for key, value in variables.items():
Variable.set(key, value)
# セキュリティ: Variableの値は絶対にログ出力しない
print(f"✓ Variable 設定: {key}")
並列処理で高速化:
from concurrent.futures import ThreadPoolExecutor, as_completed
def delete_all_variables_parallel():
"""既存Variablesを並列削除(最大10ワーカー)"""
existing_keys = Variable.get_all().keys()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(Variable.delete, key)
for key in existing_keys
]
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"削除エラー: {e}")
3. Makefileで簡易化【新規実装】
# 環境作成
create:
@gcloud auth login
@gcloud auth application-default login
@uv run -- composer-local create --from-image-version composer-3-airflow-2.10.2-build.13
@$(MAKE) start
@$(MAKE) sync-vars
# 環境起動
start:
@uv run -- composer-local start
# 環境停止
stop:
@uv run -- composer-local stop
# Variables同期
sync-vars:
@uv run -- python composer_local/export_composer_variables.py
@uv run -- python composer_local/import_variables_to_local.py
# 認証(個人アカウント)
auth-user:
@gcloud auth login
@gcloud auth application-default login
# 認証(サービスアカウント)
auth-sa:
@gcloud auth login
@gcloud auth application-default login --impersonate-service-account=$${SERVICE_ACCOUNT}
# 環境の状態確認
status:
@uv run -- composer-local describe
# ログ表示
logs:
@uv run -- composer-local logs
# 環境再作成
recreate:
@$(MAKE) stop || true
@uv run -- composer-local remove --force --skip-confirmation || true
@$(MAKE) create
Variables の同期
# staging環境の最新Variablesを取得
make sync-vars
実装のポイント
1. フォアグラウンド起動への変更
ゾンビプロセスの発生やログの可視性の問題から、フォアグラウンド起動に変更しました。
def start_foreground(self):
"""フォアグラウンドで起動"""
import signal
def signal_handler(sig, frame):
console.print("[yellow]終了シグナルを受信[/yellow]")
self.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
container = self.client.containers.get("composer-local")
for line in container.logs(stream=True):
console.print(line.decode("utf-8"), end="")
2. Richによる見やすいCLI出力
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
console = Console()
def sync_variables_with_progress():
"""進捗表示付きでVariables同期"""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("[cyan]Secret Managerから取得中...", total=None)
variables = fetch_from_secret_manager()
progress.update(task, completed=True)
task2 = progress.add_task("[green]ローカルに設定中...", total=len(variables))
for key, value in variables.items():
Variable.set(key, value)
progress.advance(task2)
成果と効果
開発効率の向上
以前:
コード修正 → GCSアップロード → 反映待ち(3-5分) → 確認
現在:
コード修正 → ブラウザリロード → 確認
チーム全体での効果
現在、チームの開発者全員がこの環境を使用しています:
- デプロイ前に高速でデバッグすることが可能になった
- 「ちょっと試してみる」ハードルが下がり、実験的な実装が増加
- レビュー時にローカルで動作確認できるので、レビュー品質が向上
まとめ
composer-local-devをベースにカスタマイズすることで、Cloud Composerの開発体験が大きく改善しました。フィードバックループの高速化、環境の一貫性、そしてチーム全体での生産性向上を実現できました。
Cloud Composerでの開発に時間がかかっていると感じている方は、ぜひローカル環境の導入を検討してみてください。
実際に試してみる
本記事で紹介した実装は、実際に動作するコードとしてGitHubで公開しています(📦 composer-local-jp)。
クイックスタート
git clone https://github.com/momokun7/composer-local-jp.git
cd composer-local-jp
cp composer_local/composer_settings.py.example composer_local/composer_settings.py
vim composer_local/composer_settings.py # 設定編集
make import # 依存関係インストール
make create # 環境構築
make start # 起動
リポジトリへのスター⭐️も励みになります。バグ報告、機能要望、質問はIssuesへ、コードの改善はPull Requestで歓迎します。
Discussion