Cloud Storage への書き込みをトリガーにして Filestore ファイル共有に特定のデータをコピーする
はじめに
この記事では、Cloud Storage バケットへの書き込みをトリガーにして、Cloud Run で特定のデータを Filestore ファイル共有にコピーする方法について説明します。
特定のデータとは、例えばファイル名が日付(yyyyMMdd)から始まるデータの2025年分だけをコピーしたい場合を考えます。
前提条件
以下のものを知っている、または設定済であることを前提とします。
- Google Cloud Platform (GCP) アカウント
- CloudShell
- Cloud Storage バケット
- Filestore ファイル共有
- Cloud Run
- Cloud Build
- Pub/Sub
- Python
システム構成
以下のような構成で作成します。
構築手順
1. Cloud Storage バケットの作成
バケットを作成する を参考に Cloud Storage バケットを作成します。
2. Filestore ファイル共有の作成
インスタンスを作成する を参考に Filestore ファイル共有を作成します。
Cloud Run からプライベート接続でアクセスできるようにするため、作成時に「ネットワークの詳細オプション」から「プライベート サービス アクセス接続」設定を忘れずに行います。
3. Cloud Run のサービスを作成する
起動する Cloud Run を準備します。
各プログラムは CloudShell エディタにて適当なフォルダ(ここではsample)を用意し、配下に作成します。作成後のフォルダの中身は以下のようになります。
├── sample
│ ├── Dockerfile
│ ├── requirements.txt
│ └── main.py
│
3.1 Cloud Run の DockerFile を用意する
Cloud Run へデプロイするコンテナのための Dockerfile を用意します。
Filestore の制限事項として書き込み時にはコンテナは root で実行する必要があるため、Dockerfile では root ユーザーを指定しています。
FROM python:3.12-slim
ENV PYTHONUNBUFFERED True
ENV PYTHONIOENCODING utf-8
ENV TZ Asia/Tokyo
ENV PORT=8080
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY ./* ./
RUN pip install --no-cache-dir -r requirements.txt
EXPOSE $PORT
USER root
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
3.2 Cloud Run のプログラムを作成
引き続き、Cloud Run のプログラムを作成します。
Cloud Run サービスに送信された Cloud Storage への書き込みイベントは、HTTP リクエストの JSON 形式で受信されます。
プログラムは、Cloud Storage への書き込まれた対象ファイル名を受信したリクエストから取得し、cp コマンドを使ってマウントしている Cloud Storage バケットから Filestore ファイル共有へコピーを実行します。
- main.py を以下のように変更
import json
import subprocess
from flask import Flask, request, Response
from http import HTTPStatus
app = Flask(__name__)
def logging(severity, message):
print(json.dumps(dict(severity=severity, message=message)))
def copy_file(name):
cmd = ["cp", "-v", "/mnt/gcs/" + name, "/mnt/nfs/" + name]
result = subprocess.run(cmd, shell=False, capture_output=True, text=True)
return result.stdout
@app.route("/", methods=['POST'])
def main() -> Response:
"""Receive and parse Pub/Sub messages."""
envelope = request.get_json()
if not envelope:
logging('WARNING', "no Pub/Sub message received")
return Response(status=HTTPStatus.BAD_REQUEST)
if not isinstance(envelope, dict) or "message" not in envelope:
logging('WARNING', "invalid Pub/Sub message format")
return Response(status=HTTPStatus.BAD_REQUEST)
message = envelope["message"]
objectId = message['attributes']['objectId']
if objectId == "":
logging('WARNING', "objectId is empty")
return Response(status=HTTPStatus.BAD_REQUEST)
else:
logging('INFO', 'Received Request: ' + objectId)
# メッセージを処理する
result = copy_file(objectId)
logging('INFO', result)
return "OK", 200
if __name__ == "__main__":
app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
- requirements.txt を以下のように変更
Flask==3.0.3
gunicorn==22.0.0
Werkzeug==3.0.3
3.3 Cloud Run のコンテナをビルドする
CloudShell のターミナルを開き、sample フォルダへ移動します。
その後、Cloud Runで実行するサービスのコンテナをビルドします。
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_NAME=gcs-to-filestore
gcloud builds submit --tag gcr.io/${PROJECT_ID}/${SERVICE_NAME}
3.4 Cloud Run へデプロイする
最後に、ビルドしたコンテナを Cloud Run へデプロイします。
サービスの NFS ボリューム マウントを構成する において、
Cloud Run サービスは、NFS サーバーが実行されている VPC ネットワークに接続されます。最高のパフォーマンスを得るには、VPC コネクタではなくダイレクト VPC を使用します。
と書いてある通り、ダイレクト VPC にて NFS と接続します。
PROJECT_ID、SERVICE_NAME は手順3.3の変数を使用します。
export REGION=us-central1
export BUCKET_NAME=<<手順1. で作成した Google Storage のバケット名>>
export IP_ADDRESS=<<手順2. で作成した filestore のIPアドレス>>
gcloud run deploy ${SERVICE_NAME} \
--image gcr.io/${PROJECT_ID}/${SERVICE_NAME} \
--region=${REGION} \
--network=default \
--subnet=default \
--vpc-egress=private-ranges-only \
--add-volume=name=gcs,type=cloud-storage,bucket=${BUCKET_NAME} \
--add-volume-mount=volume=gcs,mount-path=/mnt/gcs/ \
--add-volume=name=nfs,type=nfs,location=${IP_ADDRESS}:/sample \
--add-volume-mount=volume=nfs,mount-path=/mnt/nfs/
4. Pub/Sub トピックの作成
Cloud Storage バケットへの書き込みイベント通知を受信する Pub/Sub トピックを作成します。Cloud Run Functions 作成時、一緒にトピックも作ることができますが、トピックと同時にサブスクリプションも自動で作成されます。
デフォルトの設定ではサブスクリプションにフィルタリング指定ができず、後から更新することもできないため、個別にトピックの作成を行います。
以下のコマンドを実行します。
gcloud pubsub topics create gcs-to-filestore
作成できた場合、「Created topic ~」というメッセージが表示されます。
5. Pub/Sub サブスクリプションの作成
手順4.で作成した Pub/Sub トピックからデータを配信するサブスクリプションを作成します。このとき、フィルタリングを設定して特定のデータのみを配信するようにします。機能の詳細についてはコチラをご覧ください。
今回の例だと、ファイル名が日付(yyyyMMdd)から始まる場合に2025年のデータを取得したいので、以下のコマンドを実行します。
gcloud pubsub subscriptions create gcs-to-filestore-sub \
--topic=gcs-to-filestore \
--push-endpoint=<<手順3. で作成した Cloud Run の URL>> \
--message-filter='hasPrefix(attributes.objectId, "2025")'
作成できた場合、「Created subscription ~」というメッセージが表示されます。
6. Cloud Storage イベント通知の設定
Cloud Storage バケットへの書き込みイベントを 手順4.で作成した Pub/Sub トピックに通知します。以下のコマンドを実行します。
gsutil notification create \
-t gcs-to-filestore \
-f json \
gs://ex-bucket-2025
作成できた場合、「Created notification ~」というメッセージが表示されます。
動作確認
Cloud Storage バケットにファイルをアップロードして、トリガーをテストします。Cloud Run サービスのログを確認して、ファイルが Filestore ファイル共有にコピーされたことを確認します。
確認の際、Compute Engine で NFS をマウントします。以下のコマンドでマウントができます。
sudo mount -o rw,intr <<filestore のIPアドレス>>:/sample /var/tmp/sample
また、Cloud Storage イベント通知が動作したのかは Pub/Sub の指標から確認します。これにより、フィルタリングによって対象外となるファイルのイベントも Pub/Sub が受信していることが確認できます
- バケットに対象ファイルをアップロード
- Functions の実行をログで確認
- Filestore ファイル共有にコピーされたことを確認
- バケットに対象外のファイルをアップロード
- Pub/Sub の応答を指標で確認
- Filestore ファイル共有にコピーされていないことを確認
追加の考慮事項
- エラー処理:
- Cloud Run のプログラムにエラー処理を実装して、ファイルのコピー中に発生する可能性のあるエラーを処理します
- Pub/Sub サブスクリプションのエラー時の再試行ポリシーやデッドレタリングを設定します
- セキュリティ:
- Cloud Run サービスを保護するために、適切な認証と認可の設定を行います
- Cloud Run サービスの認証に合わせて、Pub/Sub サブスクリプションの認証を有効にします
- パフォーマンス:
- 大量のデータをコピーする場合は、Cloud Run サービスのパフォーマンスを最適化することを検討してください
- フィルタリング:
- フィルタ条件に後方一致を設定したい場合は Cloud Run サービスで判定するなど、別の方法が必要です
さいごに
この記事では、Cloud Storage バケットへの書き込みをトリガーにして、Cloud Run を使用してデータを Filestore ファイル共有にコピーする方法について説明しました。
過去にも似たような記事を書いたことがあるのですが、Filestore のガイドにある インスタンス間でデータをコピーする では、Cloud Storage バケットと Filestore ファイル共有をファイル同期する方法が記載されていますが、特定のデータだけをコピーしたい場合は別の方法が必要だったので試しに作ってみました。
root 権限を持ったコンテナを Cloud Run にデプロイする必要があるなど、いくつかの制限事項が注意点となります。参考になれば幸いです。
Discussion