Cloud Functionsで外部APIからデータを定期取得しCloud Storageに保存する
はじめに
定期的にAPIからデータを取得して、保存したい。
よくあるケースですが、意外と環境を用意したりcronを組んだりと大変ですよね。
扱うデータが少なかったり、SpreadSheetで管理する場合はGASで実装するのが一番楽な印象です。
拡張性も考えると、Cloud Functionsで取得してStorageに溜め込むのが良さそうです。定期実行はCloud Schedulerで手軽に行えます。
以下の記事のソースコードはGithubで公開してますので、参考にして頂けたら幸いです。
Cloud Functionsで外部APIからデータを定期取得し、Cloud Storageに保存する
前提
- Python 3.7
- Cloud SDK
GCPのプロジェクト、Cloud Storageのバケットは作成済みとしてスタートします。 GCPの始め方は公式チュートリアルが分かりやすくてオススメです。
構成
APIからのデータ取得は、JSON Placeholderをサンプルとして使います。 /postsで返却されるjsonを、Cloud StorageにJSONファイルとして保存するケースで実装してみましょう。
フォルダ構成はGithubを参考にして頂ければと思います。
モジュール
GCPへのデプロイ時に使用するライブラリを明記する必要があるので、まずは以下を参考にrequirements.txt
を作成してください。
google-cloud-storage
requests
実装
# main.py
import requests
import json
from google.cloud import storage
from datetime import datetime, timedelta, timezone
API_ENDPOINT = "https://jsonplaceholder.typicode.com/posts"
BUCKET_NAME = "test-api-data"
def data_uploader(request):
"""fetch data and upload to GCS
"""
JST = timezone(timedelta(hours=+9), 'JST')
data = fetcher(API_ENDPOINT)
file_name = getFileName(datetime.now(JST))
repository(BUCKET_NAME, file_name, data)
return "OK"
def getFileName(time: datetime) -> str:
return f'data_{time}.json'
def fetcher(url: str) -> str:
"""fetch api data and return json string
"""
print(f"Access: {url}")
response = requests.get(url)
# Raise error if status code is not 200
response.raise_for_status()
data = response.json()
return json.dumps(data)
def repository(bucket_name: str, file_name: str, data: str):
"""upload json file to GCS
"""
# Select Bucket
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
print(f'Bucket: {bucket.name}')
# Upload Data
blob = bucket.blob(file_name)
blob.upload_from_string(data, content_type='application/json')
print(f"Uploaded: {file_name}")
JSON Spaceholderから取得したjsonデータを、data_[日付].json
という形式で保存しています。
実装した関数について、軽く説明します。
data_uploader
今回はHTTPリクエストで動作する形式でCloud Functionsを実装しています。
Cloud Functionsのエンドポイントが叩かれると、この関数が実行されます。
今回は使用していませんが、request
オブジェクトからクエリパラメータやPOSTでのオブジェクトを取得できます。
fetcher
APIからのデータ取得を行う関数です。
requestsを使ってjsonをGETするシンプルな実装になっています。 最低限のHTTP statusのハンドリングだけしています。
APIの仕様や返却されるデータの形式に合わせて変更してください。
repository
Cloud Storageへファイルをアップロードする関数です。
Cloud StorageのアップロードはPython用Cloudクライアントライブラリを使用します。
jsonファイルとして保存するため、content_type='application/json'
を指定しています。
Cloud Functionsへのデプロイ
以下のコマンドでGCPにデプロイすることができます。 デプロイされるとdata-uploader
という名前でfunctionが生成されます。
# ログイン
gcloud auth login
# プロジェクトを設定
gcloud config set project <Yout Project>
# デプロイ
gcloud functions deploy data-uploader --entry-point data_uploader --runtime python37 --trigger-http
デプロイ時に、HTTPのエンドポイントを叩くのに認証を必要とするか聞かれます。
Allow unauthenticated invocations of new function [data-uploader]?
(y/N)?
y
とすると未認証を許可します。テスト用であればこちらでOKですが、誰でもエンドポイントを叩けるようになるため、それを避けたい場合はN
にしましょう。 Cloud Schedulerを設定する際に、認証情報を持たせるため定期実行には問題ありません。
デプロイに成功すると以下のように表示されます。
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 256
buildId: ...
entryPoint: data_uploader
httpsTrigger:
securityLevel: SECURE_OPTIONAL
url: https://us-central1-<projectName>.cloudfunctions.net/<functionName>
ingressSettings: ALLOW_ALL
labels:
deployment-tool: cli-gcloud
name: projects/...
runtime: python37
serviceAccountEmail: <projectName@appspot.gserviceaccount.com>
sourceUploadUrl: https://storage.googleapis.com/...
status: ACTIVE
timeout: 60s
updateTime: '2021-07-14T14:45:39.883Z'
versionId: '1'
Cloud Schedulerの設定
スケジューラも同様にCLIから設定できます。
gcloud scheduler jobs create http daily-data-uploader --schedule="every 24 hours" --uri=<ENDPOINT> --oidc-service-account-email=<serviceAccountEmail>
scheduleの表記はunix-cron構文とApp-Engine cron構文どちらでも動作します。
urlには、Functionsをデプロイしたときに出力されたhttpsTrigger
のurl
に表示されているエンドポイントを設定します。
oidc-service-account-emailもデプロイ時に出力されたserviceAccountEmail
のアドレスを設定しましょう。
設定された時間にFunctionが実行され、Storageにファイルが保存されていれば成功です。
おわりに
定期的なデータ取得 -> 保存はよく必要になるので、一度実装しておくと取得先と保存先を切り替えるだけで流用できて便利です。
AWSでLambda + S3で実装する記事は見かけるのですが、GCPで実装している記事が少なかったので今回紹介させていただきました。
Storageに貯めたデータはBigQueryに接続して分析ができるので、様々な用途にカスタマイズして使ってみてくれたら嬉しいです。
Discussion