[Google Cloud]Always Freeプロダクトたちで野球データ分析基盤を作った

2021/05/02に公開

GWの大人の宿題(?)として、MLBのデータ分析を行うための基盤作りを始めた結果、Google Cloudで遊ぶのは初だったにも関わらず、たった半日で、Always Freeなプロダクトだけでデータ分析基盤が出来ちゃったので、その中身をまとめました。

稼働からちょっとしか経っていないですが、ここまでは安定稼働中で、特に課金を食らいそうな気配も無いです笑

全体図

ザッと書くと、、

  • 1日1回、その日行われた試合のデータを収集してCSVを作成、ストレージにCSVを置く
  • ストレージにCSVが置かれたことをトリガーに、置かれたCSVをDWHへインサート
  • DWHと分析・可視化ツールをつなぐ

行っていることとしては3点で、プライベートらしくとてもシンプルな要件の下、簡素に作り上げています。

Google Cloudプロダクト一覧

"Always Free" のかいつまんだ条件も含めて、ここにまとめます。

プロダクト 概要 Always Free条件
Cloud Scheduler cronジョブスケジューラー。HTTPはもちろん、Cloud Pub/Subへのメッセージも発行出来る 請求先アカウント1個につき、3ジョブまでは無料
Cloud Pub/Sub メッセージングサービス 10GB/月まで無料
Cloud Functions Google Cloud上のFaaS(Functions as a Service)。サーバーを立てること無く、簡易な処理を書いたファンクションを動かせる 200万回/月の呼び出しまで無料。リソースは400000GB秒のメモリ、200000GHz秒のCPUまで大丈夫
Cloud Storage ストレージサービス 5GB/月は無料。オブジェクト追加等の変更を伴うオペレーションは5000回/月、参照オペレーションは50000回/月まで大丈夫
BigQuery データウェアハウス(DWH)。カラム型データストアのアーキテクチャを採っており、パフォーマンス良し 10GBのストレージまでは無料。クエリの実行に課金がされるが、計1TBまでは無料

構築手順

日次でCSVデータを作成し、Cloud Storageへ置く処理

Cloud Schedulerで定時にPub/Subへメッセージを送り、それをサブスクライブしたCloud Functionsがデータを取得、CSVをストレージへ置く処理です。

Cloud Storageのバケットを作っておく

Cloud Storageのバケットは事前に作っておきましょう。

Always Freeに収まる様に、プランやリージョンは注意をしておきましょう。

Cloud Functionsを作成する

まずは、サブスクリプションするFunctionsを作ります。
Google Cloudコンソールから左サイドバー「Cloud Functions」→「関数の作成」を押下。

Pub/Subのトピックは、プルダウンを開くと下に「トピックを作成する」があるので、そこからトピックを作ってあげましょう。

ここまで設定した上で、一番下の「次へ」を押すと、実際に動くファンクションの中身を書く画面に行きます。今回は一律Python3.9で実装。

ここで実行されるエントリーメソッドを作り、requirements.txtpip install が必要なライブラリをバージョンと一緒に書いてあげましょう。

また、この時画面上で「Cloud Buildを有効にして下さい」とメッセージが出る場合は、Cloud Buildを有効にした上で、Cloud Buildの設定画面でCloud Functionsのサービスアカウント権限を有効にしてあげましょう。

これで「デプロイ」を押下すれば、デプロイ完了です!

ちなみに今回実行させたスクリプトは下記。pybaseball と言うライブラリ経由で全体図に書いたBaseball Savantからその日のデータを取得して、/tmp に一時ファイル作成、それをCloud Storageへアップロードしています。
# Cloud Functionsの書き込み権限は /tmp 配下のみです。きっと、AWSにしてもAzureにしても同じはず、、

import datetime
import pybaseball as pb
from google.cloud import storage

# このあたりはhoge/fugaでマスキングしてます
CLIENT = storage.Client("hoge")
BUCKET = CLIENT.get_bucket("fuga")

def gen_statcast_data(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    dt = datetime.date.today() - datetime.timedelta(days=1)
    dt_str = dt.strftime("%Y-%m-%d")
    
    df = pb.statcast(dt_str)
    df.to_csv(f"/tmp/statcast-{dt_str}.csv", index=False)
    p = BUCKET.blob(f"statcast-{dt_str}.csv")
    print("Start..")
    p.upload_from_filename(f"/tmp/statcast-{dt_str}.csv")
    print("Finish!")
    return

Cloud Schedulerを作成する

cronジョブスケジューラーを作り、先ほど作ったCloud Functionsを定期実行出来る様にします。
Google Cloudコンソールから左サイドバー「Cloud Scheduler」→「ジョブの作成」を押下。

基本はunix-cronの設定と同じですが、今回はcronで実行するのがPub/Subへのパブリッシュなので、下記の部分だけはGoogle Cloud用に注意をしておきます。

トピックはCloud Functionsで作成した物を、メッセージはパブリッシュされれば何でも良いので、適当な文言を入れておきます。

Cloud StorageにCSVが置かれたことをトリガーに、CSVをBigQueryへインサート

定期ジョブによって、その日のデータが日々Cloud Storageに置かれる様になったので、ストレージへの配置をトリガーにBigQueryへのインサートを走らせるCloud Functionsを作ります。

今回はめちゃくちゃ簡単です。先ほど作ったCloud Functionsのうち、トリガーにあたる部分だけが下記に変わるだけなためです。

ファイルがCloud Storageに作成されたことがトリガーになるので、イベントタイプは「ファイナライズ/作成」です。
あとは、先の工程と同じ具合に、ファンクションの中身を作るだけです。下記は今回の実装です。Cloud Functionsを用いたBigQueryへのインサート処理のサンプルとして見てもらえれば、、

from google.cloud import bigquery

# BigQueryクライアント
BQ = bigquery.Client()

DATASET = BQ.dataset("hoge")
CONFIG = bigquery.LoadJobConfig()
CONFIG.autodetect = True
CONFIG.source_format = bigquery.SourceFormat.CSV
# これで追記をしてくれる
CONFIG.write_disposition = "WRITE_APPEND"

def load_statcast(data, context):
    """Responds to any HTTP request.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text or any set of values that can be turned into a
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    context_type = data["contentType"]
    if context_type != "text/csv":
        print('Not supported file type: {}'.format(context_type))
        return

    bucket_name = data["bucket"]
    file_name = data["name"]
    uri = f"gs://{bucket_name}/{file_name}"

    print(f"Target URI: {uri}")
    job = BQ.load_table_from_uri(
        uri, DATASET.table("fuga"), job_config=CONFIG
    )
    print("Start..")
    job.result()
    print("Finish!")
    return

DWHと分析・可視化ツールをつなぐ

BIツール

データ基盤としては上で完了(これで、毎日のデータがBigQueryへ入ってくる上、その中身もBigQueryで高速に処理が出来る)なのですが、エクストラでBigQueryへつなぐとよしなに可視化してくれるBIツールっぽい物、無いかなー、で探してみました。

結論としては、Google Data Studioが良さそうです。
# データポータルとデータスタジオ、どっちなんだろ、、笑

https://marketingplatform.google.com/intl/ja/about/data-studio/

BigQueryやスプレッドシート、一般的なRDBMS等、多種多様なデータソースへのコネクタをサポートしており、かつグラフィック面のサポートも充実しており、かなり簡単にダッシュボードを作れます。

いいじゃん、、笑
基本はデータソースの中身をバチコーンと可視化フォーマットにはめこむイメージですが、Data Studio側でも関数のサポートがあったりと、ある程度融通を効かせられるのもメリットです。

Data StudioからBigQuery側へ発行されているであろうクエリが分からないので、そのあたりはちょっと怖いですが、、このあたりは探る良い方法があるかは宿題事項ですね。

Google CloudのSDK

Jupyter LabやGoogle Colabからは、PythonのSDKが公式にサポートがあるため、それを利用することでアクセスが出来ます。

https://github.com/googleapis/google-cloud-python

まとめ

最高です!

このままで無料で行けるかは1ヶ月様子を見た上で、が正式な結論にはなりますが、ひとまず現在まではたしかに請求も無く、Always Freeだけでここまでやれちゃっています。Google Colabあたりに良く出ていますが、Googleは惜しげも無くマシンパワーを提供してくれる点で、プライベートのプログラマーとしてはめちゃくちゃありがたい限りです。
# 仕事では枯れ具合・ナレッジの多さからAWSにしがちなんですけど、、ごめんなさい🙇‍♂️

これをベースにしながら、より良いプライベートコーディング体験が味わえる構成は都度模索していきたいなー、と思います。

Discussion