👌

Elasticsearchの更新をGoogle Cloud DataflowとBigQuery+SQLで実現したい

2022/12/26に公開約6,100字

はじめまして。
株式会社ココナラ R&Dグループ MLデータ基盤チームの北郷と申します。

MLデータ基盤チームではココナラにおける検索周りの機能拡張や運用を行なっています。
今回の記事は、現在導入を進めているGoogle CloudのDataflowについてです。

本記事は、2022/12/16時点で検証を行なっております。
GCP側の仕様変更により手順などが変わる可能性がある旨ご承知おきください。

Google Cloud Dataflowについて

Google Cloudのサービスの一つで、ETL(Extract Transform Load)処理を実装するフルマネージドのサービスです。
https://cloud.google.com/dataflow?hl=ja

DataflowとGoogle Cloud上の他サービスを組み合わせることで、データストアからのデータの抽出および変換、結果の格納という頻出する処理をマネージドなサービスのみで構成することができます。

ETL処理を行う場合、扱うデータ量に応じて抽出/変換/読み込みの各段階で相応のコンピューターリソースの確保が必要となります。
弊社の例を挙げると、サービス検索のリランキング処理を実現する際、出品されたサービスの特徴量をRDBから取得して加工し、その結果をElasticsearchへアップロードする実装がされています。
これらの処理を素直にオンプレで実装する場合、RDBを稼働させるためのサーバ、結果の抽出と変換およびアップロード処理を行うサーバの確保が必要となります。

Dataflowを採用し、BigQueryなどのGoogle Cloud上のマネージドなサービスのみで同等の機能を実現することで、下位のレイヤーに煩わされる機会を大幅に減らすことができるという算段です。

やってみた

今回のお品書き

本記事では、Dataflowの「BigQuery to Elasticsearch」テンプレートを用いてBigQueryのテーブル上のデータをElasticsearchのインデックス上へのアップロードを行います。

大まかな流れ

  • Dataflowの実行用パラメータを作成(Elasticsearch接続設定, データの抽出/変換を行うSQLクエリ等)
  • DataflowのJobをgcloudコマンド(Google Cloud CLI)から起動

構成
Dataflowを使用した構成例

本記事では、CLIによるDataflowのJobの実行までを扱います。
(Airflow等のJobの実行基盤への組み込みについてはスコープ外です。)

前提条件

本手順では、下記の前提で進めます

  • GCP周り
    • 作業用の環境にGoogle Cloud CLIをインストール済みで、gcloudコマンドを使用できること
    • 使用するサービスアカウントに必要なアクセス権限が付与されていること
  • Elasticsearch周り
    • データのアップロード対象となるElasticsearchクラスタは、Elastic Cloud上に構築されているものとします

今回用意した構成

今回は、下記の構成を前提とします。
BigQuery上のcoconala-temp.dummy_datasourceテーブルの中身をSQLを用いてElaticSearchのdummy_indexインデックスへアップロードします

今回の構成

BigQuery テーブル構造

coconala-temp.dummy_datasourceテーブル

列名 型名 説明
id Number レコード毎に一意です(Primary Key相当)
head String レコードの概要を説明する数文字〜数十文字程度のマルチバイト文字列が格納されます
body String 本文です。数十文字から数千文字程度のマルチバイト文字列が格納されます

Elasticsearch Index Mapping

dummy_indexインデックスのデータ構造です。
BigQueryより抽出したデータをこのIndexへ格納します。

プロパティ概要

プロパティ名 説明
data_id _idと同一の値が格納されます
head coconala-temp.dummy_datasourceのhead列相当です
body coconala-temp.dummy_datasourceのbody列相当です
upload_at アップロード時刻を格納します

マッピング情報

{
    "mappings" : {
        "properties": {
            "data_id": { "type": "integer" },
            "head": { "type": "text", "analyzer": "jp_analyzer" },
            "body": { "type": "text", "analyzer": "jp_analyzer" },
            "upload_at": { "type": "date", "format": "date_optional_time||strict_date_optional_time" }
        }
    },
    "settings": {
        "index": {
            "analysis" :{
                "tokenizer": {
                    "jp_kuromoji_tokenizer": {
                         "type": "kuromoji_tokenizer",
                         "mode": "search"
                    } 
                },
                "analyzer": {
                    "jp_analyzer": {
                        "type": "custom",
                        "tokenizer": "jp_kuromoji_tokenizer",
                    }
                }
            }
        }    
    }
}

DataflowのJob実行用のパラメータの作成

Dataflowの実行時、テンプレートの構成に合わせたパラメータを引き渡すことでJobを起動します。
引き渡すパラメータはテンプレート毎に異なり、処理内容をかなり細かく定義することができます。

今回使用する「BigQuery to Elasticsearch」テンプレートの場合、使用しているものだけでも以下の軸で挙動を制御できます

  • BigQueryから取得するデータの取得方法(特定のテーブル内のレコードを全て取得もしくは、SQLクエリによる抽出)
  • データの格納先のElasticsearch情報
    • 接続情報(BASIC認証の場合id/pass, Elastic CloudのAPIKeyを使用する場合はAPI key)
    • 格納先のIndex名
  • データ格納時の挙動
    • BigQueryのレコードをElasticsearchのドキュメントへ変換する際の_id列の解決方法(SQLクエリの結果の列名を指定する、もしくはUDFで実装)
    • Elasticsearchへドキュメントを格納する際、_idが重複する場合の解決方法(_idが重複する場合はドキュメントを上書き、もしくは重複した場合はJobをエラー終了)

今回作成した実行用パラメータは下記です。
設定を要する項目が多いため、今回は、gcloudコマンドのflag-fileとして設定値を準備します。
(パスワードやホスト名などは、一部ダミーの値となります。)

cat <<EOT > flag-file.yml
--template-file-gcs-location: gs://dataflow-templates-us-central1/latest/flex/BigQuery_to_Elasticsearch
--region: us-central1
--project: coconala-temp
--parameters:
  query: "SELECT  id AS data_id, head AS head, body AS body, DATETIME(CURRENT_TIMESTAMP(), \"Asia/Tokyo\") AS upload_at  FROM \`coconala-temp.dummy_datasource\`"
  propertyAsId: data_id
  connectionUrl: https://elasticcloud-cluster-hostname:elasticcloud-cluster-portno
  index: dummy_index
  apiKey: ""
  elasticsearchUsername: elasticsearch_username
  elasticsearchPassword: elasticsearch_username
EOT

このパラメータで、BigQuery上に存在するcoconala-temp.dummy_datasourceテーブルの中身をSQLクエリで抽出し、Elasticsearchのdummy_indexへデータ構造を合わせた上で格納する処理が定義されます。

各パラメータの解説

--template-file-gcs-location

使用するDataflowのテンプレートです。今回は、BigQueryからElasticsearchへのuploadを行うため、「BigQuery to Elasticsearch」テンプレートを使用します。

--parameters

DataflowのJobに引き渡すパラメータです。
Jobの挙動はここで調整します。

  • query
    • BigQueryからElasticsearchへアップロードするためのデータを抽出するSQLクエリです。
    • 結果の1レコードがElasticsearchの1ドキュメントに変換されます。
    • クエリの結果の列名がそのままElasticsearchのドキュメントのプロパティ名となるため、結果の列は格納先のElasticsearchのIndexのMappingに合わせた別名を付与する必要があります。
  • propertyAsId
    • Elasticsearchへのupload時、ドキュメントの識別子となる_id列に、queryパラメータの結果のどの列名を割り当てるかを指定するパラメータです。仮にdata_idを指定した場合、クエリの結果のdata_id列の値が_idとしてElasticsearchへuploadされます。
  • connecltionUrl
    • upload先となるElasticsearchのホストのURLです
  • index
    • upload先となるElasticsearchのIndex名です
  • apiKey
    • ElasticCloudの認証時にapiKeyを使用する場合は指定します。 今回は未使用ですが、 必須パラメータのため空文字を指定しています。
  • elasticsearchUsername, elasticsearchPassword
    • Elasticsearchの接続時に使用する認証情報です。今回はBASIC認証のため指定しています。 (apiKeyを使用する場合は不要です)

Jobの実行

パラメータを作成した後、下記のコマンドでDataflowのJobを実行できます。

gcloud dataflow flex-template run temp-job-$(date +'%Y%m%d-%H%M') --flags-file=flag-file.yml

DataflowのJobを実行する際には、実行の都度一意なJob名を指定します。実行結果を確認する際にはこの項目名で履歴を追いかけることになるため、実行したタイミングが判別できる様にしておくと無難です。
(上記コマンドの場合、temp-job-$(date +'%Y%m%d-%H%M')の部分がJob名にあたります.)

設定パラメータに問題がなければ、コマンド実行後にJobが起動します。

Jobの実行状況及び結果は下記コマンドで確認できます。

# 最近実行されたJobの一覧を表示する
gcloud dataflow jobs list
# 「JOB名」で指定されたJobの実行状況を確認する
gcloud dataflow jobs show JOB名

CLI上でも確認可能ですが、GCPのWeb GUIの方が情報がまとまっております。
Job実行失敗時の調査などはこちらを使用するのがおすすめです。
https://console.cloud.google.com/dataflow/jobs

  1. まとめと今後

今回はJobの実行に触れたのみですが、実運用の際はAirflowなどのJob実行基盤経由で定周期でJobを起動する運用になると思います。
弊チームではJobの実行基盤にCloud Composer(Airflow)を採用しており、ここにDataflowを組み込むことでデータ基盤をマネージドなサービスで覆う見込みです。

Discussion

ログインするとコメントできます