ElasticsearchでもSQL書きたい! Opendistro for Elasticsearch SQL入門
みなさん、SQL は好きですか。
Elasticsearch 書学者としての第一関門として、独自のクエリ構文に慣れないというケースはあるあるかと思います。また、Elasticsearch のクエリは書けないけど、SQL なら書けるという方も大勢いるかと思います。
本投稿では、そんな方々のために Elasticsearch に対して SQL を発行し、データの読み取りを行う方法をご紹介いたします。
なお、今回の検証では、Kibana 起動時にデフォルトでセットアップ可能なSample eCommerce ordersデータを用います。
今回の検証では、Apache License で利用可能なOpen Distro for Elasticsearch SQL(以下、odfe)を用います。ローカルでのクラスタの立ち上げには docker-compose を用いますが、詳細は同期の@ke-boが書いたこちらの記事をご覧ください。
Kibana 上から SQL を叩いてみる
odfe プラグインが適用された Kibana を起動すると、サイドバーから Query Workbench が選択可能です。
Workbench を開くと、早速クエリを入力する画面が表示されます。
初期値で入力されているクエリをRUN
を押して実行すると、クラスタ内全てのインデックスがテーブル形式で取得できます。
また、横にあるExplain
ボタンを押すと、内部でどの様なクエリに変換されているかが確認可能です。
試しに以下のクエリを変換してみます。
SELECT * FROM kibana_sample_data_ecommerce;
実践
ここからは実際の操作を想定したクエリと、どの様に解釈されているかを実際に実行しながら確かめてみます。
order_id 降順で Top10 をとる
SELECT * FROM kibana_sample_data_ecommerce ORDER BY order_id DESC LIMIT 10;
{
"from": 0,
"size": 10,
"sort": [
{
"order_id": {
"order": "desc"
}
}
]
}
customer_first_name を重複排除して取得
SELECT DISTINCT customer_first_name from kibana_sample_data_ecommerce;
{
"from": 0,
"size": 0,
"_source": {
"includes": ["customer_first_name"],
"excludes": []
},
"stored_fields": "customer_first_name",
"aggregations": {
"customer_first_name": {
"terms": {
"field": "customer_first_name",
"size": 200,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
}
}
}
}
これは実際に実行すると、customer_first_name が text type のため怒られてしまいますね。正しくは以下のクエリとなりそうです。
{
"from": 0,
"size": 0,
"_source": {
"includes": ["customer_first_name"],
"excludes": []
},
"stored_fields": "customer_first_name",
"aggregations": {
"customer_first_name": {
"terms": {
"field": "customer_first_name.keyword",
"size": 200,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
}
}
}
}
SQL で明示的に keyword を指定することでも対応可能でした。
SELECT DISTINCT customer_first_name.keyword from kibana_sample_data_ecommerce;
taxless_total_price が 100 以上 200 以下のレコードを取得
SELECT * FROM kibana_sample_data_ecommerce WHERE taxless_total_price BETWEEN 100 AND 200;
{
"from": 0,
"size": 200,
"query": {
"bool": {
"filter": [
{
"bool": {
"must": [
{
"range": {
"taxless_total_price": {
"from": 100,
"to": 200,
"include_lower": true,
"include_upper": true,
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
}
顧客名ごとに注文件数を取得
この辺りの複雑なところから、workbench 側で対応仕切れずエラーになり始めます。が、Explain ででてくるクエリを Dev Tools 上で投げることで結果の取得ができますが、これらも意識して keyword 型を指定することで回避可能です。(ちょっと SQL らしさは薄れてきますね)
SELECT COUNT(customer_full_name.keyword), customer_full_name.keyword FROM kibana_sample_data_ecommerce GROUP BY customer_full_name.keyword;
{
"from": 0,
"size": 0,
"_source": {
"includes": ["customer_full_name.keyword", "COUNT"],
"excludes": []
},
"stored_fields": "customer_full_name.keyword",
"aggregations": {
"customer_full_name#keyword": {
"terms": {
"field": "customer_full_name.keyword",
"size": 200,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"COUNT_0": {
"value_count": {
"field": "customer_full_name.keyword"
}
}
}
}
}
}
Aggregation を使って GROUP BY を再現しているので、降順に並んでいるのが面白いですね。
さらに複雑なクエリ構文へ・・・
ここからは JOIN を使ってみます。実はこれらの機能は執筆時点で ElasticLicense 下の SQL 機能には実装されておらず、odfe の SQL プラグイン独自の機能となります。
JOIN
データの準備
JOIN 先のテーブルを作成するため、以下の定義でインデックスを作成します。
PUT /ecommerce_customer_info
{
"mappings": {
"properties": {
"customer_id": {
"type": "integer",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"customer_full_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
}
仮でいくつかデータを投入します。
POST /_bulk
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 1, "customer_full_name": "Eddie Underwood"}
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 2, "customer_full_name": "Mary Bailey"}
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 3, "customer_full_name": "Gwen Butler"}
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 4, "customer_full_name": "Diane Chandler"}
クエリの実行
ここから先は現在の Query Workbench では対応してくれなかったので、Dev Tools から実行してみます。
SELECT e.customer_id
FROM kibana_sample_data_ecommerce k
INNER JOIN ecommerce_customer_info e
ON k.customer_full_name = e.customer_full_name
POST _opendistro/_sql
{
"query": """
SELECT e.customer_id
FROM kibana_sample_data_ecommerce k
INNER JOIN ecommerce_customer_info e
ON k.customer_full_name = e.customer_full_name
"""
}
{
"schema": [{
"name": "e.customer_id",
"type": "keyword"
}],
"total": 8,
"datarows": [
[1],
[2],
[2],
[2],
[3],
[3],
[4],
[4]
],
"size": 8,
"status": 200
}
Explain API を用いて実際どの様なリクエストになっているかみてみましょう
POST _opendistro/_sql/_explain
{
"query": """
SELECT e.customer_id
FROM kibana_sample_data_ecommerce k
INNER JOIN ecommerce_customer_info e
ON k.customer_full_name = e.customer_full_name
"""
}
{
"Physical Plan" : {
"Project [ columns=[e.customer_id] ]" : {
"Top [ count=200 ]" : {
"BlockHashJoin[ conditions=( k.customer_full_name = e.customer_full_name ), type=INNER_JOIN, blockSize=[FixedBlockSize with size=10000] ]" : {
"Scroll [ kibana_sample_data_ecommerce as k, pageSize=10000 ]" : {
"request" : {
"size" : 200,
"from" : 0
}
},
"useTermsFilterOptimization" : false,
"Scroll [ ecommerce_customer_info as e, pageSize=10000 ]" : {
"request" : {
"size" : 200,
"from" : 0,
"_source" : {
"excludes" : [ ],
"includes" : [
"customer_id",
"customer_full_name"
]
}
}
}
}
}
}
},
"description" : "Hash Join algorithm builds hash table based on result of first query, and then probes hash table to find matched rows for each row returned by second query",
"Logical Plan" : {
"Project [ columns=[e.customer_id] ]" : {
"Top [ count=200 ]" : {
"Join [ conditions=( k.customer_full_name = e.customer_full_name ) type=INNER_JOIN ]" : {
"Group" : [
{
"Project [ columns=[k.customer_full_name] ]" : {
"TableScan" : {
"tableAlias" : "k",
"tableName" : "kibana_sample_data_ecommerce"
}
}
},
{
"Project [ columns=[e.customer_full_name, e.customer_id] ]" : {
"TableScan" : {
"tableAlias" : "e",
"tableName" : "ecommerce_customer_info"
}
}
}
]
}
}
}
}
}
Elasticsearch のクエリでない何かがでてきました。公式のドキュメントを参照すると、以下の様な記述があります。
Inner join creates a new result set by combining columns of two indices based on your join predicates. It iterates the two indices and compares each document to find the ones that satisfy the join predicates. You can optionally precede the JOIN clause with an INNER keyword.
ざっくり訳すと、 Inner Join の結果を作るために、2 つのインデックスの中から join のキーになるカラムを走査して照らし合わせている様です。すごい。
全文検索
ここまでは一般的な SQL のクエリ構文をみてきましたが、odfe の SQL プラグインでは Match や Match Phrase 等、一部のクエリにも対応しています。例を見てみましょう。
名前が smith にマッチする顧客のレコードを取得
SELECT * FROM kibana_sample_data_ecommerce WHERE MATCH_QUERY(customer_full_name, 'smith');
{
"from": 0,
"size": 200,
"query": {
"bool": {
"filter": [
{
"bool": {
"must": [
{
"match": {
"customer_full_name": {
"query": "smith",
"operator": "OR",
"prefix_length": 0,
"max_expansions": 50,
"fuzzy_transpositions": true,
"lenient": false,
"zero_terms_query": "NONE",
"auto_generate_synonyms_phrase_query": true,
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
}
}
見事、customer_full_name に smith が含まれるレコードを取得できました。
最後に
本投稿では、Open Distro for Elasticseach の SQL プラグインで利用可能な機能を一部紹介いたしました。
実際に使ってみると、SQL でリクエストが可能だけではなく、そのクエリがどう変換されるかをみることでより Elasticsearch のクエリへの理解を深めることができました。
参考
公式ドキュメント
Docker を使用してマルチノードで Open Distro for Elasticsearch のローカル開発環境を構築
Discussion