Airflowとdbtを使ったパイプライン構築
順を追って作業したので、備忘程度に記載する。私自身のキャッチアップも兼ねて
- Dockerによる環境の準備
- dbtの設定・構築
- Airflow側の設定・構築
- dagでの記載例 (trocco => dbt)
環境の準備
Dockerを使ってAirflowを用意します。早く環境を立ち上げたいので、dbt等も一緒にイメージに入れてしまいます。
docker-compose
ちょっと古いですが、2.1.0のversionを使います(composeのコードも短かったので)
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.0/docker-compose.yaml'
一部書き換えます。書き換えている内容としては
- Dockerfileからイメージを作成するように変更
- 編集しやすいようにvolumeに構成したフォルダをマウントさせます
- DWHにはBigQueryを使うので、GCPのサービスアカウントをパスを設定しています
~~~~~~~ 省略 ~~~~~~~
---
version: '3'
x-airflow-common:
&airflow-common
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
# dockerfileを見るように変更
build:
context: .
dockerfile: Dockerfile
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
# 各種サービス用にフォルダを作成し、マウント
volumes:
- ./src/airflow/dags:/opt/airflow/dags
- ./src/airflow/logs:/opt/airflow/logs
- ./src/airflow/plugins:/opt/airflow/plugins
- ./src/dbt:/usr/app/dbt
~~~~~~~ 省略 ~~~~~~~
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
#secretsを追加
secrets:
- gcp_secret
~~~~~~~ 省略 ~~~~~~~
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
#secretsを追加
secrets:
- gcp_secret
~~~~~~~ 省略 ~~~~~~~
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
#secretsを追加
secrets:
- gcp_secret
volumes:
postgres-db-volume:
#secretsの配置場所を記載
secrets:
gcp_secret:
# GCPのサービスアカウントのjsonファイルを配置し、パスを記載する
file: ./secrets/xxxx.json
Dockerfile
Dockerfileも準備します。補足としては
- Airflowユーザーで入るので、一応Userを変えて権限を無理やり付与した
- dbtとDWHの接続に必要なライブラリは選定が必要(今回はBQとRedshiftを入れた)
- runやtestを個別にAirflowで呼べる、airflow-dbtも入れてみた
FROM apache/airflow:2.1.0
user root
RUN echo 'airflow ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
user airflow
RUN sudo apt-get update -y
RUN sudo apt-get install --no-install-recommends -y -q \
git libpq-dev python3-dev build-essential && \
sudo apt-get clean
RUN pip install --upgrade pip && \
pip install pybigquery && \
pip install dbt-core && \
pip install dbt-bigquery && \
pip install dbt-redshift && \
pip install airflow-dbt
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH $PATH:/root/google-cloud-sdk/bin
ENV PYTHONIOENCODING=utf-8
ENV LANG C.UTF-8
フォルダの構成
完成系はこんな感じ、まず、airflowとdbtをmkdir
しておく(dockerで自動作成されると思うけど)
.
├── Dockerfile
├── docker-compose.yml
└── src
├── airflow(airflowの設定、DAGの配置など)
├── dags
├── logs
└── plugins
└── dbt(dbt projectが配下に格納される)
└── secrets (この配下に直接GCPのサービスアカウントJSONを置いた)
したがって /datapipeline
の配下で実行しておくこと
mkdir ./src
mkdir ./src/airflow ./src/dbt
/datapipeline/src/airflow
の配下で実行しておくこと
mkdir ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
docker compose
ここまで来たら、下記コマンドを実行してairflowに初期データを準備する
docker compose up airflow-init
公式Docにもある通り下記が出ればOK
airflow-init_1 | Upgrades done
airflow-init_1 | Admin user airflow created
airflow-init_1 | 2.1.0
start_airflow-init_1 exited with code 0
データが入れ終わったので、つづいて本丸をdocker composeする
docker compose up -d
ログ等にエラーがなく、各コンテナが立ち上がり
NAME COMMAND SERVICE STATUS PORTS
datapipeline-airflow-init-1 "/usr/bin/dumb-init …" airflow-init exited (0)
datapipeline-airflow-scheduler-1 "/usr/bin/dumb-init …" airflow-scheduler running (healthy) 8080/tcp
datapipeline-airflow-webserver-1 "/usr/bin/dumb-init …" airflow-webserver running (healthy) 0.0.0.0:8080->8080/tcp
datapipeline-airflow-worker-1 "/usr/bin/dumb-init …" airflow-worker running (healthy) 8080/tcp
datapipeline-flower-1 "/usr/bin/dumb-init …" flower running (healthy) 0.0.0.0:5555->5555/tcp, 8080/tcp
datapipeline-postgres-1 "docker-entrypoint.s…" postgres running (healthy) 5432/tcp
datapipeline-redis-1 "docker-entrypoint.s…" redis running (healthy) 0.0.0.0:6379->6379/tcp
http://localhost:8080 にアクセスできればOK(localhostはご自身の環境次第で変更)
Username:airflow
Password:airflow
でログイン
一旦環境の準備はOK
dbt
dbtのプロジェクト立てる
ローカルの環境にdbtをインストールしても構わないが面倒なので、コンテナに直接入りそちら側でPJTを作成し稼働確認を行う。
docker exec -it どれかのコンテナID等 bash
コンテナへアクセス後、マウントしたフォルダに移動しPJを作成
※ etl_pjはPJ名ですので、お好きなように
cd /usr/app/dbt
dbt init etl_pj
実行するとetl_pjフォルダが作成され、中身は下記のような結果となります。
airflow@xxxxxxxxx:/usr/app/dbt/etl_pj$ ls -al
total 4
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:30 .
drwxrwxrwx 1 1000 1000 4096 Jul 20 09:04 ..
-rwxrwxrwx 1 1000 1000 571 Jul 18 05:44 README.md
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 analysis
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 data
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:20 dbt_modules
-rwxrwxrwx 1 1000 1000 1238 Jul 18 06:28 dbt_project.yml
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:20 logs
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 macros
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:39 models
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 snapshots
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:35 target
drwxrwxrwx 1 1000 1000 4096 Jul 18 06:18 tests
dbt_project.ymlを更新する
まず、dbt_project.yml
をいじります。プロジェクトで使用するフォルダの役割を決めます。
下記はサンプルです。詳細はDocへ
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'etl_pj'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
# ここは今回使うDWHを使う
profile: 'bigquery'
# These configurations specify where dbt should look for different types of files.
# The `source-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
source-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_modules"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
etl_pj:
+materialized: view
profiles.ymlを作成する
dbtで扱う各種接続情報などを管理するファイルとなります。
クレデンシャル情報に当たるので、デフォルトだと ~/.dbt/
配置となるが
今任意のコンテナ上で作業しているので、/usr/app/dbt
の配下にファイルを作ってしまう。
補足としては、Dockerファイルでsecretsを使用したので、/run/secrets/gcp_secret
にはサービスアカウントの情報がマウントされた状態となっている。
bigquery:
target: dev
outputs:
dev:
type: bigquery
method: service-account
project: GCPのプロジェクト名を記載
dataset: BQで使用するデータセット名を記載
threads: 1
keyfile: /run/secrets/gcp_secret
SQLや schema.yml
先ほど作成したdbtプロジェクト配下にあるmodels
配下に、構築したいマートSQLやschemaの設定を行う。具体的なDWHのコードは今回省略する
dbt coreでの稼働確認
クエリやスキーマの設定が完了したら、コマンドによる稼働確認を行ってみる。
コンテナ上にて実行。profileはデフォルト以外の場所に配置したので、ちゃんと場所を指定してあげる
cd /usr/app/dbt/etl_pj
dbt run --profiles-dir /usr/app/dbt
で、結果として下記みたいな内容が見えれば稼働している。dbt testも同じく。
Running with dbt=0.21.1
Found 1 model, 2 tests, 0 snapshots, 0 analyses, 184 macros, 0 operations, 0 seed files, 1 source, 0 exposures
09:41:40 | Concurrency: 1 threads (target='dev')
09:41:40 |
09:41:40 | 1 of 1 START view model GCPのデータセット名.マート名..................... [RUN]
09:41:41 | 1 of 1 OK created view model GCPのデータセット名.マート名................ [OK in 1.04s]
09:41:41 |
09:41:41 | Finished running 1 view model in 2.40s.
Completed successfully
Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
Airflow
最後にdbtを使ったdagを書いてみる。dagは上記で書いたフォルダ構成の中の
/airflow/dags
の配下にあれば良いので、記載したら格納し自動的に読み込まれるのを待つ。
サンプルまでに一例として、下記のようなワークフローを組んでみた。
1. trocco_wf_kick:外部のAPI(trocco)をキックし、データをBQへ転送(非同期)
2. trocco_wf_check:1.のステータスチェック(poke)
3. dbt run:dbt runの実行
4. dbt test:dbt testの実行
ポイントとしては
- troccoのAPIキーやジョブのIDは実行時のパラメータとして渡すようにした
- APIをキックした結果をxcomでpullし、ステータスチェックをsensorで実施
- API完了を待って、dbtの各モジュールの起動
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow import AirflowException
from airflow_dbt.operators.dbt_operator import (
DbtRunOperator,
DbtTestOperator
)
from airflow.utils.dates import days_ago
default_args = {
'start_date': days_ago(0),
'retries': 0,
}
# troccoの実行IDを取得
def get_id_from_xcom(**context):
jsonvalue = context['ti'].xcom_pull(task_ids='trocco_wf_kick')
value = json.loads(jsonvalue)
return value['id']
#キューイングの状態を確認
def is_check_wf_queue(response):
result = response.json()
if result["status"] == "queued":
return True
else:
return False
#ジョブのステータス状態を確認
def is_check_wf_status(response):
result = response.json()
if result["status"] == "succeeded":
return True
else:
return False
with DAG(dag_id='etl_bigquery', default_args=default_args, schedule_interval=None) as dag:
trocco_wf_kick = SimpleHttpOperator(
task_id='trocco_wf_kick',
http_conn_id='',
method='POST',
endpoint="https://trocco.io/api/pipeline_jobs?pipeline_definition_id={{dag_run.conf['trocco_wf_id']}}",
headers={"Authorization": "Token {{dag_run.conf['trocco_key']}}"},
log_response=True,
response_check=is_check_wf_queue
)
get_trocco_id = PythonOperator(
task_id='get_trocco_id',
python_callable=get_id_from_xcom,
provide_context=True
)
trocco_wf_check = HttpSensor(
task_id='trocco_wf_check',
http_conn_id='',
method='GET',
poke_interval=20,
timeout=180,
endpoint="https://trocco.io/api/pipeline_jobs/{{ ti.xcom_pull(task_ids='get_trocco_id') }}",
headers={"Authorization": "Token {{dag_run.conf['trocco_key']}}"},
response_check=is_check_wf_status
)
dbt_run = DbtRunOperator(
task_id='dbt_run',
profiles_dir='/usr/app/dbt',
dir='/usr/app/dbt/etl_pj'
)
dbt_test = DbtTestOperator(
task_id='dbt_test',
retries=0,
profiles_dir='/usr/app/dbt',
dir='/usr/app/dbt/etl_pj'
)
trocco_wf_kick >> get_trocco_id >> trocco_wf_check >> dbt_run >> dbt_test
で実行した結果としても all greenとなり、下記まで完了できた。
- 想定したデータが転送される
- 転送されたテーブルでviewが作成されている
- 作成されたviewのデータチェックが完了している
総括
- データパイプラインの構築において、かゆいところにも手が届きそう
- 各サービスの機能がまたがってくる部分も多いので、役割を明確にすることが重要
- ただし、やはりキャッチアップはそれなりに継続してやっていく必要あり
宣伝
データ分析基盤の総合支援サービスです。
Discussion