BigQuery の Continuous Queries を試してみた
Google Cloud Japan Advent Calendar 2024 の17日目の記事です。
本日のテーマは、BigQuery の Continuous Queries(継続的クエリ)です。
継続的クエリは、BigQuery のテーブルに書き込まれたデータをリアルタイムに処理し、他のテーブルや Bigtable、Pub/Sub 等に書き込むことができます。
ユースケースとしては、以下が想定されています。
- カスタマイズされた顧客対応: 生成AI を使用して、顧客ごとにカスタマイズされたメッセージを作成する。
- 異常検出: 複雑なデータに対して異常と脅威をリアルタイムで検出できるソリューションを構築し、問題に迅速に対応できるようにする。
- カスタマイズ可能なイベントドリブンパイプライン: Pub/Sub と継続的クエリを統合して、受信データに基づいてダウンストリーム アプリケーションをトリガーします。
- データの拡充とエンティティの抽出: 継続的クエリで、SQL関数や機械学習(ML)モデルを使用し、リアルタイムでデータの拡充と変換を行う。
- リバースETL(抽出 / 変換 / 読み込み): 低レイテンシのアプリケーションに適した他のストレージシステム(リレーショナルDBやKVS等)にリアルタイムなリバースETLを実行する。 たとえば、BigQuery に書き込まれたイベントデータを分析、拡充、Bigtable に出力し、アプリケーションのバックエンドとして利用する。
本稿では、生成AIを使用してデータを拡充し、BigQuery のテーブルやBigtable、Pub/Subに出力していきます。
なお、2024/12/17時点では継続的クエリはプレビューとして提供されており、リクエストフォームでの申請が必要となります。
tl;dr
- 継続的クエリを使用すると、SQLでストリームデータの処理が実装できる
- 継続的クエリでは、処理した結果を BigQuery のテーブルや、Pub/Sub、Bigtable に書き込むといったリバースETLが実装できる
- ストリームデータに対して、生成AIやMLの推論、異常検知等を適用し、リアルタイムにデータの拡充ができる
早速試してみる
以下の流れで、継続的クエリを試していきます。
- 予約の設定
- 生成AIによるデータの拡充
- Bigtable への書き込み
- Pub/Sub への書き込み
前提条件
本稿では、チュートリアルでも使われている、映画レビューの公開テーブル bigquery-public-data.imdb.reviews
を使用します。
本稿の実施に必要な権限の詳細ついては、各手順に対応したドキュメントを参照してください。
BigQuery 関連の権限だけでなく、リモートモデルの利用のために Vertex AI の権限、リバースETLで書き込むリソース(Bigtable、Pub/Sub)の権限も必要となります。
準備
まず、継続的クエリ向けに Enterprise Edition または Enterprise Plus Edition の予約作成と割り当てを行います。
継続的クエリを利用するためには、予約にベースラインのスロットを設定し、ジョブタイプ CONTINUOUS
の割り当てをする必要があります。 ベースラインスロットの設定が必要となるため、Standard EditionやOn-demandではご利用いただけません。
容量管理画面の "CREATE RESERVATION" から、ベースラインを設定した予約を作成します。
作成した予約の "割り当てを作成する" から当該プロジェクトとジョブタイプ CONTINUOUS
を選択し、割り当てます。
予約の作成割り当ては以上です。
次の図は一般的な継続的クエリのワークフローを示しています。以降、BigQuery に挿入されたデータを生成AI関数(下図の Vertex AI にあたる部分 )を使用し、BigQueryのテーブル、Bigtable、Pub/Subに書き込んでいきます。
生成AIによるデータ拡充
gemini-1.5-flash-002
モデルに基づいてリモートモデルを作成し、ML.GENERATE_TEXT
関数をつかって、映画レビューの公開テーブル bigquery-public-data.imdb.reviews
からキーワードを抽出し、感情分析を行います。
このSQLを継続的クエリとして実行し、生成AIで拡充したデータを BigQuery のテーブルに書き込みます。
該当するドキュメントはこちらです。
本稿では継続的クエリの実施にフォーカスするため、詳細な手順は上記をご確認ください。
上記のドキュメントで以下を行います。
- データセットの作成
- 接続の作成
- サービスアカウントへの権限付与
- リモートモデルの作成
- キーワード抽出
- 感情分析
上記を実施したのち、感情分析のクエリを継続的クエリにしていきます。
ここでは、プロンプトに日本語を指定し、生成テキストの簡素化をしています。
継続的クエリで処理されることを試すための入力テーブルを作成します。
CREATE OR REPLACE TABLE bqml_tutorial.imdb_reviews AS
SELECT * FROM `bigquery-public-data.imdb.reviews` limit 0
結果を書き込むテーブルを作成します。
CREATE OR REPLACE TABLE bqml_tutorial.imdb_reviews_sentiment_analysis AS
SELECT
ml_generate_text_result['candidates'][0]['content']['parts'][0]['text'] AS generated_text,
* EXCEPT (ml_generate_text_result)
FROM
ML.GENERATE_TEXT(
MODEL `bqml_tutorial.gemini_model`,
(
SELECT
CONCAT(
'以下の文章を感情分析してください。感情は positive か negative のみで表してください: ',
review) AS prompt,
*
FROM
`bqml_tutorial.imdb_reviews`
LIMIT 5
),
STRUCT(
0.2 AS temperature,
100 AS max_output_tokens))
LIMIT 0
以下のSQLを継続的クエリとして実行します。
入力をbqml_tutorial.imdb_reviews
に挿入されたレコード、出力先を bqml_tutorial.imdb_reviews_sentiment_analysis
テーブルとしています。
INSERT INTO bqml_tutorial.imdb_reviews_sentiment_analysis
SELECT
ml_generate_text_result['candidates'][0]['content']['parts'][0]['text'] AS generated_text,
* EXCEPT (ml_generate_text_result)
FROM
ML.GENERATE_TEXT(
MODEL `bqml_tutorial.gemini_pro`,
(
SELECT
CONCAT(
'以下の文章を感情分析してください。感情は positive か negative のみで表してください: ',
review) AS prompt,
*
FROM
`bqml_tutorial.imdb_reviews`
),
STRUCT(
0.2 AS temperature,
100 AS max_output_tokens))
;
BigQueryのクエリエディタの "展開" をクリックし、"クエリモードを選択"から"継続的クエリ"を選択します。
エディタの下部に "クエリモード: Continuous" が表示されていれば継続的クエリとなっています。
それでは、実行してみましょう。コンソールから実行した場合、実行中はジョブ情報の上部に以下が表示されます。
継続的クエリの動作を確認するため、データを挿入する前に出力テーブルの状況を確かめます。作りたてのままであれば、クエリ結果は0件となります。
SELECT * FROM bqml_tutorial.imdb_reviews_sentiment_analysis
継続的クエリが処理する対象として、入力テーブル bqml_tutorial.imdb_reviews
にデータを挿入します。
INSERT INTO `bqml_tutorial.imdb_reviews`
SELECT * FROM `bigquery-public-data.imdb.reviews`
limit 5
入力テーブルにデータが挿入され、処理が実行されると、ジョブ情報の "読み取り済みレコード"、"書き込み済みレコード"の値が増えます。
正常に継続的クエリの処理が実行されたか、出力テーブルの状況を確かめます。
SELECT * FROM bqml_tutorial.imdb_reviews_sentiment_analysis
Negative / Positive 判定した結果が出力先テーブルに書き込まれました。
リバースETL
前段では処理結果を BigQuery テーブルに書き込みました。BigQuery のSQLでは EXPORT
にも対応しており、Cloud Storage、Bigtable、Pub/Sub、Spanner等にデータ出力が可能です。
ここからはEXPORT
を用いて、Bigtable と Pub/Sub に出力してみます。
Bigtable
実施前に出力先である Bigtable のテーブルを作成しておきます。
前章と同様のSQLを Bigtable への EXPORT
向けに修正し、継続的クエリモードで実行します。
EXPORT DATA
OPTIONS (
format = 'CLOUD_BIGTABLE',
truncate = TRUE,
overwrite = TRUE,
uri = 'https://bigtable.googleapis.com/projects/your-project/instances/sample-20241217-01/tables/sample-20241217-01')
AS (
select
CAST(movie_id AS STRING) AS rowkey,
struct(
ml_generate_text_result['candidates'][0]['content']['parts'][0]['text'] AS generated_text,
prompt,
label,
reviewer_rating,
title
) as features
from
ML.GENERATE_TEXT(
MODEL `bqml_tutorial.gemini_pro`,
(
SELECT
CONCAT(
'以下の文章を感情分析してください。感情は positive か negative のみで表してください: ',
review) AS prompt,
*
FROM
`bqml_tutorial.imdb_reviews`
),
STRUCT(
0.2 AS temperature,
100 AS max_output_tokens))
)
入力テーブル bqml_tutorial.imdb_reviews
にデータを挿入します。
INSERT INTO `bqml_tutorial.imdb_reviews`
SELECT * FROM `bigquery-public-data.imdb.reviews`
limit 5
Bigtable のテーブルを確認します。今回は継続的クエリ開始前のデータを除外する処理を含めていないため、元々あったデータと直前に挿入したデータの両方が対象となります。
Pub/Sub
実施前に出力先である Pub/Sub のトピックを作成しておきます。
前章と同様のSQLを Pub/Sub への EXPORT
向けに修正し、継続的クエリモードで実行します。
EXPORT DATA
OPTIONS (
format = 'CLOUD_PUBSUB',
uri = 'https://pubsub.googleapis.com/projects/your-project/topics/sample-1')
AS (
select
TO_JSON_STRING(
struct(
ml_generate_text_result['candidates'][0]['content']['parts'][0]['text'] AS generated_text,
prompt,
label,
movie_id,
reviewer_rating,
title
)
) as messege
from
ML.GENERATE_TEXT(
MODEL `bqml_tutorial.gemini_pro`,
(
SELECT
CONCAT(
'以下の文章を感情分析してください。感情は positive か negative のみで表してください: ',
review) AS prompt,
*
FROM
`bqml_tutorial.imdb_reviews`
),
STRUCT(
0.2 AS temperature,
100 AS max_output_tokens))
)
入力テーブル bqml_tutorial.imdb_reviews
にデータを挿入します。
INSERT INTO `bqml_tutorial.imdb_reviews`
SELECT * FROM `bigquery-public-data.imdb.reviews`
limit 5
当該 Pub/Sub のサブスクリプションを確認します。こちらも生成AIで拡張したデータが Pub/Sub に出力されています。
継続的クエリの停止
継続的クエリのジョブは明示的に停止するまで継続的に実行されます。継続的クエリを停止するには、ジョブをキャンセルしてください。
コンソールから実行した場合は、"キャンセル"をクリックすることでジョブを停止できます。
まとめ
継続的クエリを使用して BigQuery に挿入されたデータに ML.GENERATE_TEXT
関数を適用してデータの拡充を行いました。このように、ストリームデータの処理を BigQuery 上で実行することできます。
現時点では、まだ扱える処理がやや少ない状況ではありますが、本稿で試した生成AI関数によるデータの拡充とリバースETL以外にも、推論や予測、レコメンド、異常検知、データの前処理等を継続的クエリで実施することで、 高度な脅威の検知やよりリアルタイムなデータマートの作成、BigQuery の処理結果をアプリケーションのバックエンドへ反映といったことが実現できます。
BigQuery の用途が広がる機能となりますので、ぜひお試しいただけますと幸いです。
Discussion