Dagsterのジョブをe2eテストのために実行したい
tl;dr
- Dagster CLIでジョブを実行できるよ
- execute_in_processでも実行できるよ
- CLIコマンドではexecuteとlaunchのどちらでもジョブ実行できるけど、ちょっと挙動が違うよ
モチベーション
- 複数のAsset・Resourceを跨いで、job全体として動くか確認したいこともあります
- 特に、DevinやClaude Codeに開発サポートしてもらう場合、その変更の確認、および、変更してない箇所を壊してないことの確認に使いたいこともあります
- 開発者が確認する場合はDagsterのWeb UIでジョブ実行して確認すれば済みますが、その方法では自動化が面倒なのでCLI完結で確認したいです
対象
- Dagster OSS
- Cloud使ってる時のローカルの開発でも使えるとは思いますが
- Dagster 1.11.2
対象のコード
scaffoldで用意したディレクトリに適当なAsset・Jobを定義します。
import dagster as dg
@dg.asset
def asset1(context: dg.AssetExecutionContext):
s = "Hello from asset1!"
context.log.info(s)
return s
asset_job1 = dg.define_asset_job(
name="asset_job1",
selection='+key:"asset1"',
)
CLIから実行
Dagsterプロジェクトのあるディレクトリでdagster jobコマンドから実行すれば、CLIからジョブが実行できます。
# scaffoldでsanbodx2というモジュールを作っていた場合
dagster job execute -m sandbox2.definitions -j asset_job1
2025-07-13 08:19:47 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - RUN_START - Started execution of run for "asset_job1".
2025-07-13 08:19:47 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 1801512)
2025-07-13 08:19:47 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - asset1 - STEP_WORKER_STARTING - Launching subprocess for "asset1".
2025-07-13 08:19:48 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - STEP_WORKER_STARTED - Executing step "asset1" in subprocess.
2025-07-13 08:19:48 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2025-07-13 08:19:48 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - LOGS_CAPTURED - Started capturing logs in process (pid: 1801623).
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - STEP_START - Started execution of step "asset1".
2025-07-13 08:19:49 +0900 - dagster - INFO - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - asset1 - Hello from asset1!
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - asset1 - Writing file at: /home/notrogue/project/dagster/sandbox/sandbox2/storage/asset1 using PickledObjectFilesystemIOManager...
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - ASSET_MATERIALIZATION - Materialized value asset1.
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801623 - asset1 - STEP_SUCCESS - Finished execution of step "asset1" in 76ms.
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 2.05s (pid: 1801512)
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - RUN_SUCCESS - Finished execution of run for "asset_job1".
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - asset1 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2025-07-13 08:19:49 +0900 - dagster - DEBUG - asset_job1 - 4b6c78fb-7928-4303-9d86-c36adf62127c - 1801512 - asset1 - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
真ん中らへんに、このジョブから出力してるログ「Hello from asset1!」も表示されています(残りはDagsterのイベント)。
エラーの確認
e2eで確認する時は実行失敗も確認したいと思います。job executeで実行した場合の挙動を見てみましょう。
asset.pyをエラーでるように変更して
@dg.asset
def asset1(context: dg.AssetExecutionContext):
raise Exception()
dagster CLIを実行します
dagster job execute -m sandbox2.definitions -j asset_job1
# (一部省略)
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "asset1"::
Exception
Stack Trace:
File "/home/notrogue/project/dagster/sandbox/sandbox2/.venv/lib/python3.12/site-packages/dagster/_core/execution/plan/utils.py", line 57, in op_execution_error_boundary
yield
File "/home/notrogue/project/dagster/sandbox/sandbox2/.venv/lib/python3.12/site-packages/dagster/_utils/__init__.py", line 392, in iterate_with_context
next_output = next(iterator)
^^^^^^^^^^^^^^
File "/home/notrogue/project/dagster/sandbox/sandbox2/.venv/lib/python3.12/site-packages/dagster/_core/execution/plan/compute_generator.py", line 137, in _coerce_op_compute_fn_to_iterator
result = invoke_compute_fn(
^^^^^^^^^^^^^^^^^^
File "/home/notrogue/project/dagster/sandbox/sandbox2/.venv/lib/python3.12/site-packages/dagster/_core/execution/plan/compute_generator.py", line 117, in invoke_compute_fn
return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/notrogue/project/dagster/sandbox/sandbox2/src/sandbox2/defs/asset.py", line 5, in asset1
raise Exception()
2025-07-13 08:21:58 +0900 - dagster - DEBUG - asset_job1 - 550ab284-9f1b-4ca8-8736-5f8312a5e9b2 - 1807941 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 1.6s (pid: 1807941)
2025-07-13 08:21:58 +0900 - dagster - ERROR - asset_job1 - 550ab284-9f1b-4ca8-8736-5f8312a5e9b2 - 1807941 - RUN_FAILURE - Execution of run for "asset_job1" failed. Steps failed: ['asset1'].
2025-07-13 08:21:58 +0900 - dagster - DEBUG - asset_job1 - 550ab284-9f1b-4ca8-8736-5f8312a5e9b2 - 1807941 - asset1 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2025-07-13 08:21:58 +0900 - dagster - DEBUG - asset_job1 - 550ab284-9f1b-4ca8-8736-5f8312a5e9b2 - 1807941 - asset1 - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
Error: Run 550ab284-9f1b-4ca8-8736-5f8312a5e9b2 resulted in failure.
エラーが起きたことがログにでていますね。statusコードでもエラーを確認できます。
# 私はfishを使ってるのでstatusで確認
echo $status
1
env
Dagsterでは.envファイルに各種設定(接続情報とか)を記載し、環境変数経由で参照できます。
dagster jobコマンドでも参照できることを一応見ておきましょう。
適当に.envファイルを作ります。
ENV1=abcd
Assetを変えます。
@dg.asset
def asset1(context: dg.AssetExecutionContext):
s = "Hello from asset1! with env variable: " + dg.EnvVar("ENV1").get_value(default="default_value")
context.log.info(s)
return s
実行します
dagster job execute -m sandbox2.definitions -j asset_job1
(省略)
2025-07-13 08:34:13 +0900 - dagster - INFO - asset_job1 - fca221fb-eb95-4f5c-9a5c-a8f8515d160d - asset1 - Hello from asset1! with env variable: abcd
参照できてそうです。
execute_in_process
上述の例はCLIからの実行でしたが、execute_in_processでPythonから実行することも出来ます。
# scaffoldで作ったモジュールがsandbox2の場合
% from sandbox2.definitions import defs
% job = defs().resolve_job_def("asset_job1")
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - RUN_START - Started execution of run for "asset_job1".
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - ENGINE_EVENT - Executing steps in process (pid: 1980196)
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - RESOURCE_INIT_SUCCESS - Finished initialization of resources [io_manager].
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - LOGS_CAPTURED - Started capturing logs in process (pid: 1980196).
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - STEP_START - Started execution of step "asset1".
2025-07-13 09:37:04 +0900 - dagster - INFO - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - asset1 - Hello from asset1!
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - asset1 - Writing file at: /tmp/tmpw3y6vhud/storage/asset1 using PickledObjectFilesystemIOManager...
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - ASSET_MATERIALIZATION - Materialized value asset1.
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - HANDLED_OUTPUT - Handled output "result" using IO manager "io_manager"
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - asset1 - STEP_SUCCESS - Finished execution of step "asset1" in 6.56ms.
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - ENGINE_EVENT - Finished steps in process (pid: 1980196) in 11ms
2025-07-13 09:37:04 +0900 - dagster - DEBUG - asset_job1 - 789f839f-53d6-48d2-9475-074fb459bfad - 1980196 - RUN_SUCCESS - Finished execution of run for "asset_job1".
レスポンス(ExecuteInProcessResult)でジョブの実行状況の確認や、各Assetの戻り値を確認することもできます。
% result.success
True
% result.asset_value("asset1")
'Hello from asset1!'
ただし、CLI実行の時と違い.envファイルはデフォルトでは読み込んでくれないので、自分で読み込む必要があります。
例えば、Dagster CLIの.envの説明で使ったAssetを実行すると、default_value(=env読まれてない)になっています。
2025-07-13 09:36:44 +0900 - dagster - INFO - asset_job1 - 5954f61d-797d-4d34-8ce9-77f296597668 - asset1 - Hello from asset1! with env variable: default_value
.envを環境変数として読み込んであげると結果が変わります
% from dotenv import load_dotenv
% load_dotenv(".env")
% result = job.execute_in_process()
(省略)
2025-07-13 09:46:33 +0900 - dagster - INFO - asset_job1 - 29b77c80-b0df-4c0e-bf59-5826ec4dec8d - asset1 - Hello from asset1! with env variable: abcd
他の方法
job launch
dagster jobには、job launchというexecuteと似たコマンドがあり、実はこのコマンドでもジョブの実行はできます。
export run_id=$(uuidgen)
# 実行状況を確認するためにrun_idを指定して実行(任意の文字列ではなくUUIDが必要)
DAGSTER_HOME=(pwd) dagster job launch -j asset_job1 --run-id $run_id
# Jobを進めるためにDaemonを起動
# ちなみにdagster devでもDaemon起動するので、dagster devしてれば不要
DAGSTER_HOME=(pwd) dagster-daemon run
ただし、executeとは違い、細かい注意点がいくつかあります。
- デフォルト(scaffoldから作成状態)ではDagster Daemonを起動しないとJobが進まない(上の最後のコマンド)
- ジョブが失敗してもステータスコードからは気が付けない
- 失敗のログもDaemonには表示されますが、dagster job launchの方には表示されない
(正確にはこの話はscaffoldで作られるデフォルトの設定の場合で、Coordinator(DefaultRunCoordinator)・Launcherを設定すれば対応でる部分もあります)
job launchで実行すると、履歴データベース(runs)に保存されます。単に確認したいだけならexecuteの方が楽な気がしていますが、実行の履歴が必要な場合はLaunchで実行した方が良いかもしれません。
echo "select * from runs where run_id='$run_id2'" | sqlite3 history/runs.db -line
id = 29
run_id = 4782ada3-fa79-4cda-8d59-f70b152316eb
snapshot_id = 6418c90e8e6f80b7dff99e9eb12a12f5bfefc270
pipeline_name = asset_job1
mode =
status = SUCCESS
run_body = {"__class__": "PipelineRun", "asset_check_selection": null, "asset_selection": null, "execution_plan_snapshot_id": "1974798c21c3c845f6f3ebcde42083aa9b117c4f", "external_pipeline_origin": {"__class__": "ExternalPipelineOrigin", "external_repository_origin": {"__class__": "ExternalRepositoryOrigin", "repository_location_origin": {"__class__": "ManagedGrpcPythonEnvRepositoryLocationOrigin", "loadable_target_origin": {"__class__": "LoadableTargetOrigin", "attribute": null, "executable_path": null, "module_name": "sandbox2.definitions", "package_name": null, "python_file": null, "working_directory": "/home/notrogue/project/dagster/sandbox/sandbox2"}, "location_name": "sandbox2.definitions"}, "repository_name": "__repository__"}, "pipeline_name": "asset_job1"}, "has_repository_load_data": false, "mode": null, "parent_run_id": null, "pipeline_code_origin": {"__class__": "PipelinePythonOrigin", "pipeline_name": "asset_job1", "repository_origin": {"__class__": "RepositoryPythonOrigin", "code_pointer": {"__class__": "ModuleCodePointer", "fn_name": "defs", "module": "sandbox2.definitions", "working_directory": "/home/notrogue/project/dagster/sandbox/sandbox2"}, "container_context": {}, "container_image": null, "entry_point": ["dagster"], "executable_path": "/home/notrogue/project/dagster/sandbox/sandbox2/.venv/bin/python"}}, "pipeline_name": "asset_job1", "pipeline_snapshot_id": "6418c90e8e6f80b7dff99e9eb12a12f5bfefc270", "root_run_id": null, "run_config": {}, "run_id": "4782ada3-fa79-4cda-8d59-f70b152316eb", "run_op_concurrency": null, "solid_selection": null, "solids_to_execute": null, "status": {"__enum__": "PipelineRunStatus.SUCCESS"}, "step_keys_to_execute": ["asset1"], "tags": {}}
partition =
partition_set =
create_timestamp = 2025-07-12 23:51:58.504157
update_timestamp = 2025-07-12 23:52:00.468828
start_time = 1752364318.8682
end_time = 1752364320.46883
backfill_id =
gRPC
多分実行するAPIありますが調べてないです。
Discussion