🐷

[Dagster Internal#1]DagsterのCode Locationと他のコンポーネントは何を通信しているか

に公開

DagsterのCode Locationというコンポーネントが何を通信しているかを確認したメモです。
同様にCode Locationの概念がモヤっとしている方向けです。たぶんDagsterを使う上で使う上で必要ない情報です。

Code Locationとは

Dagsterのドキュメント曰く

A code location is a collection of Dagster definitions loadable and accessible by Dagster's tools, such as the CLI, UI, and Dagster+
A reference to a Python module that has an instance of Definitions in a top-level variable

ということで、各種定義(Assets, Resources..)を提供するコンポーネントと言われています。

クライアントの定義

Code Locationと通信するコンポーネントは複数ありますが(Daemon, CLI, WebServer...)ここではWebServerを見てみます。

WebServerのCLIコマンドはpython_modules/dagster-webserverにあり、

  • ProcessContextの初期化
  • DagsterWebServerというGraphQL(とその他)を提供するクラスからサーバーを起動
    • Starletteで実装されてます
  • GraphQLのスキーマ・処理をcreate_schemaで定義

しています。

GraphQLのクエリーが色々定義されていますが、例えばスケジュールの取得を見てみると

  • graphene_info.contextからget_code_locationでCode Locationを取得
  • Code LocationからさらにRepositoryを取得
  • そのRepositoryからget_schedulesでスケジュール情報を取得

しています。ContextのCode Location・Repository関係箇所にCode LocationとのgRPC通信がありそうですね。

Contextのget_code_locationは(いくつかのメソッドを経て)
_load_locationGrpServerCodeLocationというCodeLocationをgRPC経由で呼び出すクラスのインスタンスを作成しています。

このGrpServerCodeLocationではgRPCクライアント(DagsterGrpcClient)を作成し、

  • Repositoryの一覧を取得(sync_list_repositories_grpc)
  • 各Repositoryに関して情報を取得(sync_get_streaming_external_repositories_data_grpcでgRPCのStreamingExternalRepository)

しています。

というので、スケジュールは結局streaming_external_repositoryというgRPCの処理で取得している予想できます。

サーバーの定義

Code LocationのgRPCサーバーはで実装されています。

つまり?

Code Locationで各種定義を集めてExternalRepository/StreamingExternalRepositoryでgRPCエンドポイントで提供して、各種ツール(Daemon/WebServer..)に提供しています。

用語

ソースコードやAPI上では、あまりドキュメントも出てこない用語が時々出てきます

  • Repository
  • External
  • Snapshot

なんとなく

  • Repository = コードの集まり
  • External = 起動したコンポーネント外で定義されてる
  • Snapshot = ある時点での定義
    • (定義が時間変化することもあるので)

と想像しています(が確かではないので定義知ってる人いれば教えてください)。

通信をのぞいてみる

Code Locationのサーバーにtcpdumpを取ると、実際Streaming External Repositoryという処理が呼ばれていること、またそのレスポンスにDefsで定義している各種定義の名前(等)が含まれていることを確認できます。

やり方は色々ありますが、docker-composeで起動してる場合は

  1. tcpdump用のコンテナを作りtcpdumpファイルを保存
  2. .protoファイルをWiresharkで指定
  3. gRPCのレスポンスっぽいパケットをフィルタ
    • 例えば「protobuf.message.name == "api.StreamingExternalRepositoryEvent"」(サイズで並び替えしても良さそう)
  4. Uncompressed Entity Body(or )で内容見てみる

と、想定通り

  • StreamingExternalRepositoryがリクエスト・レスポンスされてること
  • StreamingExternalRepositoryEventに定義の一覧ぽいレスポンスが含まれていること

を確認できます。

テキストとしてコピーして、jqした結果(長いので一部)を下に記載します。Asset Keyの一覧があったりそれっぽい結果ですね。

 cat external_repository.json | jq keys
[
  "__class__",
  "external_asset_checks",
  "external_asset_graph_data",
  "external_job_refs",
  "external_partition_set_datas",
  "external_pipeline_datas",
  "external_resource_data",
  "external_schedule_datas",
  "external_sensor_datas",
  "metadata",
  "name",
  "utilized_env_vars"
]

 cat external_repository.json | jq . | tail -n30
{
  "__class__": "ExternalRepositoryData",
  "external_asset_checks": [],
  "external_asset_graph_data": [
    {
      "__class__": "ExternalAssetNode",
      "asset_key": {
        "__class__": "AssetKey",
        "path": [
          "example_asset1"
        ]
      },
      "atomic_execution_unit_id": null,
      "auto_materialize_policy": null,
      "auto_observe_interval_minutes": null,
      "automation_condition_snapshot": null,
      "backfill_policy": null,
      "code_version": null,
      "compute_kind": null,
      "depended_by": [],
      "dependencies": [],
      "execution_type": {
        "__enum__": "AssetExecutionType.MATERIALIZATION"
      },
      "freshness_policy": null,
      "graph_name": null,
      "group_name": "default",
      "is_observable": false,
      "is_source": false,
      "job_names": [

Discussion