Kubernetes リソースに対して SQL のクエリを実行するツールを作りました
概要
kuqu は Kubernetes クラスタ内のリソースに対して SQL 構文でクエリを実行できる Rust 製のコマンドラインツールです。Apache DataFusion を活用し、動的スキーマ推論によって Kubernetes リソースをテーブルデータとして扱い、複雑な条件検索、集約、結合操作を SQL で実行します。
以下に、実行例を示します。
$ kuqu "SELECT metadata.name AS pod, status.phase AS phase
FROM 'pod/kube-system' WHERE status.phase = 'Running'"
+--------------------------------------------+---------+
| pod | phase |
+--------------------------------------------+---------+
| coredns-6f6b679f8f-8rkh9 | Running |
| coredns-6f6b679f8f-kgbjs | Running |
| etcd-kind-control-plane | Running |
| kindnet-khqtr | Running |
| kube-apiserver-kind-control-plane | Running |
| kube-controller-manager-kind-control-plane | Running |
| kube-proxy-ccth7 | Running |
| kube-scheduler-kind-control-plane | Running |
+--------------------------------------------+---------+
この記事では、kuqu の内部実装や Apache DataFusion の活用方法について詳しく説明します。
Apache DataFusion
kuqu では Apache DataFusion (以下、DataFusion) を活用し、Kubernetes リソースを SQL テーブルとして扱います。
DataFusion は以下のような特徴を持つクエリエンジンです:
- Apache Arrow (以下、Arrow) をインメモリフォーマットとして使用しています
-
SQL と DataFrame API をサポートしています
- "DataFrame" については pandas や Apache Spark のものと類似しています
- TableProvider トレイトを通じて独自のデータソースとして拡張可能です
- kuqu ではこの TableProvider を実装し、Kubernetes リソースを SQL テーブルとして扱うようにしています
kuqu の内部実装
以降のセクションでは、kuqu の内部実装について詳しく説明します。
テーブルの定義
まず kuqu では Kubernetes リソースに対応するテーブルを以下のように定義しています
SELECT * FROM ${KUBERNETES_RESOURCE}/${KUBERNETES_NAMESPACE}
-
${KUBERNETES_RESOURCE}- リソースの指定
- e.g.
pod,node,service - Custom Resource Definition もサポートしています
- e.g.
- リソースのフォーマットは以下のように指定できます
-
deployments(基本的なリソース) -
deployment(単一のリソース) -
deploy(短縮形) -
deployments.apps(API グループを含むリソース)
-
- リソースの指定
-
${KUBERNETES_NAMESPACE}- Kubernetes namespace の指定
- e.g.
kube-system,default - 指定しない場合は以下のフローで決定されます
- kubernetes context で指定された namespace
-
defaultnamespace
- e.g.
-
nodeのような cluster-wide のリソースに対する namespace の指定は無視されます
- Kubernetes namespace の指定
スキーマの推論
kuqu は Kubernetes の /openapi/v3 エンドポイントを使用する代わりに、実際のリソースデータからスキーマを動的に推論します。
この設計判断の理由は、OpenAPI 仕様では additionalProperties=true を持つフィールド(metadata.labels や metadata.annotations など)の動的に決定されるキーに対するスキーマを事前に確定できないためです。
スキーマの推論については Arrow の infer_json_schema を利用できます。
以下のコードは kuqu でのスキーマ推論の実装から抜粋しています
async fn infer_schema(ndjson: &str) -> DataFusionResult<SchemaRef> {
infer_json_schema(&mut Cursor::new(ndjson.as_bytes()), None)
.map(|(schema, _)| Arc::new(schema))
.map_err(|e| DataFusionError::External(Box::new(e)))
}
ただし、スキーマはクエリ実行時に推論されるため、リソース数が多い場合や複雑な構造を持つリソースが存在する場合、クエリに時間がかかる可能性があります(スキーマ推論に使用するリソース数を制限する仕組みを検討中です)。
テーブルの動的作成
SQL クエリを実行するためのテーブルの作成には、DataFusion の TableProvider トレイトを実装する必要があります、と述べました。
しかし、正確には TableProvider トレイトだけでは不十分です。というのも TableProvider トレイトはクエリに依存しないテーブルの定義を提供するためのもので kuqu ではクエリに応じた適切なテーブルを動的に生成する必要があるためです。
具体的には SELECT * FROM pod/kube-system と実行されたタイミングで kube-system namespace の pod リソースを取得し、テーブルを生成する必要があります。
調査したところ、DataFusion ではこのような動的なテーブルの生成を実現するための手段が用意されていました。UrlTableFactory のトレイトです。
UrlTableFactory の実装
実装するメソッドは try_new です。このメソッドは url: &str を受け取り TableProvider を返す非同期関数です。
async fn try_new(&self, url: &str) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {}
url とは具体的にはクエリ時の FROM や JOIN 句で指定された変数が渡されます。例えば SELECT * FROM pod/kube-system の場合、url は pod/kube-system となります。kuqu では先ほどのルールに従ってリソースと namespace を分解し、Kubernetes API を通じてリソースを取得します。
以下は kuqu における UrlTableFactory の実装を抜粋したコード片です。
#[async_trait]
impl UrlTableFactory for KubernetesTableProviderFactory {
/// Try to create a table provider from a Kubernetes URL
async fn try_new(&self, url: &str) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {
let kubeurl =
KubernetesUrl::parse(url, &self.context, &self.api_resources)?;
let object_list = self
.list_api_resources(&kubeurl.resource, &kubeurl.namespace)
.await?;
let ndjson = object_list
.items
.iter()
.map(|item| serde_json::json!(item).to_string())
.collect::<Vec<_>>()
.join("\n");
let schema = infer_schema(&ndjson).await?;
Ok(Some(Arc::new(KubernetesTableProvider::new(
schema,
Arc::new(ndjson),
))))
}
}
以降のコードでは Custom Table Provider のドキュメントに倣って TableProvider の実装 => ExecutionPlan の実装と続きます。
TableProvider の実装
TableProvider トレイトを実装することで、DataFusion は KubernetesTableProvider をテーブルとして認識し、クエリの実行時に適切なスキーマとデータを提供します。
ここで重要なのが scan メソッドです。scan メソッドで渡される projection 引数を使用して、クエリで指定されたカラムのみを選択することができます。
以下は kuqu における TableProvider の実装を抜粋したコード片です。
#[async_trait]
impl TableProvider for KubernetesTableProvider {
async fn scan(
&self,
projection: Option<&Vec<usize>>,
...
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let projected_schema = if let Some(proj) = projection {
match self.schema.project(proj) {
Ok(projected_schema) => Arc::new(projected_schema),
Err(_) => self.schema.clone(),
}
} else {
self.schema.clone()
};
Ok(Arc::new(KubernetesExec::new(
projected_schema,
self.ndjson.clone(),
)))
}
ExecutionPlan の実装
最後に、ExecutionPlan トレイトを実装することで、DataFusion は KubernetesExec を実行計画として認識し、クエリの実行時にデータを取得します。
...といいつつ kuqu ではスキーマ推論のため UrlTableFactory のフェーズですでにデータを取得しているため、ここではデータの取得は行いません。
したがって ExecutionPlan では UrlTableFactory で取得した NDJSON を RecordBatch に変換する処理のみを実装します。
変換に関する実装が知りたい場合は kuqu (v0.1.0) 時点でのコードを参照してください。
クエリの実行
先ほど実装したクエリの実行は以下のように行います(実際のコードから抜粋)。
let factory = Arc::new(KubernetesTableProviderFactory::new(
client,
context,
api_resources,
));
let ctx = SessionContext::new();
let catalog_list = Arc::new(DynamicFileCatalog::new(
Arc::clone(ctx.state().catalog_list()),
factory as Arc<dyn UrlTableFactory>,
));
let ctx: SessionContext = ctx
.into_state_builder()
.with_catalog_list(catalog_list)
.build()
.into();
let df = ctx.sql(&args.query).await?;
df.show().await?;
Factory を with_catalog_list メソッドを通じて SessionContext に登録します。これにより、DataFusion はクエリ実行時に UrlTableFactory を使用して動的にテーブルを生成できるようになります。ctx.sql メソッドを使用して SQL クエリを実行し、結果を DataFrame として取得します。さらに DataFrame の show メソッドを呼び出すことで、クエリ結果をコンソールに表示します。
まとめ
kuqu における DataFusion の活用方法を紹介しました。DataFusion の機能を利用することで、Kubernetes リソースを SQL でクエリするための柔軟なテーブル定義と動的スキーマ推論を実現しています。
今後としては REPL 機能の追加を検討しています。REPL 機能を追加することで、ユーザーは対話的にクエリを実行し、結果を確認しながら操作できるようになります。
Discussion