🔍

【詳解】opensearch-jsのhelpersでsearch scrollとbulkのAPIを扱う

2022/05/05に公開

こんにちは!

昨年から本業の方でOpenSearch全般の運用・保守を行っております。
最近はopensearch-js(OpenSearchのJSクライアントライブラリ)を扱ったドキュメントの出し入れを行う処理に何度か改善を行っていました。
その中で得た知見等をアウトプットしようと思います。

前置き

前提として、以下の実行環境とします。
※ (後述)一部例外あり

パッケージ version
AWS OpenSearch Service 1.0
opensearch-js 1.1.0

client.helpersを利用することによって得られるメリット

ElasticSearchもOpenSearchもそうですが、
一時的に読み書きリクエストが集中するとJVMメモリプッシャーが上がってしまいサーキットブレーカーが発動してしまいます。
これによってIOパフォーマンスが悪くなったり、429 Too Many Requestが返却されてread/writeのロックが掛かったりと、
付随するシステムに影響を及ぼしてしまいます。

そして、下記のように面倒な制御をclient.helpersが行ってくれます。

  • 共通
    • 429返却時に自動でretry(デフォルトは3回まで)してくれる
      • helpersを利用すること無く何も意識していないとResponse Errorがエスカレーションされてしまう
    • retry時すぐに再リクエストをするのでは無く、wait(デフォルトは5000ms)後に再リクエストしてくれる
  • client.helpers.scrollSearch
    • 取得し終えるまでの再帰関数をコーディングする必要が無い
      • またscrollの終端でclearScrollする手続き等面倒な制御が不要
      • client.searchを利用するよりもレスポンスの取り出しや付随する処理のコーディングが楽になる
  • client.helpers.bulk
    • 分割リクエストサイズや同時リクエスト数の制限を制御するのが簡単
    • 書き込みに失敗したクエリの取得も行える

search(scroll API含む) APIの扱い方について

helpersを使わない場合

scroll時のcallback関数を事細かく制御していくと、
もう少し複雑な制御になってくると思います。

await new Promise((resolve, reject) => {
  const results = [];
  client.search({
    index: 'hoge',
    size: 10,
    scroll: '1m',
    body: {
      query: { ...foo },
    },
  }, async function scroll(error, response){
    try {
      if (!response || !response.body.hits) {
        if(response.body._scroll_id) await client.clearScroll({ scroll_id: response.body._scroll_id });
        reject(error);
      }
      if (response.body.hits.hits.length <= 0) {
        await client.clearScroll({ scroll_id: response.body._scroll_id });
        resolve(results)
      }
      results.push(response.body.hits.hits);
      client.scroll(
        { scroll_id: String(response.body._scroll_id), scroll: '1m' },
        scroll
      );
    } catch(e) {
      await client.clearScroll({ scroll_id: response.body._scroll_id });
      reject(error);
    }
  });
});

helpersを使う場合

エラー時の手続きがスマートになります。

const scrollSearch = client.helpers.scrollSearch({
  index: 'hoge',
  size: 10,
  scroll: '1m',
  body: {
    query: { ...foo },
  },
}, { maxRetries: 3, wait: 5000 });

for await (const response of scrollSearch) {
  console.log(response)
}

helpers.scrollSearchを使うことによって既に考慮されている処理について以下で解説しています。

第二引数のoptionについて

key名 役割
maxRetries 429エラー返却時の最大リトライ回数
wait 429エラー発生時に再試行するまでのミリ秒数

responseを取り出した時点で

  • response.body.hitsが存在する
  • && response.body.hits.hits.length > 0

ことが確約されています。

以下より。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L122-L122

responseの取り出し方について

client.helpers.scrollSearch自体がジェネレーター関数となっており、
responseを順番に返却してくれます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L129-L129

※ そのため取り出すたびにリクエストが走ります。

scrollのclear方法について

ジェネレーター関数から終端のresponseを取り出した時点で自動でclearScrollを行ってくれます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L150-L150

以下はscrollをclearする際の処理。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L114-L120

また、終端のresponseを取り出すまでにscrollをclearしたい場合は下記のようにします。

for await (const response of scrollSearch) {
  if (cond) response.clear()
}

response.clear にclearScroll処理のiteratorが生えていることはこちらで確認できます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L126-L126

clearScrollの制御処理。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L114-L120

429 Too Many Request時の制御について

429エラーの時はwait(第二引数のoptionで指定したwait)後に再リクエストします。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L141-L142

それをretry(第二引数のoptionで指定したretry)回分までは再リクエストしてくれます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L135-L135

※ retry回分までに429が解消されない場合はResponse Errorがエスカレーションされます。

https://github.com/opensearch-project/opensearch-js/blob/main/lib/Helpers.js#L144-L146

Bulk APIの扱い方について

helpersを使わない場合

正常系の処理のみであればこういう感じになるかと思います。

const response = await client.bulk({
  index: "test",
  body: [
    { create: { _id: "sample-1" } },
    { user_id: 1, name: "foo" },

    { index : { _id : "sample-2" } },
    { user_id : 2, name: "hoge" },

    { delete : { _id : "sample-2" } },

    { update : { _id : "sample-1" } },
    { doc : { name : "Foo" } },

    { update : { _id : "sample-2" } },
    { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
  ]
});

加えて、429エラーが返却されないように、

  • リクエストサイズの調整
  • 同時リクエスト数の制限

等のチューニングが少々面倒だったりします。

また、OpenSearch側のリソースに余裕がある場合でも、
1リクエスト辺りの句数上限(max_clause_count)を気にしないといけなかったりします。

加えて、
どのクエリが失敗したのか、失敗したクエリだけ取り出したい
という制御処理が煩雑になってしまいがちです。

helpersを使う場合

前置き

元々、client.helpers.bulkでupdateオペレーションのscriptドキュメントが対応していなかったので、
使えるように修正をしてPRを出しました。
https://github.com/opensearch-project/opensearch-js/pull/210

↑がまだ取り込まれていないので、
システム側では先にpatchを当てて運用しています。

const datasource = [
  [
    { create: { index: "test", _id: "sample-1" } },
    { user_id: 1, name: "foo" },
  ],
  [
    { index : { index: "test", _id : "sample-2" } },
    { user_id : 2, name: "hoge" },
  ],
  [
    { delete : { index: "test", _id : "sample-2" } },
  ],
  [
    { update : { index: "test", _id : "sample-1" } },
    { doc : { name : "Foo" } },
  ],
  [
    { update : { index: "test", _id : "sample-2" } },
    { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true }
  ]
];

await client.helpers.bulk({
  datasource,
  onDocument: doc => {
    // deleteオペレーションの場合は、'{"delete":{"_id":"sample-2"}}'のみ返却する
    if(doc[0]?.delete){
      return doc[0];
    }
    return doc;
  },
  onDrop: ({operation, document, retried, error}) => {
    if (retried) return; // trueの場合は "retries > リトライ回数" の状態
    const query = [operation];

    // deleteオペレーション以外の場合は2つ目のobject(アクションクエリ)がある
    if (document) query.push(document);

    // [{"create":{"_id":"sample-1"}},{"user_id":1,"name":"foo"}]
    console.log(JSON.stringify(query))
  },
  concurrency: 2,
  flushBytes: 1024 * 1024 * 10,
  refreshOnCompletion: false,
  retries: 3,
});

onDocument

bulkリクエスト用のデータを返却するためのcallback関数です。

callback関数の引数docでは、
下記のようにdatasource引数の値を順番に取り出せます。

  • datasource[0]
  • datasource[1]
  • datasource[2]
  • datasource[3]

またbulkリクエスト用のデータは下記のように返却する必要があります。
update(doc)update(script)patchを当てないと記載どおりの返却値では動作しません。

operation 期待する返り値
create [ { create: { index: "test", _id: "sample-1" } }, { user_id: 1, name: "foo" } ]
index [ { index: { index: "test", _id: "sample-2" } }, { user_id : 2, name: "hoge" } ]
delete { delete : { index: "test", _id : "sample-2" } }
update(doc) [ { update : { index: "test", _id : "sample-1" } }, { doc : { name : "Foo" } } ]
update(script) [ { update : { index: "test", _id : "sample-2" } }, { script : { source: "ctx._source.name = params.name;", lang: 'painless', params : { name: "Foo" } }, scripted_upsert: true } ]

そのためdeleteアクション時のみobjectを返却するように制御しています。

onDocument: doc => {
  if(doc[0]?.delete){
    return doc[0];
  }
  return doc;
},

onDrop

リクエストに失敗した際に実行されるcallback関数です。

引数に渡されるobjectには下記のような情報が入っています。

callbackで渡される引数 内容
operation 例:objectのまま、datasource[0][0]やdatasource[1][0]のオペレーションデータが入っています
document 例:objectのまま、datasource[0][1]やdatasource[1][1]のドキュメントデータが入っています。deleteオペレーションの場合はnullが格納されています。
retried 指定回数(デフォルトは3回、若しくはretriesオプションで指定)分の再試行を行った場合はfalseが入っています。それ以外の場合はtrue。
status 最後に失敗したときのHTTPステータスコード。
error 最後に失敗した時のエラー内容。

その他の引数について

引数 内容
concurrency 最大同時(非同期の並行)リクエスト数
flushBytes 1リクエスト辺りの最大サイズ。超えた場合はその時点でリクエストを行う
refreshOnCompletion index,update,delete,bulk apiのrefreshオプションと同等
wait 429エラー発生時に再試行するまでのミリ秒数
retries 429エラー返却時の最大リトライ回数

429 Too Many Request時の制御について

wait ミリ秒待ってからbulkリクエストが再試行されます。
また、 retries 回まで再試行されます。

そのとき onDrop callback関数の retried 引数には下記のような値が入ります。

状態 retriedの値
リトライ回数 < retries true
リトライ回数 >= retries false

bulkリクエストに失敗したクエリの取得方法について

onDropのcallback関数に渡されるoperationとdocumentを使って取得します。

onDrop: ({operation, document}) => {
  const query = [operation];

  // deleteオペレーション以外の場合は2つ目のobject(アクションクエリ)がある
  if (document) query.push(document);

  // [{"create":{"_id":"sample-1"}},{"user_id":1,"name":"foo"}]
  console.log(JSON.stringify(query))
}

deleteオペレーション以外の場合は下記によってdatasourceに入れたとおりのクエリを取得できます。

[operation, document]

※ deleteオペレーションの場合は document がnullなので注意が必要です

まとめ

OpenSearch公式のドキュメントがまだ充実していないので、
よくElasticSearchの公式ドキュメントを参照しています。

GWの後半は負荷を緩和するチューニングについてもう少し勉強しようと思います。

Discussion