👏

Astronomer-cosmos環境構築からdbt実行までサクッと試す

2024/01/26に公開

突貫でcosmosの環境を構築してdbt実行のパイプラインを試してみました。
あんまりサクッと行きませんでした。。。
一応あとで色々追記予定です。時間が無く最低限のメモ書きです。

参考にさせていただいた記事

https://docs.astronomer.io/learn/category/get-started
https://medium.com/snowflake/using-astronomers-new-cosmos-to-deploy-dbt-pipelines-onto-snowflake-2e211e84690

cosmos環境構築

環境

Windows11のWSL2上に構築しました。
コンテナの構築方法は割愛します。

Astronomer CLIのインストール

これはWSL上で実行します。

$ curl -sSL install.astronomer.io | sudo bash -s

cosmosのDockerのセットアップ

これはWSL上で実行します。

$ astro dev init

わたしは作業用ディレクトリとしてcosmosを作成し、上記コマンドの実行結果として以下のようになります。

$ ls cosmos/
Dockerfile  README.md  airflow_settings.yaml  dags  include  packages.txt  plugins  requirements.txt  tests

Dockerfileとrequirements.txtの修正

Dockerfileに以下を追記します。

Dockerfile
RUN python3 -m venv cosmos_venv && source cosmos_venv/bin/activate && pip install --no-cache-dir dbt-snowflake && deactivate

requirements.txtに以下を追記します。

requirements.txt
astronomer-cosmos
apache-airflow-providers-snowflake

dbtプロジェクトの用意

いつものごとくJaffle Shopを利用させていただきました。
dbtプロジェクトの配置場所は作業ディレクトリcosmosの直下のdagsとします。
後ほどAirflow側のConnections設定でSnowflakeとの接続設定を行うため、profiles.ymlの作成は不要です。

$ ls cosmos/dags/jaffle_shop/
LICENSE  README.md  dbt_project.yml  etc  logs  models  seeds

dbt実行用DAGの作成

cosmos/dags配下にmy_cosmos_dag.pyとしてDAGを作成します。

from datetime import datetime
import os
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
from cosmos.profiles import SnowflakeUserPasswordProfileMapping


profile_config = ProfileConfig(profile_name="default",
                               target_name="dev",
                               profile_mapping=SnowflakeUserPasswordProfileMapping(conn_id="snowflake_conn", 
                                                    profile_args={
                                                        "database": "COSMOS",
                                                        "schema": "public"
                                                        },
                                                    ))


dbt_snowflake_dag = DbtDag(project_config=ProjectConfig("/usr/local/airflow/dags/jaffle_shop/",),
                    operator_args={"install_deps": True},
                    profile_config=profile_config,
                    execution_config=ExecutionConfig(dbt_executable_path=f"{os.environ['AIRFLOW_HOME']}/cosmos_venv/bin/dbt",),
                    schedule_interval="@daily",
                    start_date=datetime(2024, 1, 26),
                    catchup=False,
                    dag_id="dbt_snowflake_dag",)

comosコンテナの起動

Dockerfileが存在するディレクトリで実行します。
これはWSL上で実行します。

$ astro dev start

ブラウザからAirflowへアクセスする

http://localhost:8080/login/

デフォルトのユーザ/パスワードはadmin/adminでログイン可能です。

DAGのインポートエラー

Airflowにログインすると、以下のようなインポートエラーが発生しました。

airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for my_cosmos_dag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://***.apache.org/docs/apache-***/2.2.2/best-practices.html#top-level-python-code
* https://***.apache.org/docs/apache-***/2.2.2/best-practices.html#reducing-dag-complexity, PID: 909

ERROR - Failed to execute task Dag 'dbt_snowflake_dag' could not be found; either it does not exist or it failed to parse..

解決方法として、airflow.cfgのdagbag_import_timeoutの値を変更します。
airflow.cfgはcosmosのschedulerコンテナに入って値を変更します。

schedulerコンテナるが、rootユーザで入ること
$ docker exec -it -u root cosmos_7e96a1-scheduler-1 bash

airflow.cfgは/usr/local/airflow/配下に存在します。
変更後、ブラウザをリロードするとエラーが解消されました。
原因はインポートする時間が足りなくてタイムアウトしちゃうのです。
なので、インポートする時間を延ばしてあげます。

airflow.cfgの値を変更した結果
$ cat airflow.cfg | grep "dagbag_import_time"
# dagbag_import_timeout = 30.0
dagbag_import_timeout = 180.0

AWS MWAAでも同じ事象が起きて、解決された記事を参考にさせていただきました。
https://qiita.com/Hisaaki-Kato/items/6ee45e0d3a39d524a93e

AirflowのConnectionsにSnowflakeへの接続情報を設定する

設定する項目

Connections key input key 備考
Connection Id snowflake_conn DAGでsnowflake_connという名前で設定しているのでここは固定
変更する場合はDAGも変更すること
Connection Type Snowflake apache-airflow-providers-snowflakeをインストールしていないと選択肢に出てこないので注意
Schema PUBLIC dbtプロジェクトで使うスキーマの指定
Login Snowflakeログインユーザ名 -
Password Snowflakeログインユーザパスワード -
Account ******.ap-northeast-1.aws -
Warehouse COMPUTE_WH dbtプロジェクトで使うウェアハウスの指定
Database COSMOS dbtプロジェクトで使うDBの指定
Role ACCOUNTADMIN dbtプロジェクトで使うロールの指定

以上で、環境構築完了です。

DAGの実行結果

無事dbtの実行が通りました。(それまでにめっちゃ失敗しましたけど。。。)

Discussion