🔍

Kubernetes リソースに対して SQL のクエリを実行するツールを作りました

に公開

概要

kuqu は Kubernetes クラスタ内のリソースに対して SQL 構文でクエリを実行できる Rust 製のコマンドラインツールです。Apache DataFusion を活用し、動的スキーマ推論によって Kubernetes リソースをテーブルデータとして扱い、複雑な条件検索、集約、結合操作を SQL で実行します。

以下に、実行例を示します。

$ kuqu "SELECT metadata.name AS pod, status.phase AS phase
     FROM 'pod/kube-system' WHERE status.phase = 'Running'"
+--------------------------------------------+---------+
| pod                                        | phase   |
+--------------------------------------------+---------+
| coredns-6f6b679f8f-8rkh9                   | Running |
| coredns-6f6b679f8f-kgbjs                   | Running |
| etcd-kind-control-plane                    | Running |
| kindnet-khqtr                              | Running |
| kube-apiserver-kind-control-plane          | Running |
| kube-controller-manager-kind-control-plane | Running |
| kube-proxy-ccth7                           | Running |
| kube-scheduler-kind-control-plane          | Running |
+--------------------------------------------+---------+

この記事では、kuqu の内部実装や Apache DataFusion の活用方法について詳しく説明します。

Apache DataFusion

kuqu では Apache DataFusion (以下、DataFusion) を活用し、Kubernetes リソースを SQL テーブルとして扱います。

DataFusion は以下のような特徴を持つクエリエンジンです:

  • Apache Arrow (以下、Arrow) をインメモリフォーマットとして使用しています
  • SQLDataFrame API をサポートしています
  • TableProvider トレイトを通じて独自のデータソースとして拡張可能です
    • kuqu ではこの TableProvider を実装し、Kubernetes リソースを SQL テーブルとして扱うようにしています

kuqu の内部実装

以降のセクションでは、kuqu の内部実装について詳しく説明します。

テーブルの定義

まず kuqu では Kubernetes リソースに対応するテーブルを以下のように定義しています

SELECT * FROM ${KUBERNETES_RESOURCE}/${KUBERNETES_NAMESPACE}
  • ${KUBERNETES_RESOURCE}
    • リソースの指定
      • e.g. pod, node, service
      • Custom Resource Definition もサポートしています
    • リソースのフォーマットは以下のように指定できます
      • deployments (基本的なリソース)
      • deployment (単一のリソース)
      • deploy (短縮形)
      • deployments.apps (API グループを含むリソース)
  • ${KUBERNETES_NAMESPACE}
    • Kubernetes namespace の指定
      • e.g. kube-system, default
      • 指定しない場合は以下のフローで決定されます
        1. kubernetes context で指定された namespace
        2. default namespace
    • node のような cluster-wide のリソースに対する namespace の指定は無視されます

スキーマの推論

kuqu は Kubernetes の /openapi/v3 エンドポイントを使用する代わりに、実際のリソースデータからスキーマを動的に推論します。

この設計判断の理由は、OpenAPI 仕様では additionalProperties=true を持つフィールド(metadata.labelsmetadata.annotations など)の動的に決定されるキーに対するスキーマを事前に確定できないためです。

スキーマの推論については Arrow の infer_json_schema を利用できます。

以下のコードは kuqu でのスキーマ推論の実装から抜粋しています

async fn infer_schema(ndjson: &str) -> DataFusionResult<SchemaRef> {
    infer_json_schema(&mut Cursor::new(ndjson.as_bytes()), None)
        .map(|(schema, _)| Arc::new(schema))
        .map_err(|e| DataFusionError::External(Box::new(e)))
}

ただし、スキーマはクエリ実行時に推論されるため、リソース数が多い場合や複雑な構造を持つリソースが存在する場合、クエリに時間がかかる可能性があります(スキーマ推論に使用するリソース数を制限する仕組みを検討中です)。

テーブルの動的作成

SQL クエリを実行するためのテーブルの作成には、DataFusion の TableProvider トレイトを実装する必要があります、と述べました。

しかし、正確には TableProvider トレイトだけでは不十分です。というのも TableProvider トレイトはクエリに依存しないテーブルの定義を提供するためのもので kuqu ではクエリに応じた適切なテーブルを動的に生成する必要があるためです。

具体的には SELECT * FROM pod/kube-system と実行されたタイミングで kube-system namespace の pod リソースを取得し、テーブルを生成する必要があります。

調査したところ、DataFusion ではこのような動的なテーブルの生成を実現するための手段が用意されていました。UrlTableFactory のトレイトです。

UrlTableFactory の実装

実装するメソッドは try_new です。このメソッドは url: &str を受け取り TableProvider を返す非同期関数です。

async fn try_new(&self, url: &str) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {}

url とは具体的にはクエリ時の FROM や JOIN 句で指定された変数が渡されます。例えば SELECT * FROM pod/kube-system の場合、urlpod/kube-system となります。kuqu では先ほどのルールに従ってリソースと namespace を分解し、Kubernetes API を通じてリソースを取得します。

以下は kuqu における UrlTableFactory の実装を抜粋したコード片です。

#[async_trait]
impl UrlTableFactory for KubernetesTableProviderFactory {
    /// Try to create a table provider from a Kubernetes URL
    async fn try_new(&self, url: &str) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {
        let kubeurl =
            KubernetesUrl::parse(url, &self.context, &self.api_resources)?;

        let object_list = self
            .list_api_resources(&kubeurl.resource, &kubeurl.namespace)
            .await?;

        let ndjson = object_list
            .items
            .iter()
            .map(|item| serde_json::json!(item).to_string())
            .collect::<Vec<_>>()
            .join("\n");

        let schema = infer_schema(&ndjson).await?;

        Ok(Some(Arc::new(KubernetesTableProvider::new(
            schema,
            Arc::new(ndjson),
        ))))
    }
}

以降のコードでは Custom Table Provider のドキュメントに倣って TableProvider の実装 => ExecutionPlan の実装と続きます。

TableProvider の実装

TableProvider トレイトを実装することで、DataFusion は KubernetesTableProvider をテーブルとして認識し、クエリの実行時に適切なスキーマとデータを提供します。

ここで重要なのが scan メソッドです。scan メソッドで渡される projection 引数を使用して、クエリで指定されたカラムのみを選択することができます。

以下は kuqu における TableProvider の実装を抜粋したコード片です。

#[async_trait]
impl TableProvider for KubernetesTableProvider {
    async fn scan(
        &self,
        projection: Option<&Vec<usize>>,
        ...
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let projected_schema = if let Some(proj) = projection {
            match self.schema.project(proj) {
                Ok(projected_schema) => Arc::new(projected_schema),
                Err(_) => self.schema.clone(),
            }
        } else {
            self.schema.clone()
        };

        Ok(Arc::new(KubernetesExec::new(
            projected_schema,
            self.ndjson.clone(),
        )))
    }

ExecutionPlan の実装

最後に、ExecutionPlan トレイトを実装することで、DataFusion は KubernetesExec を実行計画として認識し、クエリの実行時にデータを取得します。

...といいつつ kuqu ではスキーマ推論のため UrlTableFactory のフェーズですでにデータを取得しているため、ここではデータの取得は行いません。

したがって ExecutionPlan では UrlTableFactory で取得した NDJSON を RecordBatch に変換する処理のみを実装します。

変換に関する実装が知りたい場合は kuqu (v0.1.0) 時点でのコードを参照してください。

クエリの実行

先ほど実装したクエリの実行は以下のように行います(実際のコードから抜粋)。

let factory = Arc::new(KubernetesTableProviderFactory::new(
    client,
    context,
    api_resources,
));

let ctx = SessionContext::new();

let catalog_list = Arc::new(DynamicFileCatalog::new(
    Arc::clone(ctx.state().catalog_list()),
    factory as Arc<dyn UrlTableFactory>,
));

let ctx: SessionContext = ctx
    .into_state_builder()
    .with_catalog_list(catalog_list)
    .build()
    .into();

let df = ctx.sql(&args.query).await?;
df.show().await?;

Factory を with_catalog_list メソッドを通じて SessionContext に登録します。これにより、DataFusion はクエリ実行時に UrlTableFactory を使用して動的にテーブルを生成できるようになります。ctx.sql メソッドを使用して SQL クエリを実行し、結果を DataFrame として取得します。さらに DataFrame の show メソッドを呼び出すことで、クエリ結果をコンソールに表示します。

まとめ

kuqu における DataFusion の活用方法を紹介しました。DataFusion の機能を利用することで、Kubernetes リソースを SQL でクエリするための柔軟なテーブル定義と動的スキーマ推論を実現しています。

今後としては REPL 機能の追加を検討しています。REPL 機能を追加することで、ユーザーは対話的にクエリを実行し、結果を確認しながら操作できるようになります。

Discussion