Elasticsearchの更新をGoogle Cloud DataflowとBigQuery+SQLで実現したい
はじめまして。
株式会社ココナラ R&Dグループ MLデータ基盤チームの北郷と申します。
MLデータ基盤チームではココナラにおける検索周りの機能拡張や運用を行なっています。
今回の記事は、現在導入を進めているGoogle CloudのDataflowについてです。
本記事は、2022/12/16時点で検証を行なっております。
GCP側の仕様変更により手順などが変わる可能性がある旨ご承知おきください。
Google Cloud Dataflowについて
Google Cloudのサービスの一つで、ETL(Extract Transform Load)処理を実装するフルマネージドのサービスです。
DataflowとGoogle Cloud上の他サービスを組み合わせることで、データストアからのデータの抽出および変換、結果の格納という頻出する処理をマネージドなサービスのみで構成することができます。
ETL処理を行う場合、扱うデータ量に応じて抽出/変換/読み込みの各段階で相応のコンピューターリソースの確保が必要となります。
弊社の例を挙げると、サービス検索のリランキング処理を実現する際、出品されたサービスの特徴量をRDBから取得して加工し、その結果をElasticsearchへアップロードする実装がされています。
これらの処理を素直にオンプレで実装する場合、RDBを稼働させるためのサーバ、結果の抽出と変換およびアップロード処理を行うサーバの確保が必要となります。
Dataflowを採用し、BigQueryなどのGoogle Cloud上のマネージドなサービスのみで同等の機能を実現することで、下位のレイヤーに煩わされる機会を大幅に減らすことができるという算段です。
やってみた
今回のお品書き
本記事では、Dataflowの「BigQuery to Elasticsearch」テンプレートを用いてBigQueryのテーブル上のデータをElasticsearchのインデックス上へのアップロードを行います。
大まかな流れ
- Dataflowの実行用パラメータを作成(Elasticsearch接続設定, データの抽出/変換を行うSQLクエリ等)
- DataflowのJobをgcloudコマンド(Google Cloud CLI)から起動
構成
本記事では、CLIによるDataflowのJobの実行までを扱います。
(Airflow等のJobの実行基盤への組み込みについてはスコープ外です。)
前提条件
本手順では、下記の前提で進めます
- GCP周り
- 作業用の環境にGoogle Cloud CLIをインストール済みで、
gcloud
コマンドを使用できること - 使用するサービスアカウントに必要なアクセス権限が付与されていること
- 作業用の環境にGoogle Cloud CLIをインストール済みで、
- Elasticsearch周り
- データのアップロード対象となるElasticsearchクラスタは、Elastic Cloud上に構築されているものとします
今回用意した構成
今回は、下記の構成を前提とします。
BigQuery上のcoconala-temp.dummy_datasource
テーブルの中身をSQLを用いてElaticSearchのdummy_index
インデックスへアップロードします
BigQuery テーブル構造
coconala-temp.dummy_datasource
テーブル
列名 | 型名 | 説明 |
---|---|---|
id | Number | レコード毎に一意です(Primary Key相当) |
head | String | レコードの概要を説明する数文字〜数十文字程度のマルチバイト文字列が格納されます |
body | String | 本文です。数十文字から数千文字程度のマルチバイト文字列が格納されます |
Elasticsearch Index Mapping
dummy_index
インデックスのデータ構造です。
BigQueryより抽出したデータをこのIndexへ格納します。
プロパティ概要
プロパティ名 | 説明 |
---|---|
data_id | _idと同一の値が格納されます |
head | coconala-temp.dummy_datasourceのhead列相当です |
body | coconala-temp.dummy_datasourceのbody列相当です |
upload_at | アップロード時刻を格納します |
マッピング情報
{
"mappings" : {
"properties": {
"data_id": { "type": "integer" },
"head": { "type": "text", "analyzer": "jp_analyzer" },
"body": { "type": "text", "analyzer": "jp_analyzer" },
"upload_at": { "type": "date", "format": "date_optional_time||strict_date_optional_time" }
}
},
"settings": {
"index": {
"analysis" :{
"tokenizer": {
"jp_kuromoji_tokenizer": {
"type": "kuromoji_tokenizer",
"mode": "search"
}
},
"analyzer": {
"jp_analyzer": {
"type": "custom",
"tokenizer": "jp_kuromoji_tokenizer",
}
}
}
}
}
}
DataflowのJob実行用のパラメータの作成
Dataflowの実行時、テンプレートの構成に合わせたパラメータを引き渡すことでJobを起動します。
引き渡すパラメータはテンプレート毎に異なり、処理内容をかなり細かく定義することができます。
今回使用する「BigQuery to Elasticsearch」テンプレートの場合、使用しているものだけでも以下の軸で挙動を制御できます
- BigQueryから取得するデータの取得方法(特定のテーブル内のレコードを全て取得もしくは、SQLクエリによる抽出)
- データの格納先のElasticsearch情報
- 接続情報(BASIC認証の場合id/pass, Elastic CloudのAPIKeyを使用する場合はAPI key)
- 格納先のIndex名
- データ格納時の挙動
- BigQueryのレコードをElasticsearchのドキュメントへ変換する際の
_id
列の解決方法(SQLクエリの結果の列名を指定する、もしくはUDFで実装) - Elasticsearchへドキュメントを格納する際、
_id
が重複する場合の解決方法(_id
が重複する場合はドキュメントを上書き、もしくは重複した場合はJobをエラー終了)
- BigQueryのレコードをElasticsearchのドキュメントへ変換する際の
今回作成した実行用パラメータは下記です。
設定を要する項目が多いため、今回は、gcloud
コマンドのflag-fileとして設定値を準備します。
(パスワードやホスト名などは、一部ダミーの値となります。)
cat <<EOT > flag-file.yml
--template-file-gcs-location: gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_Elasticsearch
--region: us-central1
--project: coconala-temp
--parameters:
query: "SELECT id AS data_id, head AS head, body AS body, DATETIME(CURRENT_TIMESTAMP(), \"Asia/Tokyo\") AS upload_at FROM \`coconala-temp.dummy_datasource\`"
propertyAsId: data_id
connectionUrl: https://elasticcloud-cluster-hostname:elasticcloud-cluster-portno
index: dummy_index
apiKey: ""
elasticsearchUsername: elasticsearch_username
elasticsearchPassword: elasticsearch_username
EOT
このパラメータで、BigQuery上に存在するcoconala-temp.dummy_datasource
テーブルの中身をSQLクエリで抽出し、Elasticsearchのdummy_index
へデータ構造を合わせた上で格納する処理が定義されます。
各パラメータの解説
--template-file-gcs-location
使用するDataflowのテンプレートです。今回は、BigQueryからElasticsearchへのuploadを行うため、「BigQuery to Elasticsearch」テンプレートを使用します。
--parameters
DataflowのJobに引き渡すパラメータです。
Jobの挙動はここで調整します。
- query
- BigQueryからElasticsearchへアップロードするためのデータを抽出するSQLクエリです。
- 結果の1レコードがElasticsearchの1ドキュメントに変換されます。
- クエリの結果の列名がそのままElasticsearchのドキュメントのプロパティ名となるため、結果の列は格納先のElasticsearchのIndexのMappingに合わせた別名を付与する必要があります。
- propertyAsId
- Elasticsearchへのupload時、ドキュメントの識別子となる
_id
列に、queryパラメータの結果のどの列名を割り当てるかを指定するパラメータです。仮にdata_id
を指定した場合、クエリの結果のdata_id
列の値が_id
としてElasticsearchへuploadされます。
- Elasticsearchへのupload時、ドキュメントの識別子となる
- connecltionUrl
- upload先となるElasticsearchのホストのURLです
- index
- upload先となるElasticsearchのIndex名です
- apiKey
- ElasticCloudの認証時にapiKeyを使用する場合は指定します。 今回は未使用ですが、 必須パラメータのため空文字を指定しています。
- elasticsearchUsername, elasticsearchPassword
- Elasticsearchの接続時に使用する認証情報です。今回はBASIC認証のため指定しています。 (apiKeyを使用する場合は不要です)
Jobの実行
パラメータを作成した後、下記のコマンドでDataflowのJobを実行できます。
gcloud dataflow flex-template run temp-job-$(date +'%Y%m%d-%H%M') --flags-file=flag-file.yml
DataflowのJobを実行する際には、実行の都度一意なJob名を指定します。実行結果を確認する際にはこの項目名で履歴を追いかけることになるため、実行したタイミングが判別できる様にしておくと無難です。
(上記コマンドの場合、temp-job-$(date +'%Y%m%d-%H%M')
の部分がJob名にあたります.)
設定パラメータに問題がなければ、コマンド実行後にJobが起動します。
Jobの実行状況及び結果は下記コマンドで確認できます。
# 最近実行されたJobの一覧を表示する
gcloud dataflow jobs list
# 「JOB名」で指定されたJobの実行状況を確認する
gcloud dataflow jobs show JOB名
CLI上でも確認可能ですが、GCPのWeb GUIの方が情報がまとまっております。
Job実行失敗時の調査などはこちらを使用するのがおすすめです。
- まとめと今後
今回はJobの実行に触れたのみですが、実運用の際はAirflowなどのJob実行基盤経由で定周期でJobを起動する運用になると思います。
弊チームではJobの実行基盤にCloud Composer(Airflow)を採用しており、ここにDataflowを組み込むことでデータ基盤をマネージドなサービスで覆う見込みです。
Discussion