🧀

AWS Glue WorkflowsをTerraformで構築する

2022/10/14に公開

背景

ゴリゴリ系エンジニアの生みの親、pageoです。
AWSのサーバレスETLパイプライン管理サービスであるGlue Workflowsに関する日本語の情報が少なかったので、記事にしてまとめてみました。
ただサービスの特徴を説明しても面白くないので、Terraformを用いて各Glueサービス(Jobs/Crawlersなど)を一通り実装し、Terraform Registryで公開しています
https://registry.terraform.io/modules/page-o/glue-workflows/aws/latest

Github Repositoryは以下です(スター頂けるとpageo喜びます)
https://github.com/page-o/terraform-aws-glue-workflows

Terraform Registryとはなんぞや?」「Terraform Registryのmoduleでどうやって公開するんや?」という方は、以下の記事で詳しく説明しているのでそちらを参照ください。(ハート頂けるとpageo喜びます)
https://zenn.dev/pageo/articles/9b492bf8d19756

Glueとは

Glueをざっくり説明

Glueをざっくり一言で表すと「サーバーレスなデータ統合サービス」です。
"サーバレスなETLサービス"と表現しても良いのですが、Glue Data CatalogなどETL以外にもサービスとしての守備範囲が広いので、"サーバーレスなデータ統合サービス"とさせていただきました。

Glue Workflowsとは

各Glueサービス(Jobs/Crawlers/Triggersなど)を使用した一連の複雑なETLジョブを管理/可視化できるサービスです。

以下はGlue Workflowの可視化イメージです

参照: 公式サイト

Glue 各種サービスについて

  • Glue Triggers
    • Glue JobsGlue Crawlersを開始するサービスです
    • トリガーの種類にはSCHEDULED, ON_DEMAND, CONDITIONAL, EVENTがあります
  • Glue Jobs
    • ETLジョブを実行するサービスです
    • ジョブコマンドの種類にはglueetl, pythonshell, gluestreamingがあります
  • Glue Crawlers
    • Glue Catalogにメタデータを書き込んでくれるサービスです
    • データソースとしてS3やDynamoDBなどさまざまなデータストアをクロールできます
  • Glue Data Catalog
    • Databases
      • Tablesを管理するサービスです
    • Tables
      • Glue Jobsのデータソース or ターゲットとして使用可能な、データストアのメタデータを管理するサービスです
      • 手動での管理や、Glue Crawlersを使った自動管理も可能です
  • その他(この記事での実装対象外)
    • Glue Blueprints
      • Glue Workflowsを簡単に作成/共有できるサービスです
      • (使い道がわからん、、、)
    • Glue Studio
      • ETL ジョブを簡単に作成/実行/監視できるビジュアルインターフェースです
      • (Terraformで本格的にIaC化する前の試験実装としてよく使用しています)
    • Glue Databrew
      • Glue Studioを非エンジニア向けにしたノンコードなビジュアルデータ処理サービス
      • Databrew自体がかなりゴツめのサービス(Glue本体の管理コンソールページと切り離されている)なので、使用するまでに必要な学習量が多い気がします
      • (いつか使いたい)

ここまでざっくりとAWS Glueについて説明してきましたが、さらに詳細な説明が欲しい方は公式サイトやClassmethodさんの記事に丁寧に整理されているので、そちらを参考いただければと思います。

前提

構築内容

それでは本題です
この記事で作成するAWS Resourceは以下です

  • Glue Workflows
  • Glue Triggers
  • Glue Jobs
  • Glue Clawlers
  • Glue Data Catalog

イメージ

構築イメージは以下です

想定するETL Workflowは以下です

  1. EventBridgeによりスケジューリングされた時間でGlue Workflowを起動
  2. EventBridgeと連携されたGlue TriggersからGlue Crawlersを起動
  3. Glue CrawlersでS3 Bucket(Data Source)からGlue Data Catalogを更新
  4. Glue Data Catalogの更新完了をトリガーに、Glue Jobsを起動
  5. Glue JobsでS3 Bucket(Data Source)からS3 Bucket(Data Target)に結果を出力

構成

ディレクトリ構成は以下になります。

+- terraform-aws-glue-workflows/
    |
    +---- examples/
    |       +---- sample/
    |              +---- main.tf
    |              +---- provider.tf
    |              +---- locals.tf
    |              +---- README.md
    +---- .gitignore
    +---- catalog.tf
    +---- crawler.tf
    +---- job.tf
    +---- workflow.tf
    +---- variables.tf
    +---- outputs.tf
    +---- README.md
    +---- LICENSE

使い方

Github/Terraform RegistryにExamplesを載せているので詳細はそちらを参照ください

以下はmoduleの使用例です

sample.tf
module "glue_workflows" {
  source = "page-o/glue-workflows/aws"

  workflow = {
    name        = "${local.env}-${local.project}-glue-workflow"
    description = "Sample"
  }
  event_trigger = {
    name        = "${local.env}-${local.project}-event-trigger"
    description = "Sample"
  }
  crawler_trigger = {
    name        = "${local.env}-${local.project}-crawler-trigger"
    description = "Sample"
  }
  catalog = {
    database_name        = "${local.env}-${local.project}-database"
    database_description = "Sample"
    tables = [
      {
        name             = "hoges"
        description      = "Sample"
        storage_location = "s3://${module.source_bucket.name}/hoges/"
        ser_de_name      = "hoges"
      },
      {
        name             = "fugas"
        description      = "Sample"
        storage_location = "s3://${module.source_bucket.name}/fugas/"
        ser_de_name      = "fugas"
      }
    ]
  }
  crawler = {
    name                  = "${local.env}-${local.project}-crawler"
    description           = "Sample"
    role_arn              = module.workflow_role.arn
    catalog_target_tables = ["hoges", "fugas"]
  }
  jobs = [
    {
      name              = "${local.env}-${local.project}-hoges-job"
      description       = "Sample"
      script_object_key = local.job_script_object_key.hoge
      default_arguments = local.default_arguments
    },
    {
      name              = "${local.env}-${local.project}-fugas-job"
      description       = "Sample"
      script_object_key = local.job_script_object_key.fuga
      default_arguments = local.default_arguments
    }
  ]
  job_role_arn        = module.workflow_role.arn
  job_tmp_bucket_name = module.scripts_bucket.name
}

実装

以下に書くResourceの実装について説明していきます。

基本的に、forを用いてコードの使い回しわ避けるなどDRY原則に基づいて実装しています
TerraformにおけるDRY原則の実践については、以下の記事で詳しく説明しているので参照ください。(ハート頂けるとpageo喜びます)
https://zenn.dev/pageo/articles/d90d89e2168061

まずGlue Data Catalogですが、単純にDatabaseTableのResourceを定義しているだけです
データソースがS3であることを前提として実装していますが、データソースによってはstorage_descriptorパラメータの書き方が若干変わると思われます

catalog.tf
resource "aws_glue_catalog_database" "main" {
  name        = var.catalog.database_name
  description = var.catalog.database_description
}

resource "aws_glue_catalog_table" "main" {
  count = length(var.catalog.tables)

  database_name = aws_glue_catalog_database.main.name
  name          = element(var.catalog.tables.*.name, count.index)
  description   = element(var.catalog.tables.*.description, count.index)
  table_type    = element(var.catalog.tables.*.table_type, count.index)

  storage_descriptor {
    location      = element(var.catalog.tables.*.storage_location, count.index)
    input_format  = element(var.catalog.tables.*.input_format, count.index)
    output_format = element(var.catalog.tables.*.output_format, count.index)

    ser_de_info {
      name                  = element(var.catalog.tables.*.ser_de_name, count.index)
      serialization_library = element(var.catalog.tables.*.serialization_library, count.index)

      parameters = {
        "serialization.format" = 1
      }
    }
  }

  lifecycle {
    ignore_changes = [parameters, storage_descriptor, owner]
  }
}

Glue Crawlerも同じく単純な定義です
ここも、データソースがGlue Catalogであることを前提として実装しているので、他のデータソースを使用する場合はcatalog_targetパラメータをs3_target/dynamodb_target/jdbc_target/mongodb_targetなどに変更する必要があります
その辺りはTerraform公式ドキュメントを参考に実装してください

crawler.tf
resource "aws_glue_crawler" "main" {
  database_name = aws_glue_catalog_database.main.name
  name          = var.crawler.name
  description   = var.crawler.description
  role          = var.crawler.role_arn

  catalog_target {
    database_name = aws_glue_catalog_database.main.name
    tables        = var.crawler.catalog_target_tables
  }

  schema_change_policy {
    delete_behavior = "LOG"
    update_behavior = "UPDATE_IN_DATABASE"
  }

  lifecycle {
    ignore_changes = [configuration]
  }
}

続いてGlue Jobsです
ここも、Glue JobsのTypeがglueetlであることを前提に実装しています
他のType(pythonshell/gluestreaming)の場合にはmax_capacity/timeout/commandパラメータを修正する必要があります
Terraform公式ドキュメントを参考に実装しましょう

job.tf
resource "aws_glue_job" "main" {
  count = length(var.jobs)

  name              = element(var.jobs.*.name, count.index)
  description       = element(var.jobs.*.description, count.index)
  glue_version      = element(var.jobs.*.glue_version, count.index)
  execution_class   = element(var.jobs.*.execution_class, count.index)
  max_retries       = element(var.jobs.*.max_retries, count.index)
  timeout           = element(var.jobs.*.timeout, count.index)
  worker_type       = element(var.jobs.*.worker_type, count.index)
  number_of_workers = element(var.jobs.*.number_of_workers, count.index)

  role_arn = var.job_role_arn

  execution_property {
    max_concurrent_runs = element(var.jobs.*.max_concurrent_runs, count.index)
  }

  command {
    script_location = "s3://${var.job_tmp_bucket_name}/${element(var.jobs.*.script_object_key, count.index)}"
  }

  default_arguments = element(var.jobs.*.default_arguments, count.index)
}

最後にGlue Workflowsです
上記で実装してきたGlue Jobs/Glue Crawlers/Glue CatalogをここでWorkflowとして連結しています
aws_glue_trigger.eventの設定ですが、構築イメージで示した通りEventBridgeによるWorkflowのトリガーを想定しているので、typeEVENTにしています
EventBridgeによるWorkflowのトリガー」の実装例は、Terraform公式ドキュメントにも記載がないので要注意です
以下の引用/キャプチャのように、Terraform公式ドキュメントのtypeの説明にEVENTの記載がないのですが、EventBridgeによるGlue Workflowのトリガーの際はtypeEVENTを設定する必要があります

(Required) The type of trigger. Valid values are CONDITIONAL, ON_DEMAND, and SCHEDULED.

workflow.tf
resource "aws_glue_workflow" "main" {
  name        = var.workflow.name
  description = var.workflow.description
}

resource "aws_glue_trigger" "event" {
  workflow_name = aws_glue_workflow.main.name
  name          = var.event_trigger.name
  description   = var.event_trigger.description
  type          = "EVENT"

  event_batching_condition {
    batch_size   = 1
    batch_window = 900
  }

  actions {
    crawler_name = aws_glue_crawler.main.name
  }
}

resource "aws_glue_trigger" "crawler" {
  workflow_name = aws_glue_workflow.main.name
  name          = var.crawler_trigger.name
  description   = var.crawler_trigger.description
  type          = "CONDITIONAL"

  predicate {
    logical = "ANY"
    conditions {
      crawler_name     = aws_glue_crawler.main.name
      crawl_state      = "SUCCEEDED"
      logical_operator = "EQUALS"
    }
  }

  dynamic "actions" {
    for_each = aws_glue_job.main

    content {
      job_name = actions.value.id
    }
  }
}

一応variables.tfも載せときます
Github Repositoryに全てのコードが載っているので、詳しくはこちらを参照ください

variables.tf
variable "workflow" {
  description = "Workflow"
  type = object({
    name        = string
    description = string
  })
}

variable "event_trigger" {
  description = "Event Trigger"
  type = object({
    name        = string
    description = string
  })
}

variable "crawler_trigger" {
  description = "Crawler Trigger"
  type = object({
    name        = string
    description = string
  })
}

variable "catalog" {
  description = "Catalog"
  type = object({
    database_name        = string
    database_description = string
    tables = list(object({
      name                  = string
      description           = string
      table_type            = optional(string, "EXTERNAL_TABLE")
      storage_location      = string
      input_format          = optional(string, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
      output_format         = optional(string, "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
      ser_de_name           = string
      serialization_library = optional(string, "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
    }))
  })
}

variable "crawler" {
  description = "Crawler"
  type = object({
    name                  = string
    description           = string
    role_arn              = string
    catalog_target_tables = list(string)
  })
}

variable "jobs" {
  description = "Jobs"
  type = list(object({
    name                = string
    description         = string
    glue_version        = optional(string, "3.0")
    execution_class     = optional(string, "STANDARD")
    max_retries         = optional(number, 3)
    timeout             = optional(number, 2880)
    worker_type         = optional(string, "G.1X")
    number_of_workers   = optional(number, 10)
    max_concurrent_runs = optional(number, 1)
    script_object_key   = string
    default_arguments   = map(string)
  }))
}

variable "job_role_arn" {
  description = "Job Role ARN"
  type        = string
}

variable "job_tmp_bucket_name" {
  description = "Job Temporary Bucket Name"
  type        = string
}

最後に

TerraformでGlue Workflowsを実装している例がほとんど世になかったので、今回の記事ではS3 To S3のETLパイプラインの実装を公開してみました
AWS Glueは、年々サービスとしての守備範囲が広がるなど注目度が増してきているので、今後も継続的に ウォッチしていきたいと思います!
https://github.com/page-o/terraform-aws-glue-workflows

参考

https://zenn.dev/hisamitsu/articles/6c233d6d0f0817
https://dev.classmethod.jp/articles/relay_looking_back_aws_glue/
https://blog.serverworks.co.jp/run-glue-workflow-when-the-files-is-reached

Discussion