Airflowとdbtを使ったパイプライン構築

2022/07/20に公開

順を追って作業したので、備忘程度に記載する。私自身のキャッチアップも兼ねて

  • Dockerによる環境の準備
  • dbtの設定・構築
  • Airflow側の設定・構築
    • dagでの記載例 (trocco => dbt)

環境の準備

Dockerを使ってAirflowを用意します。早く環境を立ち上げたいので、dbt等も一緒にイメージに入れてしまいます。

docker-compose

ちょっと古いですが、2.1.0のversionを使います(composeのコードも短かったので)

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.0/docker-compose.yaml'

一部書き換えます。書き換えている内容としては

  • Dockerfileからイメージを作成するように変更
  • 編集しやすいようにvolumeに構成したフォルダをマウントさせます
  • DWHにはBigQueryを使うので、GCPのサービスアカウントをパスを設定しています
docker-compose.yaml

~~~~~~~ 省略 ~~~~~~~
---
version: '3'

x-airflow-common:
  &airflow-common
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  
  # dockerfileを見るように変更
  build:
    context: .
    dockerfile: Dockerfile
    
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    
  # 各種サービス用にフォルダを作成し、マウント
  volumes:
    - ./src/airflow/dags:/opt/airflow/dags
    - ./src/airflow/logs:/opt/airflow/logs
    - ./src/airflow/plugins:/opt/airflow/plugins
    - ./src/dbt:/usr/app/dbt

~~~~~~~ 省略 ~~~~~~~

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    
    #secretsを追加
    secrets:
      - gcp_secret

~~~~~~~ 省略 ~~~~~~~

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    
  #secretsを追加
    secrets:
      - gcp_secret

~~~~~~~ 省略 ~~~~~~~

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    
    #secretsを追加
    secrets:
      - gcp_secret
      
volumes:
  postgres-db-volume:
  
#secretsの配置場所を記載
secrets:
  gcp_secret:
    # GCPのサービスアカウントのjsonファイルを配置し、パスを記載する
    file: ./secrets/xxxx.json

Dockerfile

Dockerfileも準備します。補足としては

  • Airflowユーザーで入るので、一応Userを変えて権限を無理やり付与した
  • dbtとDWHの接続に必要なライブラリは選定が必要(今回はBQとRedshiftを入れた)
  • runやtestを個別にAirflowで呼べる、airflow-dbtも入れてみた
FROM apache/airflow:2.1.0

user root
RUN echo 'airflow ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers

user airflow
RUN sudo apt-get update -y
RUN sudo apt-get install --no-install-recommends -y -q \
    git libpq-dev python3-dev build-essential && \ 
    sudo apt-get clean

RUN pip install --upgrade pip && \
    pip install pybigquery && \
    pip install dbt-core && \
    pip install dbt-bigquery && \
    pip install dbt-redshift && \
    pip install airflow-dbt

RUN curl -sSL https://sdk.cloud.google.com | bash

ENV PATH $PATH:/root/google-cloud-sdk/bin

ENV PYTHONIOENCODING=utf-8
ENV LANG C.UTF-8

フォルダの構成

完成系はこんな感じ、まず、airflowとdbtをmkdirしておく(dockerで自動作成されると思うけど)

.
├── Dockerfile
├── docker-compose.yml
└── src
    ├── airflow(airflowの設定、DAGの配置など)
        ├── dags
        ├── logs
        └── plugins
    └── dbt(dbt projectが配下に格納される)
└── secrets (この配下に直接GCPのサービスアカウントJSONを置いた)

したがって /datapipelineの配下で実行しておくこと

mkdir ./src
mkdir ./src/airflow ./src/dbt

/datapipeline/src/airflowの配下で実行しておくこと

mkdir ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

docker compose

ここまで来たら、下記コマンドを実行してairflowに初期データを準備する

docker compose up airflow-init

公式Docにもある通り下記が出ればOK

airflow-init_1       | Upgrades done
airflow-init_1       | Admin user airflow created
airflow-init_1       | 2.1.0
start_airflow-init_1 exited with code 0

データが入れ終わったので、つづいて本丸をdocker composeする

docker compose up -d

ログ等にエラーがなく、各コンテナが立ち上がり

NAME                               COMMAND                  SERVICE             STATUS              PORTS
datapipeline-airflow-init-1        "/usr/bin/dumb-init …"   airflow-init        exited (0)
datapipeline-airflow-scheduler-1   "/usr/bin/dumb-init …"   airflow-scheduler   running (healthy)   8080/tcp
datapipeline-airflow-webserver-1   "/usr/bin/dumb-init …"   airflow-webserver   running (healthy)   0.0.0.0:8080->8080/tcp
datapipeline-airflow-worker-1      "/usr/bin/dumb-init …"   airflow-worker      running (healthy)   8080/tcp
datapipeline-flower-1              "/usr/bin/dumb-init …"   flower              running (healthy)   0.0.0.0:5555->5555/tcp, 8080/tcp
datapipeline-postgres-1            "docker-entrypoint.s…"   postgres            running (healthy)   5432/tcp
datapipeline-redis-1               "docker-entrypoint.s…"   redis               running (healthy)   0.0.0.0:6379->6379/tcp

http://localhost:8080 にアクセスできればOK(localhostはご自身の環境次第で変更)
Username:airflow
Password:airflow でログイン

一旦環境の準備はOK

dbt

dbtのプロジェクト立てる

ローカルの環境にdbtをインストールしても構わないが面倒なので、コンテナに直接入りそちら側でPJTを作成し稼働確認を行う。

 docker exec -it どれかのコンテナID等 bash

コンテナへアクセス後、マウントしたフォルダに移動しPJを作成
※ etl_pjはPJ名ですので、お好きなように

cd /usr/app/dbt
dbt init etl_pj

実行するとetl_pjフォルダが作成され、中身は下記のような結果となります。

airflow@xxxxxxxxx:/usr/app/dbt/etl_pj$ ls -al
total 4
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:30 .
drwxrwxrwx 1 1000 1000 4096 Jul 20 09:04 ..
-rwxrwxrwx 1 1000 1000  571 Jul 18 05:44 README.md
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 analysis
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 data
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:20 dbt_modules
-rwxrwxrwx 1 1000 1000 1238 Jul 18 06:28 dbt_project.yml
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:20 logs
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 macros
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:39 models
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 snapshots
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:35 target
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 tests

dbt_project.ymlを更新する

まず、dbt_project.ymlをいじります。プロジェクトで使用するフォルダの役割を決めます。
下記はサンプルです。詳細はDoc

dbt_project.yml
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'etl_pj'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
# ここは今回使うDWHを使う
profile: 'bigquery'

# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
source-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_modules"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
  etl_pj:
    +materialized: view

profiles.ymlを作成する

dbtで扱う各種接続情報などを管理するファイルとなります。
クレデンシャル情報に当たるので、デフォルトだと ~/.dbt/配置となるが
今任意のコンテナ上で作業しているので、/usr/app/dbtの配下にファイルを作ってしまう。
補足としては、Dockerファイルでsecretsを使用したので、/run/secrets/gcp_secretにはサービスアカウントの情報がマウントされた状態となっている。

profiles.yml
bigquery:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: GCPのプロジェクト名を記載
      dataset: BQで使用するデータセット名を記載
      threads: 1
      keyfile: /run/secrets/gcp_secret

SQLや schema.yml

先ほど作成したdbtプロジェクト配下にあるmodels配下に、構築したいマートSQLやschemaの設定を行う。具体的なDWHのコードは今回省略する

dbt coreでの稼働確認

クエリやスキーマの設定が完了したら、コマンドによる稼働確認を行ってみる。
コンテナ上にて実行。profileはデフォルト以外の場所に配置したので、ちゃんと場所を指定してあげる

cd /usr/app/dbt/etl_pj
dbt run --profiles-dir /usr/app/dbt

で、結果として下記みたいな内容が見えれば稼働している。dbt testも同じく。

Running with dbt=0.21.1
Found 1 model, 2 tests, 0 snapshots, 0 analyses, 184 macros, 0 operations, 0 seed files, 1 source, 0 exposures

09:41:40 | Concurrency: 1 threads (target='dev')
09:41:40 |
09:41:40 | 1 of 1 START view model GCPのデータセット名.マート名..................... [RUN]
09:41:41 | 1 of 1 OK created view model GCPのデータセット名.マート名................ [OK in 1.04s]
09:41:41 |
09:41:41 | Finished running 1 view model in 2.40s.

Completed successfully

Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Airflow

最後にdbtを使ったdagを書いてみる。dagは上記で書いたフォルダ構成の中の
/airflow/dagsの配下にあれば良いので、記載したら格納し自動的に読み込まれるのを待つ。

サンプルまでに一例として、下記のようなワークフローを組んでみた。

1. trocco_wf_kick:外部のAPI(trocco)をキックし、データをBQへ転送(非同期)
2. trocco_wf_check:1.のステータスチェック(poke)
3. dbt run:dbt runの実行
4. dbt test:dbt testの実行

ポイントとしては

  • troccoのAPIキーやジョブのIDは実行時のパラメータとして渡すようにした
  • APIをキックした結果をxcomでpullし、ステータスチェックをsensorで実施
  • API完了を待って、dbtの各モジュールの起動
etl_bigquery.py
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow import AirflowException

from airflow_dbt.operators.dbt_operator import (
  DbtRunOperator,
  DbtTestOperator
)
from airflow.utils.dates import days_ago

default_args = {
  'start_date': days_ago(0),
  'retries': 0,
}

# troccoの実行IDを取得
def get_id_from_xcom(**context):
  jsonvalue = context['ti'].xcom_pull(task_ids='trocco_wf_kick')
  value = json.loads(jsonvalue)
  return value['id']

#キューイングの状態を確認
def is_check_wf_queue(response):
  result = response.json()
  if result["status"] == "queued":
    return True
  else:
    return False

#ジョブのステータス状態を確認
def is_check_wf_status(response):
  result = response.json()
  if result["status"] == "succeeded":
    return True
  else:
    return False

with DAG(dag_id='etl_bigquery', default_args=default_args, schedule_interval=None) as dag:

  trocco_wf_kick = SimpleHttpOperator(
    task_id='trocco_wf_kick',
    http_conn_id='',
    method='POST',
    endpoint="https://trocco.io/api/pipeline_jobs?pipeline_definition_id={{dag_run.conf['trocco_wf_id']}}",
    headers={"Authorization": "Token {{dag_run.conf['trocco_key']}}"},
    log_response=True,
    response_check=is_check_wf_queue
  )

  get_trocco_id = PythonOperator(
      task_id='get_trocco_id',
      python_callable=get_id_from_xcom,
      provide_context=True
  )

  trocco_wf_check = HttpSensor(
    task_id='trocco_wf_check',
    http_conn_id='',
    method='GET',
    poke_interval=20,
    timeout=180,
    endpoint="https://trocco.io/api/pipeline_jobs/{{ ti.xcom_pull(task_ids='get_trocco_id') }}",
    headers={"Authorization": "Token {{dag_run.conf['trocco_key']}}"},
    response_check=is_check_wf_status
  )

  dbt_run = DbtRunOperator(
    task_id='dbt_run',
    profiles_dir='/usr/app/dbt',
    dir='/usr/app/dbt/etl_pj'
  )

  dbt_test = DbtTestOperator(
    task_id='dbt_test',
    retries=0,
    profiles_dir='/usr/app/dbt',
    dir='/usr/app/dbt/etl_pj'
  )

  trocco_wf_kick >> get_trocco_id >> trocco_wf_check >> dbt_run >> dbt_test

で実行した結果としても all greenとなり、下記まで完了できた。

  • 想定したデータが転送される
  • 転送されたテーブルでviewが作成されている
  • 作成されたviewのデータチェックが完了している


総括

  • データパイプラインの構築において、かゆいところにも手が届きそう
  • 各サービスの機能がまたがってくる部分も多いので、役割を明確にすることが重要
  • ただし、やはりキャッチアップはそれなりに継続してやっていく必要あり

宣伝

データ分析基盤の総合支援サービスです。
https://trocco.io/lp/index.html

Discussion