🍍

go-elasticsearchを使った基本のリクエスト

2023/10/24に公開

はじめに

こんにちは!

今働いている会社で 最近 Elasticsearch を使った開発を行っていました。私はこの機会にはじめて Elasticsearch を利用したのですが、Elasticsearch の全体感を把握するのに時間がかかったり、Go 言語 特有のドキュメントがやや足りなかったりして、よく理解できたと思えるまでかなり時間がかかりました。

今回自分が得た知識をきちんと振り返るため、そしてこれから Go 言語 で Elasticsearch を利用した開発を行う方のため、Elasticsearch の概要やgo-elasticsearch基本のコード(index 作成/削除、Bulk API を利用した一括処理、SearchAPI を利用した検索クエリ)を残しておきたいと思います。

今回の記事では、Docker を使って Elasticsearch の環境を構築し、そこに実際にリクエストを送る方法を書きたいと思います。

※リクエスト内容は、ごく一般的なインデックスの作成・削除、インデックスからのデータ登録、データ取得方法などです。

事前準備

以下の DockerCompose ファイルを利用して環境構築をしてください。

version: "3"

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.10.2
    container_name: elasticsearch
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 9200:9200

環境構築はElasticSearch の環境を Docker を使って構築してみるを参考にさせていただきました。詳しく Docker 環境について書いてあります。

Docker を立ち上げたら、以下を叩いて health check してみて、疎通できることを確認しておいてください。

curl -X GET "localhost:9200/_cat/health?v&pretty"

Elasticsearch とは

概要

Elasticsearch とは、分散型検索・分析エンジンです。データを決められた大きさに区切って分散し、複数のサーバーに設置するというところがミソで、この技術があることにより、データの信頼性(複数のサーバに同じデータを書き込める(プライマリシャード,レプリカシャードの仕組み))や検索の高速性(リクエスト時に複数のサーバが連携して分散したデータを集める(シャード,インデックス,クラスタの仕組み))に秀でたソフトウェアという位置付けになっています。

ちなみに内部には Apache LuceneというJava製の検索エンジンライブラリが利用されています。

Elasticsearch の概要を理解するのに最も役立ったと思っているのが、公式のWhat is Elasticsearch?です(公式バンザイですね)。大枠を掴むのにとてもわかりやすいので、まだ見ていない方は是非見てください。

用語おさらい

本記事では Elasticsearch の内部の仕組みについてそれほど記載はしませんが、基礎の開発を進めるにあたり押さえておくべき用語がいくつかあるためざっと下記に記載します。
どれも 公式ドキュメントで頻出の用語です。基本的な操作をするにあたりよく使うものだけをまとめました。

Elasticsearch を構成する用語

  • クラスタ
    Elasticsearch 構成要素における最も外側の部分です。1 つ以上のノードの塊がクラスタとなり、このクラスタの中でデータのやり取りが行われます。

  • ノード(サーバ)
    インスタンスにあたる部分です。クラスタ内のノードはそれぞれ、ノード全体管理を請け負う Master Node や、データの保存・集計・検索を行う Data Node などがあります。

  • インデックス
    RDB でいうテーブルのようなイメージで、論理的なグループ群となります。

  • シャード
    1 つのインデックス内に分散するデータの範囲で、かたまりのデータを指します。また、メインで利用するプライマリシャードとは別に、コピーして他のノードに分散して登録されるレプリカシャード等もあり、シャードも役割が色々とあります。1 インデックスに対するシャードの数や、レプリカシャードの数などはユーザーが指定して決めることができます。このあたりは設計が絡んでくるイメージでした。

Elasticsearch の機能に関する用語

  • Index modules
    個々のインデックスを制御する設定群です。モジュールを利用して、様々な設定が行えますが、正直最初はそんなに利用するイメージがありません。

  • Mapping
    インデックスの型を指定することをマッピングと呼んでいます。Elasticsearch は動的なマッピング機能がついており、特にインデックスを指定しなくても、与えたデータに応じて最適なマッピングを当ててくれます。フィールドとしては、全文検索に利用する text をはじめとし、keyword(text と比べると単語単体に近い),Boolean,Binary,Date などのオーソドックスなものから、地理系に特化したものまで多く揃えられています。最初は動的なマッピングでも良いですが、クエリを最適化し始める作業が発生したタイミングで、1 つのデータに複数のフィールドを用意することができるため、検索や分析をしやすい形に静的マッピングしていくことにはなるかなと思います(プロジェクトのやりたいこと次第ですが…)。

  • Query DSL
    Elasticsearch 検索のための、わかりやすい json クエリです。ただ、フィールドによっては使えない指定等があったりと、使い勝手がわかるまでは何度もエラーに出くわすことになると思います。検索だけでなく、ソートやページネーションなども簡単に実現ができます。

  • Bulk API
    一括処理で CRUD 操作ができる API で、インデックス速度を上げることができます。パフォーマンスを考慮する場面で、一気に処理したい時にはよく使います。

検索機能を実装する場合、Mapping と Query DSL については、公式ドキュメントを読み込むことにより、どうしたら自分たちのやりたいことができるかが非常に明確になると思っています。フルテキスト検索系や地理系以外は、あまり専門知識がなくてもスムーズにわかるなと感じました。

go-elasticsearch を使ったリクエストサンプル

簡単なサンプルは公式が出している Go 専用のドキュメントを参照することでわかるためこちらを最初に見てもらうのが良いと思います。

client への接続方法は 2 種類ありまして、以下のように接続できます。
両者の特徴として、すべて TypedClient の方が型があって書きやすいのですが、こちらはドキュメントが少ないです。また、bulk Index は TypedClient がサポートされていません。通常の client は json ベースなので document からコードに落とすのが比較的容易かと思います。

私のおすすめは、基本的には TypedClient を利用するのが良いのかなと思っています!
ドキュメントは少ないですが、規則性に基づいて型化されているため、(特にクエリなどバリエーションが豊富なものは)慣れれば慣れるほど json から型化されたコードに落としやすくなってきます。

func main() {
    // client
    es, err := elasticsearch.NewClient(elasticsearch.Config{
        Addresses: []string{"http://localhost:9200"},
    })
    if err != nil {
        log.Fatalf("Error creating the client: %s", err)
    }

    // typed client
    es, err := elasticsearch.NewTypedClient(elasticsearch.Config{
        Addresses: []string{"http://localhost:9200"},
    })
    if err != nil {
        log.Fatalf("Error creating the client: %s", err)
    }
}

Index Create/Delete

インデックスの作成については、公式が出している Go 専用のドキュメントにもサンプルがあるためさらっと紹介します。


func main() {
	es, err := elasticsearch.NewTypedClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	ignoreAbove := 256
	keywordProperty := types.NewKeywordProperty()
	keywordProperty.IgnoreAbove = &ignoreAbove

	dateProperty := types.NewDateProperty()
	format := "yyyy/MM/dd||yyyy/MM||MM/dd||yyyy||MM||dd"
	dateProperty.Format = &format

	// index作成
	_, err = es.Indices.Create("sample_index").Request(&create.Request{
		Settings: &types.IndexSettings{
			IndexSettings: map[string]json.RawMessage{
				// 設定項目
				// bulk index中のデータ更新感覚。頻繁に更新する必要がなければ長めに設定するとパフォーマンスが上がる
				"refresh_interval": json.RawMessage(`"30s"`),
				// 取得できる最大件数の上限
				"max_result_window": json.RawMessage(`"1000000"`),
			},
		},
		Mappings: &types.TypeMapping{
			Properties: map[string]types.Property{
				// マッピングの定義
				"name":       keywordProperty,
				"age":        types.NewIntegerNumberProperty(),
				"is_checked": types.NewBooleanProperty(),
				"created_at": dateProperty,
			},
		},
	}).Do(context.TODO())
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// index削除
	_, err = es.Indices.Delete("sample_index").Do(context.TODO())
	if err != nil {
		log.Fatalf("Error deleting the client: %s", err)
	}
}

Bulk Index

Bulk Indexing のコードはgo-elasticsearch のサンプルを参考にさせていただきました。

var jsonitier = jsoniter.ConfigCompatibleWithStandardLibrary

type SampleIndexData struct {
	ID        int64  `json:"id"`
	Name      string `json:"name"`
	Age       int    `json:"age"`
	IsChecked bool   `json:"is_checked"`
	CreatedAt string `json:"created_at"`
}

func main() {
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	esRef, err := elasticsearch.NewTypedClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	datas := []*SampleIndexData{}
	for i := 1; i <= 100; i++ {
		datas = append(datas, &SampleIndexData{
			ID:        int64(i),
			Name:      fmt.Sprintf("name_%d", i),
			Age:       20,
			IsChecked: true,
            CreatedAt: time.Date(2021, 1, 15, 17, 28, 55, 0, jst).Format("2006/01/02"),
		})
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:      "sample_index", // The default index name
		Client:     es,             // The Elasticsearch client
		NumWorkers: 1,              // The number of worker goroutines
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}

	for _, a := range datas {
		data, err := jsonitier.Marshal(a)
		if err != nil {
			log.Fatalf("Cannot encode article %d: %s", a.ID, err)
		}

		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				// deleteの際はactionを"delete", bodyをnilにする
				Action: "index",
				DocumentID: strconv.Itoa(int(a.ID)),
				Body: bytes.NewReader(data),
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					fmt.Println("success")
				},
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					fmt.Println("failure")
				},
			},
		)
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}

	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

    // refresh_intervalの値次第ですが、感覚が長い場合はすべてのindexが終わった後にリフレッシュしておくとデータが即時反映されるので良いです
	_, err = esRef.Indices.Refresh().Index("sample_index").Do(context.Background())
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
}

Query

基本のクエリは以下のような感じです。

公式が出している Go 専用のドキュメントには簡単な Search API のみのサンプルがある形です。こちら以上の詳細は基本的に自分で組み立てていくことになります。私の場合は、QueryDSLのページでクエリを確認しつつ、packageで必要なものを再現していました。

var jst = time.FixedZone("Asia/Tokyo", 9*60*60)
var formatTime = "2006-01-02T15:04:05.999999-07:00"

func main() {
	es, err := elasticsearch.NewTypedClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	ageLte := 40.0
	ageLteC := (*types.Float64)(&ageLte)

	ageGte := 13.0
	ageGteC := (*types.Float64)(&ageGte)

	pageStart := 0
	size := 50

	req := &search.Request{
		// クエリを書く
		Query: &types.Query{
			Bool: &types.BoolQuery{
				// フィルタ
				Filter: []types.Query{
					// 範囲フィルタ
					{
						Range: map[string]types.RangeQuery{
							"age": types.NumberRangeQuery{
								Gte: ageGteC,
								Lte: ageLteC,
							},
						},
					},
					// 用語フィルタ
					{
						Term: map[string]types.TermQuery{
							"is_checked": {Value: true},
						},
					},
				},
			},
		},
		// ページのスタート地点
		From: &pageStart,
		// 返す数
		Size: &size,
		// ソート指定
		Sort: []types.SortCombinations{
			types.SortOptions{SortOptions: map[string]types.FieldSort{
				"created_at": {Order: &sortorder.Desc},
			}},
		},
	}
	res, err := es.Search().
		Index("sample_index").
		Request(req).
		Do(context.TODO())

	if err != nil {
		log.Fatalf("Error query: %s", err)
	}

	// total数を出す
	fmt.Println(res.Hits.Total)

	ds := []*SampleIndexData{}
	for _, hit := range res.Hits.Hits {
		var d *SampleIndexData
		if err := json.Unmarshal(hit.Source_, &d); err != nil {
			log.Fatalf("Error decoding: %s", err)
		}
		ds = append(ds, d)
	}

	// データを出す
	fmt.Println(ds)
}

また、クエリが合っているかどうかは、kibana 内の devtools を利用することで簡単に詳細エラーを確認することができます。正しいクエリになるように調整するためには、このあたりのツールを利用してみると良いかもしれません(コード上でいうと err の中にも一応入っていますので、そちらからも確認することができます)。

Elasticsearchのdevtools

おわりに

いかがでしたか?

基本の index 作成/削除、Bulk API を利用した一括処理、SearchAPI を利用した検索クエリを go-elasticsearch でどう書くかについてざっと記載してきました。

個人的には、Elasticsearch はドキュメント迷子になりやすかったり、詳しく何かしようとすると結構コードを叩いて試行錯誤することになったりして、全体感を理解してコードを書けるまで、非常に時間がかかりました。私もまだまだ es 勉強中なのですが、何かの助けになれば幸いです!

https://www.wantedly.com/projects/1130967

https://www.wantedly.com/projects/768663

Discussion