Elasticsearchのインデックスデータを高速でダンプするCLIツールを作りたかった
作ろうとした背景
以前、elasticdumpを使ってインデックスデータをダンプしていたのですが、結構な時間を要したため暇だった年末年始あたりからCLIツールをGo言語で作りはじめました。
実装
処理の大まかな流れは下記の図のような形で実装しました。
今回は、処理の効率化と学習を目的に並行処理でインデックスデータを保存するようにしました。
困ったこと
Elasticsearchの検索上限
ElasticsearchのSeach APIではsearch_after
を使用しない場合、検索の上限がデフォルトで10000件までとなり、ドキュメントが10000件を超えるインデックスのダンプをすることができませんでした。
Search APIの代わりに、Scroll APIを使用することで、10000件以上のドキュメントを取得することができました。
並行処理
最初は下記のような形で実装したのですが、deadlockが発生し正常に終了することができませんでした。
変更前
package main
import (
"context"
"fmt"
"strconv"
"golang.org/x/sync/errgroup"
)
func main() {
eg, ctx := errgroup.WithContext(context.Background())
ch1 := make(chan string)
ch2 := make(chan string)
for i := 0; i < 10; i++ {
n := i
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
str := strconv.Itoa(n)
ch1 <- str + ":Hello, "
return nil
}
})
}
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
for str := range ch1 {
str += "World!"
ch2 <- str
}
return nil
}
})
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
for str := range ch2 {
fmt.Println(str)
}
return nil
}
})
if err := eg.Wait(); err != nil {
fmt.Println(err)
}
}
原因は、各処理のあとにチャネルを閉じていないためでした。
そこで下記のような形で実装しました。
チャネルch1
を使用するfor文では、処理の終了判定をおこない真の場合は、チャネルch1Done
に送信し、受信したサブゴルーチンでチャネルch1
を閉じるようにしました。
変更後
package main
import (
"context"
"fmt"
"strconv"
"sync"
"golang.org/x/sync/errgroup"
)
func main() {
eg, ctx := errgroup.WithContext(context.Background())
ch1 := make(chan string)
ch1Done := make(chan struct{})
ch2 := make(chan string)
var cnt int
var mu sync.Mutex
for i := 0; i < 10; i++ {
n := i
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
mu.Lock()
defer mu.Unlock()
str := strconv.Itoa(n)
ch1 <- str + ":Hello, "
cnt++
if cnt == 10 {
ch1Done <- struct{}{}
}
return nil
}
})
}
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
case <-ch1Done:
fmt.Println("close ch1")
close(ch1)
return nil
}
})
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
defer close(ch2)
for str := range ch1 {
str += "World!"
ch2 <- str
}
return nil
}
})
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
for str := range ch2 {
fmt.Println(str)
}
return nil
}
})
if err := eg.Wait(); err != nil {
fmt.Println(err)
}
}
動作確認
環境
- CLIツール実行環境
- CPUコア:4
- メモリ:4GB
- Elasticsearch 7.9.3(Docker)
- CPUコア:4
- メモリ:4GB
計測
計測にはこちらのshakespeare.json
をelasticsearch 7用に修正したもの使用しました。
ドキュメント件数は111396件です。
計測用スクリプトは下記になります。
#!/bin/bash
TIME1=`date +%s%N`
# elasticdump --limitで一度に取得する件数を調整
#elasticdump --input=http://hoge.example.com:9200/shakespeare --output=dump.json --limit=100 --concurrency=5
# 自作ツール -sで一度に取得する件数を調整
#nikon dump shakespeare -h http://hoge.example.com:9200 -s 100
TIME2=`date +%s%N`
TIME=`echo "scale=3; (${TIME2} - ${TIME1})/1000000000" | bc`
echo $TIME
elasitcdump
1度に取得する件数 | 100 | 1000 | 5000 | 10000 |
---|---|---|---|---|
1回目 | 1121 | 124.37 | 30.927 | 21.242 |
2回目 | 1121 | 120.834 | 30.915 | 21.238 |
3回目 | 1122 | 120.723 | 31.036 | 21.266 |
4回目 | 1121 | 120.74 | 30.921 | 21.281 |
5回目 | 1121 | 120.734 | 30.93 | 21.235 |
6回目 | 1121 | 120.734 | 30.942 | 21.241 |
平均処理時間/秒 | 1121.167 | 121.356 | 30.945 | 21.251 |
自作CLIツール
1度に取得する件数 | 100 | 1000 | 5000 | 10000 |
---|---|---|---|---|
1回目 | 10.512 | 8.789 | 7.795 | 7.304 |
2回目 | 10.303 | 8.882 | 7.606 | 7.283 |
3回目 | 10.526 | 8.841 | 7.655 | 7.164 |
4回目 | 10.396 | 8.943 | 7.551 | 7.331 |
5回目 | 10.405 | 8.835 | 7.484 | 7.144 |
6回目 | 10.624 | 8.828 | 7.536 | 7.083 |
平均処理時間/秒 | 10.461 | 8.853 | 7.605 | 7.218 |
elasticdumpより早いと思ったのですが、elasticdumpには--retryDelay
という遅延時間を調整するオプションがあり、そちらを1msに設定して計測したらそんなに変わらない結果となりました。
elsticdump
1度に取得する件数 | 100 | 1000 | 5000 | 10000 |
---|---|---|---|---|
1回目 | 13.332 | 9.048 | 8.051 | 7.513 |
2回目 | 14.011 | 8.981 | 8.094 | 7.475 |
3回目 | 13.175 | 9.176 | 7.9 | 7.813 |
4回目 | 13.748 | 8.822 | 7.972 | 7.836 |
5回目 | 14.406 | 8.788 | 8.059 | 7.543 |
6回目 | 14.49 | 8.903 | 8.042 | 7.525 |
平均処理時間/秒 | 13.860 | 8.953 | 8.020 | 7.618 |
出来上がったもの
elasticdumpより微妙に早い(かもしれない)CLIツールはこちらです。
CLIツールはダンプは可能ですが、リストアは未実装です。保存されるファイルはNDJSON形式なので、Bulk APIを使用してリストア可能です。
https://github.com/takenoko-gohan/nikon
参考
REST APIs
go-elasticsearch
複数のGoroutineをWaitGroup(ErrGroup)で制御する
【Go入門】チャネルのrangeとclose()
Discussion