🎓

ElasticsearchでもSQL書きたい! Opendistro for Elasticsearch SQL入門

12 min read

本記事は ZOZO テクノロジーズ #2 Advent Calendar 2020 の 19 日目の記事です

みなさん、SQL は好きですか。

Elasticsearch 書学者としての第一関門として、独自のクエリ構文に慣れないというケースはあるあるかと思います。また、Elasticsearch のクエリは書けないけど、SQL なら書けるという方も大勢いるかと思います。

本投稿では、そんな方々のために Elasticsearch に対して SQL を発行し、データの読み取りを行う方法をご紹介いたします。
なお、今回の検証では、Kibana 起動時にデフォルトでセットアップ可能なSample eCommerce ordersデータを用います。

今回の検証では、Apache License で利用可能なOpen Distro for Elasticsearch SQL(以下、odfe)を用います。ローカルでのクラスタの立ち上げには docker-compose を用いますが、詳細は同期の@ke-boが書いたこちらの記事をご覧ください。

Kibana 上から SQL を叩いてみる

odfe プラグインが適用された Kibana を起動すると、サイドバーから Query Workbench が選択可能です。
kibana

Workbench を開くと、早速クエリを入力する画面が表示されます。
workbench

初期値で入力されているクエリをRUNを押して実行すると、クラスタ内全てのインデックスがテーブル形式で取得できます。
index_list

また、横にあるExplainボタンを押すと、内部でどの様なクエリに変換されているかが確認可能です。
試しに以下のクエリを変換してみます。

SELECT * FROM kibana_sample_data_ecommerce;

explain

実践

ここからは実際の操作を想定したクエリと、どの様に解釈されているかを実際に実行しながら確かめてみます。

order_id 降順で Top10 をとる

SELECT * FROM kibana_sample_data_ecommerce ORDER BY order_id DESC LIMIT 10;
{
  "from": 0,
  "size": 10,
  "sort": [
    {
      "order_id": {
        "order": "desc"
      }
    }
  ]
}

top10

customer_first_name を重複排除して取得

SELECT DISTINCT customer_first_name from kibana_sample_data_ecommerce;
{
  "from": 0,
  "size": 0,
  "_source": {
    "includes": ["customer_first_name"],
    "excludes": []
  },
  "stored_fields": "customer_first_name",
  "aggregations": {
    "customer_first_name": {
      "terms": {
        "field": "customer_first_name",
        "size": 200,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      }
    }
  }
}

これは実際に実行すると、customer_first_name が text type のため怒られてしまいますね。正しくは以下のクエリとなりそうです。

{
  "from": 0,
  "size": 0,
  "_source": {
    "includes": ["customer_first_name"],
    "excludes": []
  },
  "stored_fields": "customer_first_name",
  "aggregations": {
    "customer_first_name": {
      "terms": {
        "field": "customer_first_name.keyword",
        "size": 200,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      }
    }
  }
}

SQL で明示的に keyword を指定することでも対応可能でした。

SELECT DISTINCT customer_first_name.keyword from kibana_sample_data_ecommerce;

distinct

taxless_total_price が 100 以上 200 以下のレコードを取得

SELECT * FROM kibana_sample_data_ecommerce WHERE taxless_total_price BETWEEN 100 AND 200;
{
  "from": 0,
  "size": 200,
  "query": {
    "bool": {
      "filter": [
        {
          "bool": {
            "must": [
              {
                "range": {
                  "taxless_total_price": {
                    "from": 100,
                    "to": 200,
                    "include_lower": true,
                    "include_upper": true,
                    "boost": 1
                  }
                }
              }
            ],
            "adjust_pure_negative": true,
            "boost": 1
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  }
}

where

顧客名ごとに注文件数を取得

この辺りの複雑なところから、workbench 側で対応仕切れずエラーになり始めます。が、Explain ででてくるクエリを Dev Tools 上で投げることで結果の取得ができますが、これらも意識して keyword 型を指定することで回避可能です。(ちょっと SQL らしさは薄れてきますね)

SELECT COUNT(customer_full_name.keyword), customer_full_name.keyword FROM kibana_sample_data_ecommerce GROUP BY customer_full_name.keyword;
{
  "from": 0,
  "size": 0,
  "_source": {
    "includes": ["customer_full_name.keyword", "COUNT"],
    "excludes": []
  },
  "stored_fields": "customer_full_name.keyword",
  "aggregations": {
    "customer_full_name#keyword": {
      "terms": {
        "field": "customer_full_name.keyword",
        "size": 200,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      },
      "aggregations": {
        "COUNT_0": {
          "value_count": {
            "field": "customer_full_name.keyword"
          }
        }
      }
    }
  }
}

grpup by

Aggregation を使って GROUP BY を再現しているので、降順に並んでいるのが面白いですね。

さらに複雑なクエリ構文へ・・・

ここからは JOIN を使ってみます。実はこれらの機能は執筆時点で ElasticLicense 下の SQL 機能には実装されておらず、odfe の SQL プラグイン独自の機能となります。

JOIN

データの準備

JOIN 先のテーブルを作成するため、以下の定義でインデックスを作成します。

PUT /ecommerce_customer_info
{
  "mappings": {
    "properties": {
      "customer_id": {
        "type": "integer",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "customer_full_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      }
    }
  }
}

仮でいくつかデータを投入します。

POST /_bulk
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 1, "customer_full_name": "Eddie Underwood"}
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 2, "customer_full_name": "Mary Bailey"}
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 3, "customer_full_name": "Gwen Butler"}
{"index": {"_index": "ecommerce_customer_info"}}
{"customer_id": 4, "customer_full_name": "Diane Chandler"}

クエリの実行

ここから先は現在の Query Workbench では対応してくれなかったので、Dev Tools から実行してみます。

SELECT e.customer_id
FROM kibana_sample_data_ecommerce k
INNER JOIN ecommerce_customer_info e
  ON k.customer_full_name = e.customer_full_name
リクエスト
POST _opendistro/_sql
{
  "query": """
  SELECT e.customer_id
  FROM kibana_sample_data_ecommerce k
  INNER JOIN ecommerce_customer_info e
    ON k.customer_full_name = e.customer_full_name
  """
}
返却値
{
  "schema": [{
    "name": "e.customer_id",
    "type": "keyword"
  }],
  "total": 8,
  "datarows": [
    [1],
    [2],
    [2],
    [2],
    [3],
    [3],
    [4],
    [4]
  ],
  "size": 8,
  "status": 200
}

Explain API を用いて実際どの様なリクエストになっているかみてみましょう

リクエスト
POST _opendistro/_sql/_explain
{
  "query": """
  SELECT e.customer_id
  FROM kibana_sample_data_ecommerce k
  INNER JOIN ecommerce_customer_info e
    ON k.customer_full_name = e.customer_full_name
  """
}
返却値
{
  "Physical Plan" : {
    "Project [ columns=[e.customer_id] ]" : {
      "Top [ count=200 ]" : {
        "BlockHashJoin[ conditions=( k.customer_full_name = e.customer_full_name ), type=INNER_JOIN, blockSize=[FixedBlockSize with size=10000] ]" : {
          "Scroll [ kibana_sample_data_ecommerce as k, pageSize=10000 ]" : {
            "request" : {
              "size" : 200,
              "from" : 0
            }
          },
          "useTermsFilterOptimization" : false,
          "Scroll [ ecommerce_customer_info as e, pageSize=10000 ]" : {
            "request" : {
              "size" : 200,
              "from" : 0,
              "_source" : {
                "excludes" : [ ],
                "includes" : [
                  "customer_id",
                  "customer_full_name"
                ]
              }
            }
          }
        }
      }
    }
  },
  "description" : "Hash Join algorithm builds hash table based on result of first query, and then probes hash table to find matched rows for each row returned by second query",
  "Logical Plan" : {
    "Project [ columns=[e.customer_id] ]" : {
      "Top [ count=200 ]" : {
        "Join [ conditions=( k.customer_full_name = e.customer_full_name ) type=INNER_JOIN ]" : {
          "Group" : [
            {
              "Project [ columns=[k.customer_full_name] ]" : {
                "TableScan" : {
                  "tableAlias" : "k",
                  "tableName" : "kibana_sample_data_ecommerce"
                }
              }
            },
            {
              "Project [ columns=[e.customer_full_name, e.customer_id] ]" : {
                "TableScan" : {
                  "tableAlias" : "e",
                  "tableName" : "ecommerce_customer_info"
                }
              }
            }
          ]
        }
      }
    }
  }
}

Elasticsearch のクエリでない何かがでてきました。公式のドキュメントを参照すると、以下の様な記述があります。

Inner join creates a new result set by combining columns of two indices based on your join predicates. It iterates the two indices and compares each document to find the ones that satisfy the join predicates. You can optionally precede the JOIN clause with an INNER keyword.

ざっくり訳すと、 Inner Join の結果を作るために、2 つのインデックスの中から join のキーになるカラムを走査して照らし合わせている様です。すごい。

全文検索

ここまでは一般的な SQL のクエリ構文をみてきましたが、odfe の SQL プラグインでは Match や Match Phrase 等、一部のクエリにも対応しています。例を見てみましょう。

名前が smith にマッチする顧客のレコードを取得

SELECT * FROM kibana_sample_data_ecommerce WHERE MATCH_QUERY(customer_full_name, 'smith');
{
  "from": 0,
  "size": 200,
  "query": {
    "bool": {
      "filter": [
        {
          "bool": {
            "must": [
              {
                "match": {
                  "customer_full_name": {
                    "query": "smith",
                    "operator": "OR",
                    "prefix_length": 0,
                    "max_expansions": 50,
                    "fuzzy_transpositions": true,
                    "lenient": false,
                    "zero_terms_query": "NONE",
                    "auto_generate_synonyms_phrase_query": true,
                    "boost": 1
                  }
                }
              }
            ],
            "adjust_pure_negative": true,
            "boost": 1
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  }
}

match_result
見事、customer_full_name に smith が含まれるレコードを取得できました。

最後に

本投稿では、Open Distro for Elasticseach の SQL プラグインで利用可能な機能を一部紹介いたしました。
実際に使ってみると、SQL でリクエストが可能だけではなく、そのクエリがどう変換されるかをみることでより Elasticsearch のクエリへの理解を深めることができました。

参考

公式ドキュメント
Docker を使用してマルチノードで Open Distro for Elasticsearch のローカル開発環境を構築