Trocco の運用を Terraform 管理に変えてみた
はじめに
こんにちは、メドレーでデータエンジニアをしている山邉(@beniyama)です。
先日、弊社で利用中の Trocco についてこちらの記事を寄稿させていただきましたが、今後の展望として以下を挙げていました。
上述の通り段々と規模が大きくなってきて GUI での管理が大変になってきているため、API や Terraform 経由でのプロビジョニングも生成 AI を絡めて試していきたいです。
今回、Trocco の一部運用を Terraform による IaC (Infrastructure as Code) 管理に移行してみました。この記事では、既に連携が回っている Trocco 運用を Terraform 管理に移行するための具体的な手順と、移行の中で気づいたポイントについて紹介します。
なぜ Terraform で管理するのか?
上述の記事のおさらいですが、Trocco は GUI での操作が直感的で極めて便利な一方で、運用規模が大きくなるにつれ以下のような課題に直面しつつありました。
- 一気に数十のテーブルを連携しようとするとひたすら複製 -> 編集で画面をポチポチしないといけない(設定ミスをしがち + 一括で修正をかけたいときも個別に開いて対応するしかない)
- 数十のテーブルをワークフロー管理しようとするとノードのつなぎ変えや再配置が面倒(あと転送設定名が長いと見切れてしまってどれがどの設定かわからなくなる)
などなど。
長大になってしまったワークフロー
また、Git 連携で変更を追跡はできるものの、後で一連のデータ連携追加対応も自動化したいという思惑もあり、Terraform による IaC 管理を導入することにしました。
Trocco の Terraform Provider について
Trocco は公式で Terraform Provider を提供しています。
この Provider を使うことで、以下のようなリソースを Terraform で管理できます。
- Job Definitions (データ転送ジョブ定義): 本件では RDS(MySQL) スナップショット から BigQuery へのデータ転送設定
- Pipelines (ワークフロー): 複数のジョブを組み合わせた依存関係のあるパイプライン
- Labels, Teams, Resource Groups: その他、ユーザーやチーム、リソースグループなどの管理リソース
同様に REST API も明文化されており、こちらから直接ジョブ情報を取得したり、操作することも可能です。
移行の全体像
既存の Trocco 運用を Terraform 管理に移行する際の流れは以下の通りです。
-
既存ジョブ情報の取得: Trocco API から既に設定済みのジョブ定義を取得し、
terraform import
のための空のリソース定義を作成 - Terraform import: 既存のジョブ定義を Terraform の state に取り込む
-
Terraform コード生成: state から
.tf
ファイルを自動生成 -
差分確認:
terraform plan
で意図しない差分がないことを確認 - 運用フロー構築: 以降の変更は Terraform + Git で管理
ここでのポイントは API で取得した情報から .tf ファイルを起こすのではなく、一回 state に import してから生成する という点です。API から組み立てようとすると結局 drift (Terraform の state と実環境との乖離)が起きるので、API は対象ジョブの情報を特定するためだけに使っています。
具体的な移行手順
1. 既存ジョブ情報の取得
まず、Trocco API を使って既存のジョブ定義を取得するスクリプトを作成しました。
import http.client
import json
import os
class TroccoAnalyzer:
def __init__(self):
self.api_key = os.getenv('TROCCO_API_KEY')
def fetch_api_data(self, endpoint: str) -> Dict:
"""Trocco API からデータを取得してJSONファイルに保存"""
conn = http.client.HTTPSConnection("trocco.io")
headers = {
'accept': "application/json",
'Authorization': f"Token {self.api_key}"
}
# ページネーション処理
all_items = []
cursor = None
while True:
url = f"/api/{endpoint}?limit=100"
if cursor:
url += f"&cursor={cursor}"
conn.request("GET", url, headers=headers)
res = conn.getresponse()
data = json.loads(res.read().decode("utf-8"))
all_items.extend(data.get('items', []))
if not data.get('next_cursor'):
break
cursor = data['next_cursor']
return {"items": all_items}
このスクリプトで job_definitions
の情報を取得し、JSON ファイルに保存します。
2. Terraform import
次に、取得した既存ジョブを Terraform の state に import します。
#!/bin/bash
# import_jobs.sh
# 既存の state をクリーンアップ
terraform state list | grep 'trocco_job_definition' | xargs -I {} terraform state rm {}
# 空の .tf ファイルを作成 (import に必要)
for job_id in $(jq -r '.items[].id' data/job_definitions.json); do
job_name=$(jq -r ".items[] | select(.id == $job_id) | .name" data/job_definitions.json)
resource_name=$(echo "$job_name" | sed 's/[^a-zA-Z0-9_]/_/g')
# 空のリソース定義を作成
echo "resource \"trocco_job_definition\" \"$resource_name\" {}" > "table_$resource_name.tf"
# terraform import 実行
terraform import "trocco_job_definition.$resource_name" "$job_id"
done
ここのポイントは以下の2点です。
- State のクリーンアップ: 実行時に state を一度削除してから import することで冪等性を保証
-
空の .tf ファイル作成:
terraform import
は対象リソースの定義が必要なので、先ほど取得した API 情報から空のファイルを作成し、import を実行
この空の .tf ファイルは、state への import 完了後には不要となり、次のステップで正規のものに置き換わります。
3. State から Terraform コード生成
import した state から、実際の .tf
ファイルを生成します。
#!/usr/bin/env python3
"""
Terraform state から .tf ファイルを生成するスクリプト
"""
import json
import subprocess
from pathlib import Path
def generate_from_state(output_dir: Path):
# terraform show -json で state を取得
result = subprocess.run(
["terraform", "show", "-json"],
capture_output=True,
text=True,
check=True
)
state = json.loads(result.stdout)
# 各リソースについて .tf ファイルを生成
for resource in state["values"]["root_module"]["resources"]:
if resource["type"] != "trocco_job_definition":
continue
resource_name = resource["name"]
values = resource["values"]
# HCL 形式で出力
tf_content = generate_hcl(resource_name, values)
output_file = output_dir / f"table_{resource_name}.tf"
output_file.write_text(tf_content)
# terraform fmt で整形
subprocess.run(["terraform", "fmt", str(output_dir)], check=True)
改めてなぜ API ではなく state から生成するのか? という点ですが、当初 API から起こそうとしたところ drift が起きてしまい、どうも API からの返却値と Provider 経由で扱われる値に差分がありそう(デフォルト値周り?)だったため、state からの生成に切り替えた、という経緯です。
4. 生成された Terraform コードの確認
以上のプロセスを経て生成された .tf
ファイルは以下のような形になります(テーブル名などはダミーです)。また中身の構成は運用されている環境で異なるかと思います。
resource "trocco_job_definition" "users" {
name = "production-db.users"
description = "ユーザーマスタの転送"
resource_group_id = 1234
is_runnable_concurrently = false
labels = [
{
name = "Production Data Transfer"
}
]
filter_columns = [
{
name = "id"
src = "id"
type = "string"
},
{
name = "email"
src = "email"
type = "string"
},
{
name = "created_at"
src = "created_at"
type = "long"
},
]
filter_masks = [
{
name = "email"
mask_type = "email"
},
]
input_option_type = "s3"
input_option = {
s3_input_option = {
s3_connection_id = 4321
bucket = "production-db-export"
region = "ap-northeast-1"
path_prefix = "$today$/"
path_match_pattern = "/*/production-db/.../*.parquet"
parquet_parser = {
columns = [
{ name = "id", type = "string" },
{ name = "email", type = "string" },
{ name = "created_at", type = "long" },
]
}
}
}
output_option_type = "bigquery"
output_option = {
bigquery_output_option = {
bigquery_connection_id = 8888
dataset = "production_models"
table = "users"
mode = "replace"
}
}
}
この状態で terraform plan
して差分が出なければ移行は完了です。
例えば同じデータソースのテーブルを新規追加したかったり、既存テーブルのカラム変更をしたいときはこの .tf ファイルをベースに作業すれば、あとは terraform plan/apply で変更が適用できるようになります。
5. ワークフローの管理
補足ですが、個別のジョブだけでなく、複数のジョブを組み合わせたワークフロー(Pipeline)も同様の手順で Terraform 管理ができます。例として、以下のようなファイルが生成されます。
resource "trocco_pipeline_definition" "daily_data_transfer" {
name = "daily-production-data-transfer"
resource_group_id = 1234
max_task_parallelism = 2
schedules = [
{
frequency = "daily"
time_zone = "Asia/Tokyo"
minute = 0
hour = 5
}
]
notifications = [
{
type = "job_execution"
destination_type = "slack"
notify_when = "failed"
slack_config = {
notification_id = 1111
message = "<!subteam^XXXXX>\nワークフローの実行が失敗しました"
}
}
]
tasks = [
{
key = "1"
type = "trocco_transfer"
trocco_transfer_config = {
definition_id = 111111
}
},
{
key = "2"
type = "trocco_transfer"
trocco_transfer_config = {
definition_id = 111112
}
},
]
task_dependencies = [
{
source = "1"
destination = "2"
},
]
}
これにより、
- DAG のコード管理: 依存関係の柔軟な組み替え、転送ジョブの設定漏れ検知
- スケジュール設定: ワークフロー起動時間の一括管理
- 通知設定: 終了時・エラー発生時それぞれの設定確認、設定漏れ防止
などができるようになります。
実際、上記の大量になってしまったワークフローに新規の転送設定を一つ追加するケースで、
「ルートに新しいエッジを足して、今回新規作成したジョブをそこにつけて」
と Claude Code に指示を出し、できたファイルを apply すると下記のように変更されました。
新しいエッジとノード(データ転送ジョブ)が追加された!
(変更対象ではないノードの位置も変わってはいますが、見た目上のレイアウトだけです)
同様に「エッジの幅は3で深さは均等になるように配置して」とか「深さは1でひたすらフラットに配置して」とか「最近不安定なこのジョブは(後続を止めないように)独立させて」などの指示も可能です。
これで何件データ転送ジョブがあっても怖いものはないですね!
まとめ
今回は運用中の Trocco 管理を Terraform に移行する手順とポイントをご紹介しました。小規模運用のフェーズでは GUI 管理をし、中〜大規模になったタイミングで IaC 管理に移行するケースはそれなりにあるかと思いますので、何かのご参考になれば幸いです。
今回、ジョブ転送設定の追加変更からワークフローへの反映も一気通貫でできるようになりましたので、新規の連携設定も生成できれば、
- データソースのスキーマ変更や、Github Issues などからのデータ連携設定依頼を検知
- AI エージェントが自律的に Trocco リソースの変更 Pull Request を作成し、
terraform plan
結果を記載 - データチームによるレビューと承認
- CI/CD から
terraform apply
で環境反映
ということも実現できます。
そのためには データソース側のスキーマからどういった .tf ファイルを生成するべきか? を知る必要があり、今回のような 既存の連携設定から起こしたお手本データ が必要になります。
この辺りについての取り組みもまた次回の記事でご紹介できればと思います!
We're hiring!
メドレーでは生成 AI を活用したデータマネジメントの取り組みを一緒に進めていただけるデータエンジニア・データアナリストを募集しています。ご興味あれば下記リンクよりぜひご応募ください!
Discussion