🍆

Cloud Storageにアップロードすると、自動的に Gemini API に処理を投げてデータ加工するパイプラインの土台を作る

に公開

スクレイピングをしていると、次に控えるデータ加工のためにローカルでデータを持ったままだと、その存在が気になって精神的に辛いです。

理想を言えば、スクレイピングしてその場で Cloud Storage にアップロードしたら、自動的に Gemini API を叩いてデータ加工を済ませてくれて、Cloud Storageに再格納して欲しい......。スクレイピング作業だけに集中したいんや...。

なんと、Google Cloud では、そんな理想的なデータ加工のパイプラインを簡単に構築できます!

夢のパイプラインを実現するための構成

構成は下記のようなイメージとなります。

  1. Cloud Storage にアップロードする。
  2. アップロードがトリガーとなって、 Cloud Run Functions にHTTPリクエストが飛ぶ。
  3. Cloud Run Functionsで Cloud Tasks にジョブ(タスク)作成をしてもらう。
  4. Cloud Tasks がジョブを制御して Cloud Run Functions にHTTPリクエストが投げられる。
  5. Cloud Run Functions によって Gemini API を叩いて、結果を Cloud Storage に保存して終了。 Compute Engine のVMから Gemini APIを叩いて結果を Cloud Storage に保存して終了。(Cluod Run Functionsでは await の処理がCPU/メモリ使用時間で課金枠を圧迫してしまいお金がかかりすぎてしまいます。VPSにするか、無料提供の米国リージョンのe2-microのCompute Engineを使う方が適しています。)

という流れです。
このパイプラインを構築してしまえば、あとはひたすらにスクレイピングして、アップロードするだけで自動的に加工済みデータに変換されるようになります。

本記事では1~4までの "パイプラインとしての土台" の構築について説明しています。

僕はスクレイピングに全集中できる上、Gemini APIのレート制限すら一切気にすることなく、えげつないほどにアップロードしても、このパイプラインがデータ加工の工程を完全にクラウド上で勝手に完了してくれるようになります。

0. 事前準備(Cloud Storageにバケットを作っておく)

Cloud Storage に独立したバケットを用意しておきましょう。このバケットに何らかのファイルがアップロードされたことをトリガーとして動くことになります。

アップロード用のバケット、加工済みデータ格納のバケットは分けておきましょう。

1. Cloud Tasks にキューを作っておく

ジョブ(タスク)を登録する先として、キューを作っておく必要があります。このキューの中にジョブ(タスク)を溜め込んでいくことで、Cloud Tasksが順次タスク(Gemini APIを使ったデータ加工など)を開始してくれます。

https://console.cloud.google.com/cloudtasks

こちらから「キューの作成」ボタンを押して、以下を参考に入力してキューを作成します。

項目名 何を入力するのか
キュー名 このキューの名前を決めてください。なんでもOKです。(例: gemini-queue)
リージョン 東京または大阪がオススメです。

次に、作ったキューの編集をします。
キューの名前をクリックして「キーを編集」で編集画面を開きます。

項目名 何を入力するのか
最大ディスパッチ数 実行トークンの回復スピードを指定。 1/秒 ぐらいでもいいと思います。
※トークンが回復 = 発火準備が整うイメージ。
最大同時ディスパッチ数 最大の同時実行数を指定。Gemini APIのレート制限に引っかからないぐらいの絶妙な値を見つけてみてください。値が大きくなる場合は、最大ディスパッチ数の数字も上げておきましょう。

その他の値はお調べになって適切な値を設定しておいてください。
本記事で必要な値はこの2つなので、これだけを扱っています。

2. タスク追加のための Cloud Run Functions を作る

次に、Cloud Storage トリガーからリクエストを受け取ったら、Cloud Tasks にタスクを追加するような仕組みを Cloud Run Functions で作ります。

ローカルにプロジェクトフォルダを作って、ファイルを設置したり、記述をしていきます。ディレクトリ構造や各種設定は下記に畳んでおきましたのでご覧ください。

ディレクトリ構造
.プロジェクト名
├── 📂src/
│   └── 📄index.ts
├── 📄.gitignore
├── 📄package.json
└── 📄tsconfig.json
package.json
package.json
{
  "main": "dist/index.js",
  "type": "module",
  "scripts": {
    "build": "tsc && tsc-alias",
    "postbuild": "tsc-alias",
    "start": "node dist/index.js"
  },
  "dependencies": {
    "@google-cloud/storage": "^7.16.0",
    "@google-cloud/tasks": "^6.1.0",
    "@google/genai": "^1.0.1",
    "@hono/node-server": "^1.14.2",
    "hono": "^4.7.10",
    "rimraf": "^6.0.1",
    "tsc-alias": "^1.8.16"
  },
  "devDependencies": {
    "@types/node": "^22.15.21",
    "typescript": "^5.8.3"
  }
}
tsconfig.json
tsconfig.json
{
  "tsc-alias": {
    "resolveFullPaths": true,
    "verbose": false
  },
  "compilerOptions": {
    "target": "ES2022",
    "module": "ES2022",
    "moduleResolution": "node",
    "baseUrl": "./src",
    "paths": {
      "@utils/*": ["./utils/*"],
      "@routes/*": ["./routes/*"]
    },
    "outDir": "./dist",
    "esModuleInterop": true,
    "forceConsistentCasingInFileNames": true,
    "strict": true,
    "skipLibCheck": true,
    "types": ["node"]
  },
  "include": ["src"]
}

次に、 src/index.ts に hono を使ってWebサーバーを作ります。

src/index.ts
import { serve } from "@hono/node-server";
import { Hono } from "hono";
import { CloudTasksClient, protos } from "@google-cloud/tasks";

const app = new Hono();
// Google Cloud のクライアント(Cloud Run Functionsにデプロイすると自動的に認証してくれる)
const client = new CloudTasksClient();

const PROJECT = "【ここにはGoogle CloudのプロジェクトIDを入れる(windy-territory-hogehogeみたいなやつ)】";
const LOCATION = "【ここにはCloud Tasksのリージョン(ロケーション)を入れる(asia-northeast1とか。)】";
const QUEUE = "【ここにはCloud Tasksのキューの名前を入れる。(1)で作ったキュー名のこと。】";
const URL = "【Cloud Tasksのタスク発火で、どこにリクエストを投るかを指定。つまり、Gemini APIを叩く処理が含まれるURLを指定する】";
const SERVICE_ACCOUNT_EMAIL = "【サービスアカウントのメールを入れる。サービスアカウントの「メール」の欄の文字を入れるだけ。】";

const port = parseInt(process.env.PORT || "8080", 10);

/** ================================================ **/
// キューにジョブ(タスク)を追加する
/** ================================================ **/
type Finalized = {
  id: string; // 例: scraping-articles/かきくけこ.png/1748002374237131
  name: string; // 例: かきくけこ.png
  bucket: string; // 例:scraping-articles
  contentType: string; // 例:image/png
  mediaLink: string; // 例:https://storage.googleapis.com/download/storage/v1/b/hogehoge/hogehogehogehoge.png
};

app.post("/gemini", async (c) => {
  const payload = (await c.req.json()) as Finalized;

  // どこのキューに追加するかを指定するために必要な情報
  const parent = client.queuePath(PROJECT, LOCATION, QUEUE);

  // Cloud Tasks に追加したいタスクを定義
  const task: protos.google.cloud.tasks.v2.ITask = {
    httpRequest: {
      httpMethod: "POST",
      url: URL,
      headers: {
        "Content-Type": "application/json",
      },
      // base64形式で渡す規定がある。この値は Cloud Tasks がジョブを実行するときに、URLへPOSTで渡してくれる。
      // Cloud Storageトリガーで得られたリクエスト内容をそのまま Cloud Tasksに渡しているだけ。
      body: Buffer.from(JSON.stringify(payload)).toString("base64"),
      oidcToken: {
        // サービスアカウントのメールを指定するだけで認証をやってくれる
        serviceAccountEmail: SERVICE_ACCOUNT_EMAIL,
      },
    },
  };

  try {
    const [response] = await client.createTask({ parent, task });
    console.info(`タスク追加: ${response.name}${payload.name}(${payload.mediaLink})`);
    return c.text(`タスク追加: ${response.name}${payload.name}(${payload.mediaLink})`, 200);
  } catch (error) {
    console.error(`タスク追加失敗: ${error}`);
    return c.text(`タスクの追加ができませんでした。`, 500);
  }
});

/** ================================================ **/
// キューに追加されたタスクの実行処理
/** ================================================ **/
app.post("/gemini/scraping-article-data-processing", async (c) => {
  const data = await c.req.json();
  console.log(JSON.stringify(data));

  // ここで Gemini API を叩いたり、
  // Cloud Storage にデータ格納するなど任意の処理を書きましょう。

  return c.json({
    status: 200,
    message: "タスクが実行されましたよ",
  });
});

/** ================================================ **/

serve({
  fetch: app.fetch,
  port: port,
});

この記述では、

URL 何が行われるか
/gemini 指定したキューにタスクを追加。Cloud Storageトリガーから受け取ったリクエストをそのままタスクに渡している。
/gemini/scraping-article-data-processing データ加工処理。今回の場合は Gemini API を叩いたり、その結果を Cloud Storage に保存するなど。

このフォルダは Github にプッシュしておきましょう。

3. Cloud Run Functions にデプロイする。

(2)で書いた src/index.ts をデプロイしていきます。(実際にはビルド後の dist/index.js が動く形になる。)

https://console.cloud.google.com/run

こちらの画面から「リポジトリを接続」をクリック。
先ほどプッシュしたリポジトリを指定して、リージョンは日本ならどこでもOK。

認証は「認証が必要」にしておきましょう。
こうしておくことでセキュアな状態を保てます。

コスト削減のためにも、課金は「リクエストベース」になっていることを確認してください。

あとはそのままで大丈夫です。
「作成」で完了です。

そして、指定したリポジトリのブランチにプッシュすると自動的にビルド&デプロイが開始します。3分くらいかかるので待ちましょう。

4. Cloud Storageトリガーと Cloud Run Functions を紐づけする

デプロイした Cloud Run Functions をクリックして、「トリガー」タブをクリックします。

「トリガーを追加」ボタンを押して、「Cloud Storageトリガー」をクリックします。

項目名 何を入力するか
トリガーの名前 管理しやすい名前を自由に入力。
トリガーのタイプ Google のソース
イベントプロバイダ Cloud Storage
イベントタイプ google.cloud.storage.object.v1.finalized(作成と上書きがトリガーになる)
バケット アップロードトリガーの対象にしたいバケットを選択する。
イベントデータのコンテンツタイプ application/json
サービスURLパス Cloud Tasksにタスク追加を実行するパス。今回の場合は(2)で作った /gemini となる。

「トリガーを保存」をクリックして完了です。

これでパイプラインは完成です!

Cloud Storage にアップロードされたら、 /gemini にPOSTメソッドのリクエストが投げられることになります。それによって、Cloud Tasks にタスク追加が行われ、 Cloud Tasks によって、タスク実行が発火され、 /gemini/scraping-article-data-processing にリクエストが投げられるということになります!

あとは /gemini/scraping-article-data-processing に処理したいこと(例えば、Gemini APIを叩いて、Cloud Storageに自動格納するなど)を実装するだけです!

ちなみに、 /gemini では、bodyにてbase64形式に変換していますが、 /gemini/scraping-article-data-processing にリクエストが投げられるときには自動的にデコードされた状態で届きます。なので、 const data = await c.req.json(); これだけでリクエストを受け取れることになります。

おしまい

このパイプラインを構築するだけで、Gemini APIのレート制限を考慮した自動のデータ加工が実現できます。スクレイピングをやる人にとっては理想的な構成だと思うので、ぜひ取り入れてみてください!

Discussion