【詳解】opensearch-jsのhelpersでsearch scrollとbulkのAPIを扱う
こんにちは!
昨年から本業の方でOpenSearch全般の運用・保守を行っております。
最近はopensearch-js(OpenSearchのJSクライアントライブラリ)を扱ったドキュメントの出し入れを行う処理に何度か改善を行っていました。
その中で得た知見等をアウトプットしようと思います。
前置き
前提として、以下の実行環境とします。
※ (後述)一部例外あり
パッケージ | version |
---|---|
AWS OpenSearch Service | 1.0 |
opensearch-js | 1.1.0 |
client.helpersを利用することによって得られるメリット
ElasticSearchもOpenSearchもそうですが、
一時的に読み書きリクエストが集中するとJVMメモリプッシャーが上がってしまいサーキットブレーカーが発動してしまいます。
これによってIOパフォーマンスが悪くなったり、429 Too Many Requestが返却されてread/writeのロックが掛かったりと、
付随するシステムに影響を及ぼしてしまいます。
そして、下記のように面倒な制御をclient.helpersが行ってくれます。
- 共通
- 429返却時に自動でretry(デフォルトは3回まで)してくれる
- helpersを利用すること無く何も意識していないとResponse Errorがエスカレーションされてしまう
- retry時すぐに再リクエストをするのでは無く、wait(デフォルトは5000ms)後に再リクエストしてくれる
- 429返却時に自動でretry(デフォルトは3回まで)してくれる
- client.helpers.scrollSearch
- 取得し終えるまでの再帰関数をコーディングする必要が無い
- またscrollの終端でclearScrollする手続き等面倒な制御が不要
- client.searchを利用するよりもレスポンスの取り出しや付随する処理のコーディングが楽になる
- 取得し終えるまでの再帰関数をコーディングする必要が無い
- client.helpers.bulk
- 分割リクエストサイズや同時リクエスト数の制限を制御するのが簡単
- 書き込みに失敗したクエリの取得も行える
search(scroll API含む) APIの扱い方について
helpersを使わない場合
scroll時のcallback関数を事細かく制御していくと、
もう少し複雑な制御になってくると思います。
await new Promise((resolve, reject) => {
const results = [];
client.search({
index: 'hoge',
size: 10,
scroll: '1m',
body: {
query: { ...foo },
},
}, async function scroll(error, response){
try {
if (!response || !response.body.hits) {
if(response.body._scroll_id) await client.clearScroll({ scroll_id: response.body._scroll_id });
reject(error);
}
if (response.body.hits.hits.length <= 0) {
await client.clearScroll({ scroll_id: response.body._scroll_id });
resolve(results)
}
results.push(response.body.hits.hits);
client.scroll(
{ scroll_id: String(response.body._scroll_id), scroll: '1m' },
scroll
);
} catch(e) {
await client.clearScroll({ scroll_id: response.body._scroll_id });
reject(error);
}
});
});
helpersを使う場合
エラー時の手続きがスマートになります。
const scrollSearch = client.helpers.scrollSearch({
index: 'hoge',
size: 10,
scroll: '1m',
body: {
query: { ...foo },
},
}, { maxRetries: 3, wait: 5000 });
for await (const response of scrollSearch) {
console.log(response)
}
helpers.scrollSearchを使うことによって既に考慮されている処理について以下で解説しています。
第二引数のoptionについて
key名 | 役割 |
---|---|
maxRetries | 429エラー返却時の最大リトライ回数 |
wait | 429エラー発生時に再試行するまでのミリ秒数 |
responseを取り出した時点で
- response.body.hitsが存在する
- && response.body.hits.hits.length > 0
ことが確約されています。
以下より。
responseの取り出し方について
client.helpers.scrollSearch自体がジェネレーター関数となっており、
responseを順番に返却してくれます。
※ そのため取り出すたびにリクエストが走ります。
scrollのclear方法について
ジェネレーター関数から終端のresponseを取り出した時点で自動でclearScrollを行ってくれます。
以下はscrollをclearする際の処理。
また、終端のresponseを取り出すまでにscrollをclearしたい場合は下記のようにします。
for await (const response of scrollSearch) {
if (cond) response.clear()
}
response.clear
にclearScroll処理のiteratorが生えていることはこちらで確認できます。
clearScrollの制御処理。
429 Too Many Request時の制御について
429エラーの時はwait(第二引数のoptionで指定したwait)後に再リクエストします。
それをretry(第二引数のoptionで指定したretry)回分までは再リクエストしてくれます。
※ retry回分までに429が解消されない場合はResponse Errorがエスカレーションされます。
Bulk APIの扱い方について
helpersを使わない場合
正常系の処理のみであればこういう感じになるかと思います。
const response = await client.bulk({
index: "test",
body: [
{ create: { _id: "sample-1" } },
{ user_id: 1, name: "foo" },
{ index : { _id : "sample-2" } },
{ user_id : 2, name: "hoge" },
{ delete : { _id : "sample-2" } },
{ update : { _id : "sample-1" } },
{ doc : { name : "Foo" } },
{ update : { _id : "sample-2" } },
{ script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
]
});
加えて、429エラーが返却されないように、
- リクエストサイズの調整
- 同時リクエスト数の制限
等のチューニングが少々面倒だったりします。
また、OpenSearch側のリソースに余裕がある場合でも、
1リクエスト辺りの句数上限(max_clause_count)を気にしないといけなかったりします。
加えて、
どのクエリが失敗したのか、失敗したクエリだけ取り出したい
という制御処理が煩雑になってしまいがちです。
helpersを使う場合
前置き
元々、client.helpers.bulkでupdateオペレーションのscriptドキュメントが対応していなかったので、
使えるように修正をしてPRを出しました。
↑がまだ取り込まれていないので、
システム側では先にpatchを当てて運用しています。
const datasource = [
[
{ create: { index: "test", _id: "sample-1" } },
{ user_id: 1, name: "foo" },
],
[
{ index : { index: "test", _id : "sample-2" } },
{ user_id : 2, name: "hoge" },
],
[
{ delete : { index: "test", _id : "sample-2" } },
],
[
{ update : { index: "test", _id : "sample-1" } },
{ doc : { name : "Foo" } },
],
[
{ update : { index: "test", _id : "sample-2" } },
{ script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
]
];
await client.helpers.bulk({
datasource,
onDocument: doc => {
// deleteオペレーションの場合は、'{"delete":{"_id":"sample-2"}}'のみ返却する
if(doc[0]?.delete){
return doc[0];
}
return doc;
},
onDrop: ({operation, document, retried, error}) => {
if (retried) return; // trueの場合は "retries > リトライ回数" の状態
const query = [operation];
// deleteオペレーション以外の場合は2つ目のobject(アクションクエリ)がある
if (document) query.push(document);
// [{"create":{"_id":"sample-1"}},{"user_id":1,"name":"foo"}]
console.log(JSON.stringify(query))
},
concurrency: 2,
flushBytes: 1024 * 1024 * 10,
refreshOnCompletion: false,
retries: 3,
});
onDocument
bulkリクエスト用のデータを返却するためのcallback関数です。
callback関数の引数docでは、
下記のようにdatasource引数の値を順番に取り出せます。
- datasource[0]
- datasource[1]
- datasource[2]
- datasource[3]
またbulkリクエスト用のデータは下記のように返却する必要があります。
※ update(doc)
と update(script)
は patchを当てないと記載どおりの返却値では動作しません。
operation | 期待する返り値 |
---|---|
create | [ { create: { index: "test", _id: "sample-1" } }, { user_id: 1, name: "foo" } ] |
index | [ { index: { index: "test", _id: "sample-2" } }, { user_id : 2, name: "hoge" } ] |
delete | { delete : { index: "test", _id : "sample-2" } } |
update(doc) | [ { update : { index: "test", _id : "sample-1" } }, { doc : { name : "Foo" } } ] |
update(script) | [ { update : { index: "test", _id : "sample-2" } }, { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true } ] |
そのためdeleteアクション時のみobjectを返却するように制御しています。
onDocument: doc => {
if(doc[0]?.delete){
return doc[0];
}
return doc;
},
onDrop
リクエストに失敗した際に実行されるcallback関数です。
引数に渡されるobjectには下記のような情報が入っています。
callbackで渡される引数 | 内容 |
---|---|
operation | 例:objectのまま、datasource[0][0]やdatasource[1][0]のオペレーションデータが入っています |
document | 例:objectのまま、datasource[0][1]やdatasource[1][1]のドキュメントデータが入っています。deleteオペレーションの場合はnullが格納されています。 |
retried | 指定回数(デフォルトは3回、若しくはretriesオプションで指定)分の再試行を行った場合はfalseが入っています。それ以外の場合はtrue。 |
status | 最後に失敗したときのHTTPステータスコード。 |
error | 最後に失敗した時のエラー内容。 |
その他の引数について
引数 | 内容 |
---|---|
concurrency | 最大同時(非同期の並行)リクエスト数 |
flushBytes | 1リクエスト辺りの最大サイズ。超えた場合はその時点でリクエストを行う |
refreshOnCompletion | index,update,delete,bulk apiのrefreshオプションと同等 |
wait | 429エラー発生時に再試行するまでのミリ秒数 |
retries | 429エラー返却時の最大リトライ回数 |
429 Too Many Request時の制御について
wait
ミリ秒待ってからbulkリクエストが再試行されます。
また、 retries
回まで再試行されます。
そのとき onDrop
callback関数の retried
引数には下記のような値が入ります。
状態 | retriedの値 |
---|---|
リトライ回数 < retries | true |
リトライ回数 >= retries | false |
bulkリクエストに失敗したクエリの取得方法について
onDropのcallback関数に渡されるoperationとdocumentを使って取得します。
onDrop: ({operation, document}) => {
const query = [operation];
// deleteオペレーション以外の場合は2つ目のobject(アクションクエリ)がある
if (document) query.push(document);
// [{"create":{"_id":"sample-1"}},{"user_id":1,"name":"foo"}]
console.log(JSON.stringify(query))
}
deleteオペレーション以外の場合は下記によってdatasourceに入れたとおりのクエリを取得できます。
[operation, document]
※ deleteオペレーションの場合は document
がnullなので注意が必要です
まとめ
OpenSearch公式のドキュメントがまだ充実していないので、
よくElasticSearchの公式ドキュメントを参照しています。
GWの後半は負荷を緩和するチューニングについてもう少し勉強しようと思います。
Discussion