BigQuery と Gemini で手軽に画像から情報抽出
この記事は MICIN Advent Calendar 2024 の 15日目の記事です。
前回は Ryo Shindo さんの、「Hono でつくる、うすい URL 短縮」 でした。
はじめに
DS チームの宮前です。MICIN の DS チームでは、事業部のデータから新たな価値を創出するための施策を行っています。ある日事業部から「分析に使いたいデータがあるんだけど、画像の中にしかなくて困っている」という話をもらいました。この記事では、そのときに解決策として採用した、BigQuery と Gemini を用いて画像から情報を抽出する方法をご紹介します。
なぜ BigQuery か
事業部から詳細を聞いたところ、以下のような条件があることが分かりました。
- 画像は Cloud Storage にある
- 画像のメタデータは BigQuery にある
- 画像とそのメタデータは毎日特定の時間に追加される
- 抽出結果は BigQuery テーブルに格納する
よって今回やりたいことは「Cloud Storage にある画像から特定の情報を抽出して、BigQuery テーブルに格納し、毎日特定の時間に定期実行する」となります。
BigQuery には BigQuery ML という、SQL で機械学習モデルを用いて推論ができる超便利機能が存在します。
またオブジェクトテーブルを使うと、BigQuery から Cloud Storage の画像にアクセスできます。
しかも BigQuery ML で Gemini を呼び出して、その入力にオブジェクトテーブルを指定すると、画像を直接処理できてしまうのです(参考ブログ)。
これらは全て SQL で完結するので、定期実行も BigQuery の Scheduled Query を設定すれば OK です。
このように、全てが BigQuery 上で完結するので、とても便利です。
実際に実装してみる
では実際に Terraform で実装していきたいと思います。
実際のタスクの詳細は公開できないので、今回は「本の表紙からタイトルと著者名を抽出する」というタスクを想定します。
外部接続の作成
まず BigQuery から Vertex AI と Cloud Storage にアクセスするための外部接続を作成します。
以下では book-cover
という名前の外部接続を作成し、そのサービスアカウントに必要なロールを付与しています。
resource "google_bigquery_connection" "connection" {
project = local.project_id
location = local.region
connection_id = "book-cover"
cloud_resource {}
}
resource "google_project_iam_member" "connection" {
for_each = toset(["roles/aiplatform.user", "roles/storage.objectViewer"])
project = local.project_id
role = each.value
member = "serviceAccount:${google_bigquery_connection.connection.cloud_resource[0].service_account_id}"
}
オブジェクトテーブルの作成
次にオブジェクトテーブルを作成します。
以下では book_cover_sample
バケット配下の画像を参照する object_table
という名前のオブジェクトテーブルを作成しています。
resource "google_bigquery_table" "object_table" {
project = local.project_id
deletion_protection = false
dataset_id = local.dataset_id
table_id = "object_table"
external_data_configuration {
connection_id = google_bigquery_connection.connection.name
autodetect = false
object_metadata = "SIMPLE"
source_uris = ["gs://book_cover_sample/*"]
}
}
リモートモデルの作成
次に BigQuery から Gemini を呼び出せるように準備します。
リモートモデルの作成は SQL でしか行えないので、今回は local-exec で行うことにしました。
以下では remote_model
という名前で Gemini 1.5 Flash のリモートモデルを作成しています。
resource "terraform_data" "remote_model" {
provisioner "local-exec" {
command = <<EOT
bq query --use_legacy_sql=false "
CREATE OR REPLACE MODEL \`${local.project_id}.${local.dataset_id}.remote_model\`
REMOTE WITH CONNECTION \`${google_bigquery_connection.connection.name}\`
OPTIONS (ENDPOINT = 'gemini-1.5-flash-001')
"
EOT
}
}
Scheduled Query の作成
最後に Scheduled Query を作成して、SQL が定期実行されるようにします。
以下では extract-title-and-author
という名前で、日本時間6時に実行される Scheduled Query を作成しています。
SQL の中身は長いので別ファイルに分けました。
resource "google_service_account" "scheduler" {
project = local.project_id
account_id = "extract-title-and-author"
}
resource "google_project_iam_member" "scheduler" {
project = local.project_id
role = "roles/bigquery.admin"
member = "serviceAccount:${google_service_account.scheduler.email}"
}
resource "google_bigquery_data_transfer_config" "scheduler" {
project = local.project_id
location = local.region
display_name = "extract-title-and-author"
data_source_id = "scheduled_query"
schedule = "every day 21:00" # 日本時間6:00
params = {
query = templatefile("./extract_title_and_author.sql", {
model_id = "${local.project_id}.${local.dataset_id}.remote_model",
object_table_id = "${local.project_id}.${local.dataset_id}.object_table",
output_table_id = "<出力先のテーブルのID>"
})
}
service_account_name = google_service_account.scheduler.email
depends_on = [google_project_iam_member.scheduler]
}
extract_title_and_author.sql
の中身は以下のようになっています。
CREATE TEMP TABLE llm_outputs AS
WITH target_records AS (
/*
* メタデータのテーブルから対象となる画像を特定します。
* 今回は毎日実行するので、直近一日で追加された画像を絞り込みます。
* 後続の処理のために、画像のURIをuriというカラム名で持つようにしておきます。
*/
)
SELECT *
FROM ML.GENERATE_TEXT(
MODEL `${model_id}`,
(
SELECT *
FROM ${object_table_id} AS o
WHERE EXISTS (
SELECT 1 FROM target_records AS t
WHERE o.uri = t.uri
)
),
STRUCT (
'本の表紙からタイトルと著者名を抽出して、titleとauthorをキーに持つJSON形式で出力すること。' AS prompt,
128 AS max_output_tokens,
0.0 AS temperature,
TRUE AS flatten_json_output
)
);
INSERT INTO ${output_table_id}
SELECT
uri,
JSON_VALUE(RTRIM(LTRIM(ml_generate_text_llm_result, "```json"), "```"), "$.title") AS title,
JSON_VALUE(RTRIM(LTRIM(ml_generate_text_llm_result, "```json"), "```"), "$.author") AS author
FROM llm_outputs;
ここでいくつかハマったポイントを紹介します。
ML.GENERATE_TEXT の結果は一時テーブルに保存する
「あれ?なんでわざわざ一時テーブルを作っているの?」と思われるかも知れません。確かに本来は以下のように直接取得したいところです。
SELECT
uri,
JSON_VALUE(RTRIM(LTRIM(ml_generate_text_llm_result, "```json"), "```"), "$.title") AS title,
JSON_VALUE(RTRIM(LTRIM(ml_generate_text_llm_result, "```json"), "```"), "$.author") AS author
FROM ML.GENERATE_TEXT(
-- ここの中身は上と同じなので省略。
)
ただ今回のケースでこうすると、処理が一向に終わらないという事象が起こりました。
オブジェクトテーブルを絞り込まない場合には発生しない?ようなので、絞り込みのせいかも知れませんが、詳しい原因はよく分かりません。
SELECT *
FROM ML.GENERATE_TEXT(
-- ここの中身は上と同じなので省略。
)
一方でこのようにすると問題なく処理が終わるんですよね。。
そこで bqml-feedback@google.com にメールで問い合わせたところ、以下のような返事がきました。原文は英語なので、なんとなくで翻訳しています。
2つのクエリでは生成される execution plan が異なっていて、前者のクエリでは十分な並列処理が行われないことが原因のようです。今後修正を行います。
それまでの間、SELECT * の結果を一時テーブルに保存して、そこから uri と ml_generate_text_llm_result カラムを取得するようにしてみてください。
execution plan はクエリプランとタイムラインで言うクエリプランに当たるものだと思われます。
確認してみると確かに異なってはいるのですが、詳しいことは分からないので、アドバイスの通り一時テーブルに保存する方法を取ることにしました。
出力先は明示的に INSERT する
google_bigquery_data_transfer_config のドキュメントを見ると、以下のパラメーターで出力先を指定しています。
-
destination_dataset_id
: 出力先のデータセット名 -
params.destination_table_name_template
: 出力先のテーブル名 -
params.write_disposition
: 出力先への書き込み方法
ただ今回のようにマルチステートメントクエリを使用していると、以下のようなエラーが発生します。
Invalid value: configuration.query.destinationTable cannot be set for scripts; JobID: xxx
そのためこれらのパラメーターを削除して、代わりに SQL で明示的に INSERT することで対処しました。
おまけの Tips
メインの実装はこれでおしまいですが、もう少しだけ、他のサイトやブログでは見つけられなかった Tips を共有します。
バケットが別プロジェクトにある場合
データの管理と処理の実装が別プロジェクトになるということもあると思います。
色々試した結果、このパターンでは以下のようにするのが良さそうです。
- 実装プロジェクト(プロジェクト1)
- 外部接続を作成するときに、そのサービスアカウントに
roles/aiplatform.user
を付与する-
roles/storage.objectViewer
はいりません
-
- オブジェクトテーブルを作成するときに、プロジェクト2のバケットを指定する
- あとはそのまま Scheduled Query の作成まで行う
- 外部接続を作成するときに、そのサービスアカウントに
- バケット管理プロジェクト(プロジェクト2)
- プロジェクト1の外部接続のサービスアカウントに、当該バケットに対する
roles/storage.objectViewer
を付与する - プロジェクト1の Vertex AI のサービスアカウントに、当該バケットに対する
roles/storage.objectViewer
を付与する
- プロジェクト1の外部接続のサービスアカウントに、当該バケットに対する
出力先テーブルが別プロジェクトにある場合
このパターンでは以下のようにします。
- SQL の INSERT 先を別プロジェクトのものにする
- Scheduled Query のサービスアカウントに、別プロジェクトのテーブルのデータセットに対する
roles/bigquery.dataEditor
を付与する- 書き込み権限はテーブル単位では付与できず、データセット単位での付与になります
おわりに
「まだ使ったことなかった」「これから使ってみようと思っていた」という方、同じような課題が降ってきた方にとって、何かのヒントになれば幸いです。
MICINではメンバーを大募集しています。
「とりあえず話を聞いてみたい」でも大歓迎ですので、お気軽にご応募ください!
Discussion