🔎

ElasticsearchのComposite aggregationをREST High Level Clientで利用する

2021/06/10に公開

ElasticsearchにはComposite aggregationという機能があります。

この機能は、複数の集計結果(bucket)を取り扱う機能ですが、重要な特徴として、集計結果を「スクロール」できる点が挙げられます。

Elasticsearchでは一度に取得可能な件数(max_result_window)に上限が決められており、それ以上の結果を取得したい場合は、繰り返しリクエストを行って全件を取得する必要があります。

Elasticsearchのデフォルトでは、10,000件を超える結果の取得はできません。例えば100万件のデータを集計して、その結果が2万件になった場合、最初の1万件までしか取得できません。

この問題を解決してくれるのがComposite aggregationです。

なお、余談ですが通常の検索で10,000件以上取得したい時は、Scroll APIまたはsearch_afterを使います。

環境の準備

以前の記事で作成したのと同様に、DockerにElasticsearchとKibanaを立てて利用します。
https://zenn.dev/ryo511/articles/176ffe0571c3c7

また、本記事で使用したコードは下記リポジトリでも公開しているので、実際に動かして試してみたい方はこちらをご利用ください。
https://github.com/ryo-utsunomiya/elasticsearch-composite-aggregation-java

使用しているソフトウェアのバージョン

  • Elasticsearch, Kibana, REST High Level Client: 7.10.1
  • Java 11(AdoptOpenJDK 11.0.11+9)

Composite aggregationを試してみる

インデックスは以下のような定義(Kibanaで実行可能なクエリ)で、購入した製品(product)のIDと店舗(shop)のIDが格納されているとします。

PUT /product
{
  "mappings": {
    "properties": {
      "productId": {
        "type": "long"
      },
      "shopId": {
        "type": "long"
      }
    }
  }
}

簡単にテストデータを作ります。

POST /product/_doc
{
  "productId": 1,
  "shopId": 1
}
POST /product/_doc
{
  "productId": 2,
  "shopId": 1
}
POST /product/_doc
{
  "productId": 3,
  "shopId": 1
}
POST /product/_doc
{
  "productId": 1,
  "shopId": 2
}
POST /product/_doc
{
  "productId": 3,
  "shopId": 2
}
POST /product/_doc
{
  "productId": 3,
  "shopId": 3
}

次に、店舗IDを集計するクエリを書いてみます。

GET /product/_search
{
  "size": 0,
  "aggs": {
    "purchasedShops": {
      "composite": {
        "sources": [
          {
            "shopId": {
              "terms": {
                "field": "shopId"
              }
            }
          }
        ]
      }
    }
  }
}

"size": 0 としているのは、通常の検索結果は不要なためです。必要なのは "aggs" の方で、こちらには purchasedShops と名付けたaggregationを定義し、その種別をCompositeにしています。sourcesの下にはいくつもaggregationを定義できます。これがCompositeの特徴の一つですが、今回は10,000件以上を取得する方法に焦点を当てたいので、aggregationの定義は1つだけとします。

このクエリを実行すると、以下のような結果が得られます。

"aggregations" : {
    "purchasedShops" : {
      "after_key" : {
        "shopId" : 3
      },
      "buckets" : [
        {
          "key" : {
            "shopId" : 1
          },
          "doc_count" : 3
        },
        {
          "key" : {
            "shopId" : 2
          },
          "doc_count" : 2
        },
        {
          "key" : {
            "shopId" : 3
          },
          "doc_count" : 1
        }
      ]
    }
  }

bucketsの下に集計結果が含まれていて、shopId: 1で3件、shopId: 2で2件、shopId: 1で1件の購入があったことがわかります。注目して欲しいのが after_key というフィールドです。このフィールドの値を次回リクエスト時に使うことで、続きを取得することができます。

試しにやってみましょう。

GET /product/_search
{
  "size": 0,
  "aggs": {
    "purchasedShops": {
      "composite": {
        "after": {
          "shopId": 3
        }, 
        "sources": [
          {
            "shopId": {
              "terms": {
                "field": "shopId"
              }
            }
          }
        ]
      }
    }
  }
}

すでに全件取得してしまっているので、結果は0件になります。

  "aggregations" : {
    "purchasedShops" : {
      "buckets" : [ ]
    }
  }

では、次に大量のデータを登録して、結果が10,000件を超えた場合に after_key を使って続きを取得する方法を見ていきます。

大量データの登録はKibanaを使うと面倒なので、Javaでプログラムを組みます。こんな感じのメソッドを用意すれば10万件のランダムなデータを登録できます(コードの全体はサンプルのGitHubリポジトリを参照してください)。

  void loadData() throws JsonProcessingException, InterruptedException {
    var random = new Random();
    var objectMapper = new ObjectMapper();
    var bulkProcessor = createBulkProcessor(client);

    for (int i = 0; i < 100_000; i++) {
      var product = new Product();
      product.setProductId(random.nextInt(100_000));
      product.setShopId(random.nextInt(100_000));

      var request = new IndexRequest("product");
      request.source(objectMapper.writeValueAsBytes(product), XContentType.JSON);
      bulkProcessor.add(request);
    }
    bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
  }

改めてクエリを実行してみます。今度はcomposite aggregationのsizeを10000にします。

GET /product/_search
{
  "size": 0,
  "aggs": {
    "purchasedShops": {
      "composite": {
        "size": 10000,
        "sources": [
          {
            "shopId": {
              "terms": {
                "field": "shopId"
              }
            }
          }
        ]
      }
    }
  }
}

結果は以下のようになり、shopId: 15699の結果までが取得できました。

  "aggregations" : {
    "purchasedShops" : {
      "after_key" : {
        "shopId" : 15699
      },
      "buckets" : [
        {
          "key" : {
            "shopId" : 1
          },
          "doc_count" : 3
        },
...
        {
          "key" : {
            "shopId" : 15699
          },
          "doc_count" : 1
        }

after_keyを使って、続きを取得してみましょう。

GET /product/_search
{
  "size": 0,
  "aggs": {
    "purchasedShops": {
      "composite": {
        "after": {
          "shopId": 15699
        },
        "size": 10000,
        "sources": [
          {
            "shopId": {
              "terms": {
                "field": "shopId"
              }
            }
          }
        ]
      }
    }
  }
}

さらに1万件取得できました。

  "aggregations" : {
    "purchasedShops" : {
      "after_key" : {
        "shopId" : 31484
      },
      "buckets" : [
        {
          "key" : {
            "shopId" : 15701
          },
          "doc_count" : 1
        },
...
        {
          "key" : {
            "shopId" : 31484
          },
          "doc_count" : 3
        }

と、ここまでが前置き。ここからは、同様の処理をJVM向けのクライアントライブラリ(REST High Level Client)で実装していきます。

REST High Level Clientによる実装

Composite aggregationを使って、全てのユニークな店舗IDを取得するメソッドは以下のように書けます。

  Set<Integer> getAllShopIdSet() throws IOException {
    // aggregation クエリの組み立て
    final String aggName = "purchasedShops";
    final String fieldName = "shopId";
    final int aggregationSize = 10_000;
    CompositeAggregationBuilder aggregationBuilder = AggregationBuilders
        .composite(aggName, List.of(new TermsValuesSourceBuilder(fieldName).field(fieldName)))
        .size(aggregationSize);

    var result = new HashSet<Integer>();
    Map<String, Object> afterKey = null; // afterKey は初回リクエスト時にはnull
    CompositeAggregation aggregation;

    // レスポンスに含まれる結果がaggregationのサイズを下回るまで、繰り返し取得する
    do {
      var request = new SearchRequest("product")
          .source(
              SearchSourceBuilder.searchSource()
                  .size(0)
                  .aggregation(aggregationBuilder.aggregateAfter(afterKey))
          );
      SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      aggregation = response.getAggregations().get(aggName);
      afterKey = aggregation.afterKey(); // afterKey は次のリクエストで使う
      result.addAll(
          aggregation.getBuckets()
              .stream()
              .map(bucket -> Integer.parseInt(bucket.getKey().get(fieldName).toString()))
              .collect(Collectors.toSet())
      );
    } while (aggregation.getBuckets().size() >= aggregationSize);

    return result;
  }

要所をいくつか解説します。まずは AggregationBuilders.composite 。このメソッドはなぜかドキュメントには載ってないですが、javadocを見れば雰囲気はわかると思います。

    CompositeAggregationBuilder aggregationBuilder = AggregationBuilders
        .composite(aggName, List.of(new TermsValuesSourceBuilder(fieldName).field(fieldName)))
        .size(aggregationSize);

あとはsearchメソッドを使えばOKですが、繰り返し取得を行う必要があるため、何かしらの手段で次回のリクエストを行うか判定する必要があります。SearchResponseには hasNext のようなわかりやすいフラグはないので、レスポンスの件数を要求した件数と比べるのがよいと思います。また、「最低1回ループする」という実装にはdo whileが向いているでしょう。

    do {
      SearchResponse response = client.search(...);
      aggregation = response.getAggregations().get(aggName);
      ...
    } while (aggregation.getBuckets().size() >= aggregationSize);

むすび

以上、ElasticsearchのComposite Aggregationを使って1万件以上の集計結果を取得する方法と、Composite aggregationをREST High Level Clientで使う方法を解説しました。

Discussion