[Dagster Internal#1]DagsterのCode Locationと他のコンポーネントは何を通信しているか
DagsterのCode Locationというコンポーネントが何を通信しているかを確認したメモです。
同様にCode Locationの概念がモヤっとしている方向けです。たぶんDagsterを使う上で使う上で必要ない情報です。
Code Locationとは
A code location is a collection of Dagster definitions loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster+
A reference to a Python module that has an instance of Definitions in a top-level variable
ということで、各種定義(Assets, Resources..)を提供するコンポーネントと言われています。
クライアントの定義
Code Locationと通信するコンポーネントは複数ありますが(Daemon, CLI, WebServer...)ここではWebServerを見てみます。
WebServerのCLIコマンドはpython_modules/dagster-webserverにあり、
- ProcessContextの初期化
-
DagsterWebServerというGraphQL(とその他)を提供するクラスからサーバーを起動
- Starletteで実装されてます
- GraphQLのスキーマ・処理をcreate_schemaで定義
しています。
GraphQLのクエリーが色々定義されていますが、例えばスケジュールの取得を見てみると
- graphene_info.contextからget_code_locationでCode Locationを取得
- Code LocationからさらにRepositoryを取得
- そのRepositoryからget_schedulesでスケジュール情報を取得
しています。ContextのCode Location・Repository関係箇所にCode LocationとのgRPC通信がありそうですね。
Contextのget_code_locationは(いくつかのメソッドを経て)
_load_locationでGrpServerCodeLocationというCodeLocationをgRPC経由で呼び出すクラスのインスタンスを作成しています。
このGrpServerCodeLocationではgRPCクライアント(DagsterGrpcClient)を作成し、
- Repositoryの一覧を取得(sync_list_repositories_grpc)
- 各Repositoryに関して情報を取得(sync_get_streaming_external_repositories_data_grpcでgRPCのStreamingExternalRepository)
しています。
というので、スケジュールは結局streaming_external_repositoryというgRPCの処理で取得している予想できます。
サーバーの定義
Code LocationのgRPCサーバーはで実装されています。
- .protoファイルはdagster/_grpc/protos/dagster_api.proto
- 実装ロジックはserver.pyにあるDagsterGrpcServer
- クライアントのコード出てきたStreamingExternalRepositoryもserver.pyにあるのですが、ExternalRepositoryRequestを呼び出してちょっとずつ返してるだけで、実際のロジックはExternalRepositoryの方にあります
- ExternalRepositoryではRepositorySnap.from_defで各種定義を集めて、シリアライズして返しています
つまり?
Code Locationで各種定義を集めてExternalRepository/StreamingExternalRepositoryでgRPCエンドポイントで提供して、各種ツール(Daemon/WebServer..)に提供しています。
用語
ソースコードやAPI上では、あまりドキュメントも出てこない用語が時々出てきます
- Repository
- External
- Snapshot
なんとなく
- Repository = コードの集まり
- External = 起動したコンポーネント外で定義されてる
- Snapshot = ある時点での定義
- (定義が時間変化することもあるので)
と想像しています(が確かではないので定義知ってる人いれば教えてください)。
通信をのぞいてみる
Code Locationのサーバーにtcpdumpを取ると、実際Streaming External Repositoryという処理が呼ばれていること、またそのレスポンスにDefsで定義している各種定義の名前(等)が含まれていることを確認できます。
やり方は色々ありますが、docker-composeで起動してる場合は
- tcpdump用のコンテナを作りtcpdumpファイルを保存
- .protoファイルをWiresharkで指定
- gRPCのレスポンスっぽいパケットをフィルタ
- 例えば「protobuf.message.name == "api.StreamingExternalRepositoryEvent"」(サイズで並び替えしても良さそう)
- Uncompressed Entity Body(or )で内容見てみる
と、想定通り
- StreamingExternalRepositoryがリクエスト・レスポンスされてること
- StreamingExternalRepositoryEventに定義の一覧ぽいレスポンスが含まれていること
を確認できます。
テキストとしてコピーして、jqした結果(長いので一部)を下に記載します。Asset Keyの一覧があったりそれっぽい結果ですね。
cat external_repository.json | jq keys
[
"__class__",
"external_asset_checks",
"external_asset_graph_data",
"external_job_refs",
"external_partition_set_datas",
"external_pipeline_datas",
"external_resource_data",
"external_schedule_datas",
"external_sensor_datas",
"metadata",
"name",
"utilized_env_vars"
]
cat external_repository.json | jq . | tail -n30
{
"__class__": "ExternalRepositoryData",
"external_asset_checks": [],
"external_asset_graph_data": [
{
"__class__": "ExternalAssetNode",
"asset_key": {
"__class__": "AssetKey",
"path": [
"example_asset1"
]
},
"atomic_execution_unit_id": null,
"auto_materialize_policy": null,
"auto_observe_interval_minutes": null,
"automation_condition_snapshot": null,
"backfill_policy": null,
"code_version": null,
"compute_kind": null,
"depended_by": [],
"dependencies": [],
"execution_type": {
"__enum__": "AssetExecutionType.MATERIALIZATION"
},
"freshness_policy": null,
"graph_name": null,
"group_name": "default",
"is_observable": false,
"is_source": false,
"job_names": [
Discussion