AWS Glue WorkflowsをTerraformで構築する
背景
ゴリゴリ系エンジニアの生みの親、pageoです。
AWSのサーバレスETLパイプライン管理サービスであるGlue Workflowsに関する日本語の情報が少なかったので、記事にしてまとめてみました。
ただサービスの特徴を説明しても面白くないので、Terraformを用いて各Glueサービス(Jobs/Crawlersなど)を一通り実装し、Terraform Registryで公開しています
Github Repositoryは以下です(スター頂けるとpageo喜びます)
「Terraform Registryとはなんぞや?」「Terraform Registryのmoduleでどうやって公開するんや?」という方は、以下の記事で詳しく説明しているのでそちらを参照ください。(ハート頂けるとpageo喜びます)
Glueとは
Glueをざっくり説明
Glueをざっくり一言で表すと「サーバーレスなデータ統合サービス」です。
"サーバレスなETLサービス"と表現しても良いのですが、Glue Data CatalogなどETL以外にもサービスとしての守備範囲が広いので、"サーバーレスなデータ統合サービス"とさせていただきました。
Glue Workflowsとは
各Glueサービス(Jobs/Crawlers/Triggersなど)を使用した一連の複雑なETLジョブを管理/可視化できるサービスです。
以下はGlue Workflowの可視化イメージです
参照: 公式サイト
Glue 各種サービスについて
-
Glue Triggers
-
Glue Jobs
やGlue Crawlers
を開始するサービスです - トリガーの種類には
SCHEDULED
,ON_DEMAND
,CONDITIONAL
,EVENT
があります
-
-
Glue Jobs
- ETLジョブを実行するサービスです
- ジョブコマンドの種類には
glueetl
,pythonshell
,gluestreaming
があります
-
Glue Crawlers
-
Glue Catalog
にメタデータを書き込んでくれるサービスです - データソースとしてS3やDynamoDBなどさまざまなデータストアをクロールできます
-
-
Glue Data Catalog
-
Databases
-
Tables
を管理するサービスです
-
-
Tables
-
Glue Jobs
のデータソース or ターゲットとして使用可能な、データストアのメタデータを管理するサービスです - 手動での管理や、
Glue Crawlers
を使った自動管理も可能です
-
-
Databases
-
その他(この記事での実装対象外)
-
Glue Blueprints
-
Glue Workflows
を簡単に作成/共有できるサービスです - (使い道がわからん、、、)
-
-
Glue Studio
- ETL ジョブを簡単に作成/実行/監視できるビジュアルインターフェースです
- (Terraformで本格的にIaC化する前の試験実装としてよく使用しています)
-
Glue Databrew
-
Glue Studio
を非エンジニア向けにしたノンコードなビジュアルデータ処理サービス - Databrew自体がかなりゴツめのサービス(Glue本体の管理コンソールページと切り離されている)なので、使用するまでに必要な学習量が多い気がします
- (いつか使いたい)
-
-
Glue Blueprints
ここまでざっくりとAWS Glueについて説明してきましたが、さらに詳細な説明が欲しい方は公式サイトやClassmethodさんの記事に丁寧に整理されているので、そちらを参考いただければと思います。
前提
構築内容
それでは本題です
この記事で作成するAWS Resourceは以下です
- Glue Workflows
- Glue Triggers
- Glue Jobs
- Glue Clawlers
- Glue Data Catalog
イメージ
構築イメージは以下です
想定するETL Workflowは以下です
- EventBridgeによりスケジューリングされた時間でGlue Workflowを起動
- EventBridgeと連携されたGlue TriggersからGlue Crawlersを起動
- Glue CrawlersでS3 Bucket(Data Source)からGlue Data Catalogを更新
- Glue Data Catalogの更新完了をトリガーに、Glue Jobsを起動
- Glue JobsでS3 Bucket(Data Source)からS3 Bucket(Data Target)に結果を出力
構成
ディレクトリ構成は以下になります。
+- terraform-aws-glue-workflows/
|
+---- examples/
| +---- sample/
| +---- main.tf
| +---- provider.tf
| +---- locals.tf
| +---- README.md
+---- .gitignore
+---- catalog.tf
+---- crawler.tf
+---- job.tf
+---- workflow.tf
+---- variables.tf
+---- outputs.tf
+---- README.md
+---- LICENSE
使い方
Github/Terraform RegistryにExamplesを載せているので詳細はそちらを参照ください
以下はmoduleの使用例です
module "glue_workflows" {
source = "page-o/glue-workflows/aws"
workflow = {
name = "${local.env}-${local.project}-glue-workflow"
description = "Sample"
}
event_trigger = {
name = "${local.env}-${local.project}-event-trigger"
description = "Sample"
}
crawler_trigger = {
name = "${local.env}-${local.project}-crawler-trigger"
description = "Sample"
}
catalog = {
database_name = "${local.env}-${local.project}-database"
database_description = "Sample"
tables = [
{
name = "hoges"
description = "Sample"
storage_location = "s3://${module.source_bucket.name}/hoges/"
ser_de_name = "hoges"
},
{
name = "fugas"
description = "Sample"
storage_location = "s3://${module.source_bucket.name}/fugas/"
ser_de_name = "fugas"
}
]
}
crawler = {
name = "${local.env}-${local.project}-crawler"
description = "Sample"
role_arn = module.workflow_role.arn
catalog_target_tables = ["hoges", "fugas"]
}
jobs = [
{
name = "${local.env}-${local.project}-hoges-job"
description = "Sample"
script_object_key = local.job_script_object_key.hoge
default_arguments = local.default_arguments
},
{
name = "${local.env}-${local.project}-fugas-job"
description = "Sample"
script_object_key = local.job_script_object_key.fuga
default_arguments = local.default_arguments
}
]
job_role_arn = module.workflow_role.arn
job_tmp_bucket_name = module.scripts_bucket.name
}
実装
以下に書くResourceの実装について説明していきます。
基本的に、for
を用いてコードの使い回しわ避けるなどDRY原則に基づいて実装しています
TerraformにおけるDRY原則の実践については、以下の記事で詳しく説明しているので参照ください。(ハート頂けるとpageo喜びます)
まずGlue Data Catalog
ですが、単純にDatabase
とTable
のResourceを定義しているだけです
データソースがS3であることを前提として実装していますが、データソースによってはstorage_descriptor
パラメータの書き方が若干変わると思われます
resource "aws_glue_catalog_database" "main" {
name = var.catalog.database_name
description = var.catalog.database_description
}
resource "aws_glue_catalog_table" "main" {
count = length(var.catalog.tables)
database_name = aws_glue_catalog_database.main.name
name = element(var.catalog.tables.*.name, count.index)
description = element(var.catalog.tables.*.description, count.index)
table_type = element(var.catalog.tables.*.table_type, count.index)
storage_descriptor {
location = element(var.catalog.tables.*.storage_location, count.index)
input_format = element(var.catalog.tables.*.input_format, count.index)
output_format = element(var.catalog.tables.*.output_format, count.index)
ser_de_info {
name = element(var.catalog.tables.*.ser_de_name, count.index)
serialization_library = element(var.catalog.tables.*.serialization_library, count.index)
parameters = {
"serialization.format" = 1
}
}
}
lifecycle {
ignore_changes = [parameters, storage_descriptor, owner]
}
}
Glue Crawler
も同じく単純な定義です
ここも、データソースがGlue Catalog
であることを前提として実装しているので、他のデータソースを使用する場合はcatalog_target
パラメータをs3_target
/dynamodb_target
/jdbc_target
/mongodb_target
などに変更する必要があります
その辺りはTerraform公式ドキュメントを参考に実装してください
resource "aws_glue_crawler" "main" {
database_name = aws_glue_catalog_database.main.name
name = var.crawler.name
description = var.crawler.description
role = var.crawler.role_arn
catalog_target {
database_name = aws_glue_catalog_database.main.name
tables = var.crawler.catalog_target_tables
}
schema_change_policy {
delete_behavior = "LOG"
update_behavior = "UPDATE_IN_DATABASE"
}
lifecycle {
ignore_changes = [configuration]
}
}
続いてGlue Jobs
です
ここも、Glue Jobs
のTypeがglueetl
であることを前提に実装しています
他のType(pythonshell
/gluestreaming
)の場合にはmax_capacity
/timeout
/command
パラメータを修正する必要があります
Terraform公式ドキュメントを参考に実装しましょう
resource "aws_glue_job" "main" {
count = length(var.jobs)
name = element(var.jobs.*.name, count.index)
description = element(var.jobs.*.description, count.index)
glue_version = element(var.jobs.*.glue_version, count.index)
execution_class = element(var.jobs.*.execution_class, count.index)
max_retries = element(var.jobs.*.max_retries, count.index)
timeout = element(var.jobs.*.timeout, count.index)
worker_type = element(var.jobs.*.worker_type, count.index)
number_of_workers = element(var.jobs.*.number_of_workers, count.index)
role_arn = var.job_role_arn
execution_property {
max_concurrent_runs = element(var.jobs.*.max_concurrent_runs, count.index)
}
command {
script_location = "s3://${var.job_tmp_bucket_name}/${element(var.jobs.*.script_object_key, count.index)}"
}
default_arguments = element(var.jobs.*.default_arguments, count.index)
}
最後にGlue Workflows
です
上記で実装してきたGlue Jobs
/Glue Crawlers
/Glue Catalog
をここでWorkflowとして連結しています
aws_glue_trigger.event
の設定ですが、構築イメージで示した通りEventBridgeによるWorkflowのトリガーを想定しているので、type
をEVENTにしています
「EventBridgeによるWorkflowのトリガー」の実装例は、Terraform公式ドキュメントにも記載がないので要注意です
以下の引用/キャプチャのように、Terraform公式ドキュメントのtype
の説明にEVENTの記載がないのですが、EventBridgeによるGlue Workflowのトリガーの際はtype
にEVENTを設定する必要があります
(Required) The type of trigger. Valid values are CONDITIONAL, ON_DEMAND, and SCHEDULED.
resource "aws_glue_workflow" "main" {
name = var.workflow.name
description = var.workflow.description
}
resource "aws_glue_trigger" "event" {
workflow_name = aws_glue_workflow.main.name
name = var.event_trigger.name
description = var.event_trigger.description
type = "EVENT"
event_batching_condition {
batch_size = 1
batch_window = 900
}
actions {
crawler_name = aws_glue_crawler.main.name
}
}
resource "aws_glue_trigger" "crawler" {
workflow_name = aws_glue_workflow.main.name
name = var.crawler_trigger.name
description = var.crawler_trigger.description
type = "CONDITIONAL"
predicate {
logical = "ANY"
conditions {
crawler_name = aws_glue_crawler.main.name
crawl_state = "SUCCEEDED"
logical_operator = "EQUALS"
}
}
dynamic "actions" {
for_each = aws_glue_job.main
content {
job_name = actions.value.id
}
}
}
一応variables.tf
も載せときます
Github Repositoryに全てのコードが載っているので、詳しくはこちらを参照ください
variable "workflow" {
description = "Workflow"
type = object({
name = string
description = string
})
}
variable "event_trigger" {
description = "Event Trigger"
type = object({
name = string
description = string
})
}
variable "crawler_trigger" {
description = "Crawler Trigger"
type = object({
name = string
description = string
})
}
variable "catalog" {
description = "Catalog"
type = object({
database_name = string
database_description = string
tables = list(object({
name = string
description = string
table_type = optional(string, "EXTERNAL_TABLE")
storage_location = string
input_format = optional(string, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
output_format = optional(string, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
ser_de_name = string
serialization_library = optional(string, "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
}))
})
}
variable "crawler" {
description = "Crawler"
type = object({
name = string
description = string
role_arn = string
catalog_target_tables = list(string)
})
}
variable "jobs" {
description = "Jobs"
type = list(object({
name = string
description = string
glue_version = optional(string, "3.0")
execution_class = optional(string, "STANDARD")
max_retries = optional(number, 3)
timeout = optional(number, 2880)
worker_type = optional(string, "G.1X")
number_of_workers = optional(number, 10)
max_concurrent_runs = optional(number, 1)
script_object_key = string
default_arguments = map(string)
}))
}
variable "job_role_arn" {
description = "Job Role ARN"
type = string
}
variable "job_tmp_bucket_name" {
description = "Job Temporary Bucket Name"
type = string
}
最後に
TerraformでGlue Workflows
を実装している例がほとんど世になかったので、今回の記事ではS3 To S3のETLパイプラインの実装を公開してみました
AWS Glueは、年々サービスとしての守備範囲が広がるなど注目度が増してきているので、今後も継続的に ウォッチしていきたいと思います!
参考
Discussion