CDKを使ってPrefect を ECS にデプロイしてみた
はじめに
こんにちは!Septeni Japan 株式会社でデータエンジニアをしている大志万といいます。
実務では IaC は Terraform、ワークフローオーケストレーションツールは Digdag を使用して ETL を構築しています。
勉強の一環として、以前から気になっていた CDK(IaC) と Prefect(オーケストレーションツール) に触るために、CDK で Prefect サーバーを デプロイしてみました。
今回はその備忘録となります。
前提
簡単に CDK と Prefect について紹介します。
CDK とは
CDK は AWS のリソースをプログラムで管理するためのフレームワークです。
特徴として以下のようなものがあります。
- プログラミング言語で AWS リソースの定義・作成が可能(TypeScript, Javascript, Python, Java, C#, Go)
- 実態は CloudFormation であり、プログラミング言語で CloudFormation のテンプレートを生成してデプロイする仕組みになっている。
- 抽象化されたモジュールが提供されているので、AWS のリソースを簡単に作成できる。
下記の表に Terrfaorm と CDK の違いをまとめました。
Terraform | CDK | |
---|---|---|
記述方法 | HCL | TypeScript, Javascript, Python, Java, C#, Go |
デプロイ時の挙動 | API を使ってデプロイ | CloudFormation のファイルを生成してデプロイ |
diff の挙動 | 実リソースとの差分 | デプロイした CloudFormation との差分 |
途中で失敗した時の挙動 | 途中までリソースがデプロイされる | 全て成功しないとロールバック |
CDK を触ろうと思ったきっかけは、AWS 公式で モジュールが提供されており、Terraformよりも記述量を減らせそうだと思ったためになります。
Prefect とは
Prefect とは、データエンジニアリングのためのワークフローオーケストレーションツールです。
- シンプルな Python の構文で記述可能
- コンソールの UI がモダンできれい
といった特徴があります。また、アップデートも高頻度で行われており、つい先日 Prefect 3.0 がリリースされました。
Prefect は UI でワークフローの管理、実行履歴の確認、エラーの監視などができ、 OSS と Cloud マネージド版があります。
Cloud マネージド版だとユーザー管理や認証、webhook などが使えるようです。価格は2024年7月時点で下記のようになっています。
- Free プラン: 無料
- Pro プラン: 1850 ドル/月
- Enterprise プラン:問い合わせ
下記に Digdag と Prefect の違いをまとめています。
Digdag | prefect | |
---|---|---|
実行環境 | Java8 or 11 | Python>=3.8 |
記述方法 | yml ライクな独自拡張子.dig に記述 | Python で記述 |
ホスティング | OSS | Prefect Cloud or OSS |
UI | シンプル | リッチ |
Prefect を触ってみたいと思った主な理由は、Python 経験者がチームに多かったので、Python でシンプルな記述が可能であり、UI がリッチで触りやすそうだったためです。
CDK を使って Prefect を ECS にデプロイしてみた。
インフラ構成
今回は下記の記事を参考に ECS の一般的な構成で 構築してみました
下記が今回のリソースの構成図になります。
完成したコード
全体のコードは Github に公開しているので良かったらみてみてください。
コードの解説
VPC
const vpc = new ec2.Vpc(this, "Vpc", {
maxAzs: 2,
});
上記のコードでネットワーク系で必要なリソース一通りが作成されます。
VPC
パブリックサブネット(AZ ごと)
プライベートサブネット(AZ ごと)
セキュリティグループ
NAT ゲートウェイ(AZ ごと)
インターネットゲートウェイ
ルートテーブル
実務で Terraform を使う時はこれらのリソースを一つ一つ設定していたため、初めて CDK を使った時は衝撃でした
RDS
// RDS
const engine = rds.DatabaseInstanceEngine.postgres({
version: rds.PostgresEngineVersion.VER_16_3,
});
const database = new rds.DatabaseInstance(this, "Postgresql", {
vpc,
securityGroups: [new ec2.SecurityGroup(this, "SecurityGroup", { vpc })],
engine,
instanceType: ec2.InstanceType.of(
ec2.InstanceClass.T4G,
ec2.InstanceSize.MICRO
),
databaseName: "prefect",
});
database.connections.allowDefaultPortFromAnyIpv4();
Prefect は PostgreSQL を使うので、PostgreSQL の RDS を作成しています。
ECS
// ECS
const cluster = new ecs.Cluster(this, "Cluster", {
vpc,
});
const taskDefinition = new ecs.FargateTaskDefinition(this, "TaskDefinition", {
memoryLimitMiB: 2048,
cpu: 1024,
});
const host: string = database.instanceEndpoint.hostname;
const port: string = database.instanceEndpoint.port.toString();
const dbSecret: ISecret = database.secret!;
const username: string = dbSecret
.secretValueFromJson("username")
.unsafeUnwrap();
const password: string = dbSecret
.secretValueFromJson("password")
.unsafeUnwrap();
taskDefinition.addContainer("PrefectContainer", {
image: ecs.ContainerImage.fromRegistry(
"prefecthq/prefect:3.0.0rc3-python3.11"
),
memoryLimitMiB: 2048,
cpu: 1024,
entryPoint: ["/opt/prefect/entrypoint.sh", "prefect", "server", "start"],
environment: {
PREFECT_API_URL: "http://localhost/api",
PREFECT_SERVER_API_HOST: "0.0.0.0",
PREFECT_SERVER_API_PORT: "80",
PREFECT_API_DATABASE_CONNECTION_URL: `postgresql+asyncpg://${username}:${password}@${host}:${port}/prefect`,
},
portMappings: [{ containerPort: 80 }],
logging: ecs.LogDrivers.awsLogs({ streamPrefix: "PrefectContainer" }),
});
const service = new ecs.FargateService(this, "Service", {
cluster,
taskDefinition,
});
Prefect は 公式で Docker イメージを提供しており、今回はそのイメージを使って、Prefect サーバーを起動しています。
PREFECT_API_DATABASE_CONNECTION_URL に Prefect が使う DB の接続情報を設定することで、DB との接続が可能です。
タスク定義の環境変数に直接書いてしまっていますが、本来であればサーバーを起動するときに Secrets Manager などから取得して設定した方が良いです。
今回はお試しということで、そのまま記載しています。
ECS の service はデフォルトの設定しか行ってないですが、
オートスケーリング等も設定もできるので、実運用するとなった場合検討してみてもいいかもしれません。
ALB
const lb = new elbv2.ApplicationLoadBalancer(this, "LB", {
vpc,
internetFacing: true,
});
// IP Whitelist
// ここは必要に応じて設定してください
const ipWhiteLists = ["0.0.0.0/0"];
const listener = lb.addListener("Listener", { port: 80 });
listener.addTargets("EcsTarget", {
port: 80,
priority: 1,
conditions: [elbv2.ListenerCondition.sourceIps(ipWhiteLists)],
targets: [service],
healthCheck: { path: "/api/health" },
});
listener.addAction("DefaultTarget", {
action: elbv2.ListenerAction.fixedResponse(401, {
contentType: "text/plain",
messageBody: "Unauthorized access",
}),
});
ECS サービスにルーティングする ALB を作成しています。
addTarget メソッドで、ECS のサービス をターゲットとして追加しています。
今回は最低限のセキュリティ対策として、特定の IP からのアクセスのみを許可したかったので、ルールに IP の制限を設定しています。
また、addAction メソッドで対象外の IP からのアクセスは 401: Unauthorized を返すデフォルトアクションを定義しています。
URL にアクセスしてみる
Load Balancer の DNS 名をコピーして、ブラウザでアクセスしてみます。
無事に Prefect の画面が表示されました。
別の IP からアクセスすると、下記のような画面が表示されたので、IP 制限が効いていることが確認できました。
Prefect に触ってみる
ではローカルで Prefect の ワークフロー を実行してみて、Prefect の UI にログが表示されるか確認してみます。
Prefect サーバーとの接続は、PREFECT_API_URL 環境変数を ALB の URL に変更すれば OK です。
PREFECT_API_URL=http://xxxxx-xxxxx.ap-northeast-1.elb.amazonaws.com/api
flow とタスクを記述してみる
import httpx # an HTTP client library and dependency of Prefect
from prefect import flow, task
@task(retries=2)
def get_repo_info(repo_owner: str, repo_name: str):
"""Get info about a repo - will retry twice after failing"""
url = f"https://api.github.com/repos/{repo_owner}/{repo_name}"
api_response = httpx.get(url)
api_response.raise_for_status()
repo_info = api_response.json()
return repo_info
@task
def get_contributors(repo_info: dict):
"""Get contributors for a repo"""
contributors_url = repo_info["contributors_url"]
response = httpx.get(contributors_url)
response.raise_for_status()
contributors = response.json()
return contributors
@flow(log_prints=True)
def repo_info(repo_owner: str = "PrefectHQ", repo_name: str = "prefect"):
"""
Given a GitHub repository, logs the number of stargazers
and contributors for that repo.
"""
repo_info = get_repo_info(repo_owner, repo_name)
print(f"Stars 🌠 : {repo_info['stargazers_count']}")
contributors = get_contributors(repo_info)
print(f"Number of contributors 👷: {len(contributors)}")
if __name__ == "__main__":
repo_info()
Prefect の公式ドキュメントにある、ワークフローを実行してみます。
内容は、 Prefect リポジトリのスターやコントリビューター数を取得して表示するもので、リポジトリのデータを取得する task と、コントリビューターのデータを取得する task、それらをまとめる flow が記述されています。
実行すると UI に実行情報やログがちゃんと表示されました。
感想
CDK や Prefect については今まで触ったことないので今回を通してだいぶイメージを掴むことができました。
それぞれの感想を書いていきます。
CDK
L2 コンストラクトを使うことで、少しの記述量で直感的に AWS のリソースが作成できるのはかなり便利だと思いました。その反面、どんなリソースが AWS で動いているのかわからなくなるといった懸念も出てきそうです。
diff の挙動に関しても気になったので記載しておきます。
cdk は terraform とは少し違って、実際のリソースではなく、デプロイしたスタックとの比較になります。そのため、terraform では、手動で何かリソースを変更しても terraform plan をすれば実リソースとの差分が表示され、apply すれば強制的に terraform に記載されているリソースが反映されますが、cdk の場合だと、ドリフト検出をして CDK にインポートする or リソースを元に戻す必要があり、その点で少し手間だなと思いました。
(そもそも IaC を使う時点で、手動でリソースを変更するなという話ではありますが)
また、触る上で色々ドキュメントも見てみたのですが、ドキュメントについては CDK よりも Terraform の方がサンプルの記述量が多く、わかりやすいと感じました。
触ってみた感じ、CDK はリソース作成時の時間短縮という面では便利だと思いました。
その反面
- 「よくわからないけど動いている」という状態になる懸念がある
- 突発的に手動でリソースを変更しづらい
- ドキュメントの充実度
という点で、AWSやIaCの初心者が使うには、Terraformの方がとっつきやすそうな気がしました。
個人的には diff の挙動や、ドキュメントの部分で Terraform の方が好きだなとは思いましたが、まだ CDK は使い始めということもあり、個人プロダクトなどでもう少し触ってみたいと思います。
Prefect
Digdag と比べてコンソールがモダンで使いやすい点はいいなと思いました。
Digdag の場合はタスク間の変数の受け渡しが難しく、プラグインを入れないとできませんでしたが、Prefect では python でワークフローを定義しているので、そこら辺も柔軟に設定できるのは良さそうです。
また、デコレーターで task や flow を定義する方法も直感的でわかりやすいと感じました。
かなり感触良かったので、Digdag からの移行も考えつつ引き続き色々触ってみたいと思います。
We are hiring
Septeni Japan では、一緒にプロダクト開発組織を盛り上げてくれる仲間を募集しています!
ご興味のある方は以下リンクから応募していただき、カジュアル面談を通じて働く環境や仲間を知っていただければと思います!
その際、応募フォームの「知ったきっかけ」に「テックブログ」と記載いただければと思います。
Discussion