Astronomer-cosmos環境構築からdbt実行までサクッと試す
突貫でcosmosの環境を構築してdbt実行のパイプラインを試してみました。
あんまりサクッと行きませんでした。。。
一応あとで色々追記予定です。時間が無く最低限のメモ書きです。
参考にさせていただいた記事
cosmos環境構築
環境
Windows11のWSL2上に構築しました。
コンテナの構築方法は割愛します。
Astronomer CLIのインストール
これはWSL上で実行します。
$ curl -sSL install.astronomer.io | sudo bash -s
cosmosのDockerのセットアップ
これはWSL上で実行します。
$ astro dev init
わたしは作業用ディレクトリとしてcosmosを作成し、上記コマンドの実行結果として以下のようになります。
$ ls cosmos/
Dockerfile README.md airflow_settings.yaml dags include packages.txt plugins requirements.txt tests
Dockerfileとrequirements.txtの修正
Dockerfileに以下を追記します。
RUN python3 -m venv cosmos_venv && source cosmos_venv/bin/activate && pip install --no-cache-dir dbt-snowflake && deactivate
requirements.txtに以下を追記します。
astronomer-cosmos
apache-airflow-providers-snowflake
dbtプロジェクトの用意
いつものごとくJaffle Shopを利用させていただきました。
dbtプロジェクトの配置場所は作業ディレクトリcosmosの直下のdagsとします。
後ほどAirflow側のConnections設定でSnowflakeとの接続設定を行うため、profiles.ymlの作成は不要です。
$ ls cosmos/dags/jaffle_shop/
LICENSE README.md dbt_project.yml etc logs models seeds
dbt実行用DAGの作成
cosmos/dags配下にmy_cosmos_dag.pyとしてDAGを作成します。
from datetime import datetime
import os
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
profile_config = ProfileConfig(profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(conn_id="snowflake_conn",
profile_args={
"database": "COSMOS",
"schema": "public"
},
))
dbt_snowflake_dag = DbtDag(project_config=ProjectConfig("/usr/local/airflow/dags/jaffle_shop/",),
operator_args={"install_deps": True},
profile_config=profile_config,
execution_config=ExecutionConfig(dbt_executable_path=f"{os.environ['AIRFLOW_HOME']}/cosmos_venv/bin/dbt",),
schedule_interval="@daily",
start_date=datetime(2024, 1, 26),
catchup=False,
dag_id="dbt_snowflake_dag",)
comosコンテナの起動
Dockerfileが存在するディレクトリで実行します。
これはWSL上で実行します。
$ astro dev start
ブラウザからAirflowへアクセスする
デフォルトのユーザ/パスワードはadmin/adminでログイン可能です。
DAGのインポートエラー
Airflowにログインすると、以下のようなインポートエラーが発生しました。
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for my_cosmos_dag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://***.apache.org/docs/apache-***/2.2.2/best-practices.html#top-level-python-code
* https://***.apache.org/docs/apache-***/2.2.2/best-practices.html#reducing-dag-complexity, PID: 909
ERROR - Failed to execute task Dag 'dbt_snowflake_dag' could not be found; either it does not exist or it failed to parse..
解決方法として、airflow.cfgのdagbag_import_timeoutの値を変更します。
airflow.cfgはcosmosのschedulerコンテナに入って値を変更します。
$ docker exec -it -u root cosmos_7e96a1-scheduler-1 bash
airflow.cfgは/usr/local/airflow/配下に存在します。
変更後、ブラウザをリロードするとエラーが解消されました。
原因はインポートする時間が足りなくてタイムアウトしちゃうのです。
なので、インポートする時間を延ばしてあげます。
$ cat airflow.cfg | grep "dagbag_import_time"
# dagbag_import_timeout = 30.0
dagbag_import_timeout = 180.0
AWS MWAAでも同じ事象が起きて、解決された記事を参考にさせていただきました。
AirflowのConnectionsにSnowflakeへの接続情報を設定する
設定する項目
Connections key | input key | 備考 |
---|---|---|
Connection Id | snowflake_conn | DAGでsnowflake_connという名前で設定しているのでここは固定 変更する場合はDAGも変更すること |
Connection Type | Snowflake | apache-airflow-providers-snowflakeをインストールしていないと選択肢に出てこないので注意 |
Schema | PUBLIC | dbtプロジェクトで使うスキーマの指定 |
Login | Snowflakeログインユーザ名 | - |
Password | Snowflakeログインユーザパスワード | - |
Account | ******.ap-northeast-1.aws | - |
Warehouse | COMPUTE_WH | dbtプロジェクトで使うウェアハウスの指定 |
Database | COSMOS | dbtプロジェクトで使うDBの指定 |
Role | ACCOUNTADMIN | dbtプロジェクトで使うロールの指定 |
以上で、環境構築完了です。
DAGの実行結果
無事dbtの実行が通りました。(それまでにめっちゃ失敗しましたけど。。。)
Discussion