😢

Elasticsearchのインデックスデータを高速でダンプするCLIツールを作りたかった

2021/01/11に公開

作ろうとした背景

以前、elasticdumpを使ってインデックスデータをダンプしていたのですが、結構な時間を要したため暇だった年末年始あたりからCLIツールをGo言語で作りはじめました。

実装

処理の大まかな流れは下記の図のような形で実装しました。
今回は、処理の効率化と学習を目的に並行処理でインデックスデータを保存するようにしました。

困ったこと

Elasticsearchの検索上限

ElasticsearchのSeach APIではsearch_afterを使用しない場合、検索の上限がデフォルトで10000件までとなり、ドキュメントが10000件を超えるインデックスのダンプをすることができませんでした。
Search APIの代わりに、Scroll APIを使用することで、10000件以上のドキュメントを取得することができました。

並行処理

最初は下記のような形で実装したのですが、deadlockが発生し正常に終了することができませんでした。

The Go Playground

変更前
package main

import (
	"context"
	"fmt"
	"strconv"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())

	ch1 := make(chan string)
	ch2 := make(chan string)

	for i := 0; i < 10; i++ {
		n := i
		eg.Go(func() error {
			select {
			case <-ctx.Done():
				return nil
			default:
				str := strconv.Itoa(n)
				ch1 <- str + ":Hello, "

				return nil
			}
		})
	}

	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return nil
		default:
			for str := range ch1 {
				str += "World!"
				ch2 <- str
			}
			return nil
		}
	})

	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return nil
		default:
			for str := range ch2 {
				fmt.Println(str)
			}
			return nil
		}
	})

	if err := eg.Wait(); err != nil {
		fmt.Println(err)
	}
}

原因は、各処理のあとにチャネルを閉じていないためでした。
そこで下記のような形で実装しました。
チャネルch1を使用するfor文では、処理の終了判定をおこない真の場合は、チャネルch1Doneに送信し、受信したサブゴルーチンでチャネルch1を閉じるようにしました。

The Go Playground

変更後
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())

	ch1 := make(chan string)
	ch1Done := make(chan struct{})
	ch2 := make(chan string)

	var cnt int

	var mu sync.Mutex

	for i := 0; i < 10; i++ {
		n := i
		eg.Go(func() error {
			select {
			case <-ctx.Done():
				return nil
			default:
				mu.Lock()
				defer mu.Unlock()
				str := strconv.Itoa(n)
				ch1 <- str + ":Hello, "
				cnt++
				if cnt == 10 {
					ch1Done <- struct{}{}
				}
				return nil
			}
		})
	}

	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return nil
		case <-ch1Done:
			fmt.Println("close ch1")
			close(ch1)
			return nil
		}
	})

	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return nil
		default:
			defer close(ch2)
			for str := range ch1 {
				str += "World!"
				ch2 <- str
			}
			return nil
		}
	})

	eg.Go(func() error {
		select {
		case <-ctx.Done():
			return nil
		default:
			for str := range ch2 {
				fmt.Println(str)
			}
			return nil
		}
	})

	if err := eg.Wait(); err != nil {
		fmt.Println(err)
	}
}

動作確認

環境

  • CLIツール実行環境
    • CPUコア:4
    • メモリ:4GB
  • Elasticsearch 7.9.3(Docker)
    • CPUコア:4
    • メモリ:4GB

計測

計測にはこちらshakespeare.jsonをelasticsearch 7用に修正したもの使用しました。
ドキュメント件数は111396件です。

計測用スクリプトは下記になります。

#!/bin/bash
TIME1=`date +%s%N`

# elasticdump --limitで一度に取得する件数を調整
#elasticdump --input=http://hoge.example.com:9200/shakespeare --output=dump.json --limit=100 --concurrency=5
# 自作ツール -sで一度に取得する件数を調整
#nikon dump shakespeare -h http://hoge.example.com:9200 -s 100

TIME2=`date +%s%N`
TIME=`echo "scale=3; (${TIME2} - ${TIME1})/1000000000" | bc`
echo $TIME

elasitcdump

1度に取得する件数 100 1000 5000 10000
1回目 1121 124.37 30.927 21.242
2回目 1121 120.834 30.915 21.238
3回目 1122 120.723 31.036 21.266
4回目 1121 120.74 30.921 21.281
5回目 1121 120.734 30.93 21.235
6回目 1121 120.734 30.942 21.241
平均処理時間/秒 1121.167 121.356 30.945 21.251

自作CLIツール

1度に取得する件数 100 1000 5000 10000
1回目 10.512 8.789 7.795 7.304
2回目 10.303 8.882 7.606 7.283
3回目 10.526 8.841 7.655 7.164
4回目 10.396 8.943 7.551 7.331
5回目 10.405 8.835 7.484 7.144
6回目 10.624 8.828 7.536 7.083
平均処理時間/秒 10.461 8.853 7.605 7.218

elasticdumpより早いと思ったのですが、elasticdumpには--retryDelayという遅延時間を調整するオプションがあり、そちらを1msに設定して計測したらそんなに変わらない結果となりました。

elsticdump

1度に取得する件数 100 1000 5000 10000
1回目 13.332 9.048 8.051 7.513
2回目 14.011 8.981 8.094 7.475
3回目 13.175 9.176 7.9 7.813
4回目 13.748 8.822 7.972 7.836
5回目 14.406 8.788 8.059 7.543
6回目 14.49 8.903 8.042 7.525
平均処理時間/秒 13.860 8.953 8.020 7.618

出来上がったもの

elasticdumpより微妙に早い(かもしれない)CLIツールはこちらです。
CLIツールはダンプは可能ですが、リストアは未実装です。保存されるファイルはNDJSON形式なので、Bulk APIを使用してリストア可能です。

https://github.com/takenoko-gohan/nikon

参考

REST APIs
go-elasticsearch
複数のGoroutineをWaitGroup(ErrGroup)で制御する
【Go入門】チャネルのrangeとclose()

Discussion