🎼

CloudComposer始めました

2024/07/01に公開

WEDでデータエンジニアをしているthimi0412こと清水です。
WEDでは現在GKE上にAirflowの環境を構築し、Embulk使用してアプリケーションで使用しているCloudSQLから分析用のBigQueryにデータを転送しています。そして、自前のGEK上のAirflowからCloudComposerへの移行を現在行っている最中です。
今回はCloudComposerの作成と開発運用について紹介します。

なぜCloudComposer

CloudComposerはv1とv2があり今回は新しいv2で作りました。(Airflowが2系なので使ってみたい&GKEがAutopilotモード等の理由)
GCPのリソースはTerraformで管理してるのでこんな感じでかけます。
ハマった点としてはGoogleのproviderのバージョンが古くて各設定値が使えないことがあったので、バージョンはなるべく新しいものを使うことをお勧めします。
terraform apply するとCloudComposerが作成されますが、10分以上待たされるので気長に待ちましょう。

resource "google_composer_environment" "analytics" {
  name    = "analytics"
  project = var.project_id
  region  = var.default_region

  config {
    software_config {
      # 2022/05/25: latest version
      image_version = "composer-2.0.13-airflow-2.2.5"

      pypi_packages = {
        boto3                           = ""
        apache-airflow-providers-amazon = ""
      }
    }
    node_config {
      service_account = var.composer_sa_email
    }
  }

https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/composer_environment

作成が完了するとGCPのコンソールで見るとこんな感じ。

AirflowウェブサーバーからAirflowの管理画面にいけます。

DAGの管理

DAGはAirflowで使われるタスクの依存関係を整理して、どのように実行するか定義されているものです。
Pythonファイルで定義と処理を記述できます。
こんな感じで。

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    "start_date": airflow.utils.dates.days_ago(0),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

def test(**context):
    print("Hello World")

with DAG(
    "example_dag",
    default_args=default_args,
    description="test dag",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=20),
) as dag:
    task1 = PythonOperator(
        task_id="test",
        python_callable=test,
        provide_context=True,
        dag=dag,
    )

CloudComposerではGCS上に dags/ 配下にファイルを配置すると読み取りを行ってくれて、DAGの追加更新が行えます。
GCS上にファイルをアップロードするだけなのでCIも簡単です。
WEDではGitHub Actionsを使用していて、Workload Identityを使用してGCSのファイルをアップロードしています。
GitHubのリポジトリはこのような構成になってます。

.
├── README.md
├── dags
│   └── exapmle
│       └── dag.py
├── poetry.lock
└── pyproject.toml

staging環境とproduction環境それぞれ環境を作成していてstaging環境はPR作成時、production環境はmain branchにpushされたらGCSにファイルをアップロードするようになっています。

name: production gcs upload

on:
  push:
    branches:
      - "main"
    paths:
      - "**.py"
      - "pyproject.toml"
      - ".github/workflows/production_gcs_upload.yml"

jobs:
  upload:
    name: upload
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write

    steps:
      - name: Check out code
        uses: actions/checkout@v3

      - id: auth
        uses: google-github-actions/auth@v0
        with:
          workload_identity_provider: ${{ secrets.WORKLOAD_IDENTITY_PROVIDER }}
          service_account: "<service_account>"
      - name: "Set up Cloud SDK"
        uses: "google-github-actions/setup-gcloud@v0"

      - id: upload-folder
        uses: google-github-actions/upload-cloud-storage@v0
        with:
          path: ./dags/
          destination: "<gcs_path>"

https://cloud.google.com/blog/ja/products/identity-security/enabling-keyless-authentication-from-github-actions

終わりに

まだデータ転送は現在移行中ですがデータ連携等のタスクは本番運用しています。データ転送の移行完了や運用面で気になったことやハマりどころなどあればまた記事を書こうと思っています。

WED Engineering Blog

Discussion