💨

DagsterのgRPCサーバーとお話したい

に公開
  • Dagsterには実際のソースコードを持つCode Locationと、UIを持つWeb Server、スケジュールの制御などをするDaemonといったコンポーネントがあります
  • DaemonやWebserverとCode LocationはgRPCで会話しています(上のArchitecture図やworkspace.yamlに記載)
  • Code LocationのgRPC ServerにCLIから接続してみたくなったので、そのメモです

なんでそんなことやりたいかと言うと

  • gRPCでどんな内容やり取りしてるか気になった
    • Dagsterの各種コンポーネントをぼんやりしたとしか理解しておらず、それを調べる中での流れ
  • トラブルシュートが起きる前に知っておくと便利かも

ためです(知らなくても良い内容だとは思います)。

手順

curlで一発ポン!という分けにはいかず微妙に面倒です(より良いやり方知ってたら教えて欲しい)。

  1. お好きな方法でgrpcurlをインストール
  2. DagsterのコードをローカルにClone
    • 理由は後述
  3. 適当にDagsterのCode LocationのgRPCサーバーを起動
  4. grpcurlで接続

適当にDagsterのCode LocationのgRPCサーバーを起動

方法は何でも良いですが、ここではexampleディレクトリにあるdocker-composeを使います。
このexampleでは

  • Code LocationのgRPC Server
  • Webserver
  • Daemon
  • PostgreSQL

がdocker-composeで起動します。

ローカルから接続しやすいように、Code Locationの4000番ポート(dagster api grpcコマンドで指定しているポート)は解放しておきます。

# docker-compose.yaml
# (前略)
  docker_example_user_code:
    build:
      context: .
      dockerfile: ./Dockerfile_user_code
    container_name: docker_example_user_code
    image: docker_example_user_code_image
    restart: always
    # ローカルから接続できるように変更
    expose:
      - '4000'
    ports:
      - '127.0.0.1:4000:4000'
# (後略)

grpcurlで接続

grpcurlはReflectionをサーバー側でサポートしてないとデフォルトではエラーになります。Code LocationのgRPCサーバーは(少なくともデフォルトでは)サポートしておらず、ナイーブに接続するとエラーになります。

grpcurl  -plaintext 127.0.0.1:4000 api.DagsterApi.ListRepositories
Error invoking method "api.DagsterApi.ListRepositories": failed to query for service descriptor "api.DagsterApi": server does not support the reflection API

これに対応するためには明示的にAPIを定義してる.protoファイルを渡してあげる必要があります。
(Dagsterのソースコードが必要なのはこのため)

API接続の前に.protoファイルを渡せてるか試してみます。ローカルにダウンロードしたDagsterのソースコードの_grpcディレクトリに移動して、grpcurlコマンドに.protoを渡して実行してみます。

grpcurl -plaintext -import-path ./protos -proto api.proto list api.DagsterApi
api.DagsterApi.CanCancelExecution
api.DagsterApi.CancelExecution
api.DagsterApi.ExecutionPlanSnapshot
api.DagsterApi.ExternalJob
api.DagsterApi.ExternalNotebookData
api.DagsterApi.ExternalPartitionConfig
api.DagsterApi.ExternalPartitionNames
api.DagsterApi.ExternalPartitionSetExecutionParams
api.DagsterApi.ExternalPartitionTags
api.DagsterApi.ExternalPipelineSubsetSnapshot
api.DagsterApi.ExternalRepository
api.DagsterApi.ExternalScheduleExecution
api.DagsterApi.ExternalSensorExecution
api.DagsterApi.GetCurrentImage
api.DagsterApi.GetCurrentRuns
api.DagsterApi.GetServerId
api.DagsterApi.Heartbeat
api.DagsterApi.ListRepositories
api.DagsterApi.Ping
api.DagsterApi.ShutdownServer
api.DagsterApi.StartRun
api.DagsterApi.StreamingExternalRepository
api.DagsterApi.StreamingPing

良さそうです。実際にAPIにも接続してみます。

echo '{"echo": "abc"}' |  grpcurl -plaintext -import-path ./protos -proto api.proto -d @ 127.0.0.1:4000 api.DagsterApi.Ping
{
  "echo": "abc"
}


grpcurl -plaintext -import-path ./protos -proto api.proto  127.0.0.1:4000 api.DagsterApi.ListRepositories
{
  "serializedListRepositoriesResponseOrError": "{\"__class__\": \"ListRepositoriesResponse\", \"container_context\": {}, \"container_image\": \"docker_example_user_code_image\", \"dagster_library_versions\": {\"dagster\": \"1.10.7\", \"dagster-shared\": \"0.26.7\"}, \"entry_point\": [\"dagster\"], \"executable_path\": \"/usr/local/bin/python3.10\", \"repository_code_pointer_dict\": {\"__repository__\": {\"__class__\": \"FileCodePointer\", \"fn_name\": \"defs\", \"python_file\": \"my_dagster_project/definitions.py\", \"working_directory\": \"/opt/dagster/app\"}}, \"repository_symbols\": [{\"__class__\": \"LoadableRepositorySymbol\", \"attribute\": \"defs\", \"repository_name\": \"__repository__\"}]}"
}

なんかそれっぽい値が帰ってきました(ソースコードではたぶんここら辺)。

Discussion