🌟

yfinanceのAPIで取得した株価データを自動で収集・可視化するデータパイプラインを構築(Google Cloud)

に公開

はじめに

本記事では、GCP(Google Cloud Platform)のサービスを組み合わせて、yfinanceのAPIで取得した株価データを自動で収集・可視化するデータパイプラインを構築した過程を解説します。

実務でBigQueryには触れていたものの、データパイプラインを一から設計・構築する経験はなかったので、個人の学習プロジェクトとしてこのタスクに取り組みました。

作りたいもの:全自動の株価モニタリングシステム

今回構築するシステムは、以下の3つのステップを毎日自動で実行します。

  1. データの取得・加工: yfinanceから優良企業の株価データを取得・整形します。
  2. データの格納: 整形したデータをBigQueryに格納します。
  3. データの可視化: BigQuery上のデータをLooker Studioで可視化し、いつでもモニタリングできるダッシュボードを構築します。

システムの全体像は以下の通りです。

Looker Studioの画像

なぜこれらのサービスを選んだのか?

このシステムを構築するにあたり、GCPの様々なサービスの中から最適なものを選定しました。

1. データの取得・加工: Cloud Functions

データを取得する処理をGCP上で実行するため、Cloud Functionsを選択しました。同じような用途で使えるサービスは他にもありますが、以下の理由でFunctionsが最適だと判断しました。

  • Dataflow: 大規模なバッチ処理やストリーミング処理に適したサービスです。今回は数千〜数万件の比較的少量のデータを扱うため、Dataflowの機能はオーバースペックだと判断しました。
  • Cloud Data Fusion: データを取得・変換するETL/ELTパイプラインをGUI上で構築できます。しかし、yfinanceのAPIを叩くにはロジックをコードで記述する必要があり、GUIでの操作が難しいと判断しました。
  • Cloud Functions: コンテナベースで実行環境を自由にカスタマイズできる柔軟なサービスです。Cloud Functionsでも十分要件を満たせることから、よりシンプルで軽量なCloud Functionsを選びました。

2. バッチ処理:Cloud Scheduler

毎日決まった時間にデータ取得処理を実行するためには、Cloud Schedulerが最適です。Cloud Functionsを定期的にHTTPリクエストで叩くように設定することで、完全に自動化されたデータパイプラインを構築できます。

3. データの格納:BigQuery

実務で慣れていることから、データの格納先としてBigQueryを選定しました。Looker Studioとの連携もスムーズです。

4. ダッシュボード:Looker Studio

Looker Studio は、BigQueryとネイティブに連携しており、無料で使える強力なBIツールです。特別な権限設定をすることなく、BigQueryのデータを直接可視化できるため、アウトプットツールとして最適な選択でした。


実装手順とコード解説

ここからは、実際に構築したシステムの詳細をステップバイステップで解説します。

事前準備:優良企業マスタの作成とサービスアカウント権限の付与

yfinanceからデータを取得するため、あらかじめモニタリングしたい企業のティッカーコード(証券コード)をBigQueryのテーブル(excellent firms)に登録しておきます。このテーブルを「優良企業マスタ」として使用しました。

また、GCPサービス間で連携を行うため、適切な権限を持つサービスアカウントを作成し、設定します。

サービスアカウント 役割 必要な主な権限
sa-functions Cloud Functionsの実行者 BigQuery データ編集者<br>BigQuery ジョブユーザー
sa-deploy Cloud Buildのデプロイ実行者 Cloud Functions 開発者<br>Artifact Registry 書き込み
sa-build Cloud Build全体の実行者 サービスアカウントユーザー (デプロイ先のSAに対する権限)

Cloud Functionsのコード

BigQueryからティッカーリストを取得し、yfinanceでデータを取得・加工、そしてpandas_gbqでBigQueryに書き込む処理をPythonで記述します。メモリ上限を超えないよう、データを分割して書き込むように最適化しました。

import google.cloud.bigquery as bigquery
import yfinance as yf
import pandas as pd
import pandas_gbq
import functions_framework
from datetime import datetime, timedelta
import os

@functions_framework.http
def fetch_stock_price(request):
    # プロジェクトIDとテーブルIDを環境変数から取得
    project_id = os.environ.get('GCP_PROJECT')
    table_id = f'{project_id}.finance.stock_price'
    
    # BigQueryクライアントを初期化
    client = bigquery.Client(project=project_id)
    
    # BigQueryから優良企業マスタのティッカーリストを取得
    query = "SELECT Ticker FROM `my-project-1567934249798.finance.excellent_firms`"
    query_job = client.query(query)
    tickers = [row.Ticker for row in query_job.result()]
    
    # データを追記するための設定
    if_exists_mode = 'append'

    for ticker in tickers:
        try:
            # yfinanceの仕様に合わせるために.Tを追加
            yf_ticker = f"{ticker}.T"
            
            # yfinanceから株価データを取得
            today = datetime.now()
            yesterday = today - timedelta(days=1)
            start_date = yesterday.strftime('%Y-%m-%d')
            end_date = today.strftime('%Y-%m-%d')
            
            data = yf.download(
                tickers=yf_ticker,
                start=start_date,
                end=end_date,
                interval="1d",
                multi_level_index=False
            )

            # データフレームの加工
            if data.empty:
                print(f"Skipping {yf_ticker}: No data found for the period.")
                continue

            data['Ticker'] = ticker
            data.reset_index(inplace=True)
            cols = ['Ticker', 'Date'] + [col for col in data.columns if col not in ['Ticker', 'Date']]
            data = data[cols]

            # BigQueryへの書き込み(インクリメンタルな書き込みでメモリ消費を抑制)
            pandas_gbq.to_gbq(
                data,
                table_id,
                project_id=project_id,
                if_exists=append
            )


    return 'Data loaded to BigQuery successfully!', 200

Cloud Schedulerの設定

GCPコンソールからCloud Schedulerのジョブを作成し、以下のように設定します。

  1. ターゲット: HTTP
  2. URL: デプロイしたCloud FunctionsのトリガーURL
  3. 認証: 認証ヘッダーとしてOIDC(OpenID Connect)トークンを選択し、サービスアカウントにCloud Functions 起動元ロールを付与します。
  4. 頻度: 0 9 * * * (毎日午前9時(UTC)に実行)など、任意のスケジュールを設定します。

これにより、Cloud Functionsが定期的に実行され、データが自動でBigQueryに格納されるようになります。


アウトプット

最終的に完成したシステムは、GitHubで公開しています。
記事には書き切れていない処理(githubとCloud functions連携による、自動デプロイ、財務情報の取得など)も記載しています

さいごに

本プロジェクトを通じて、GCPのサービスを組み合わせてデータパイプラインを一から構築する経験を積むことができました。特に、Cloud Functionsのメモリ最適化サービスアカウントの権限管理などが勉強になりました。

ただ、取り組んでいる中でクラウド周りの知識が甘く、いまいち理解しきれない記事も多かったので、そのあたりの情報も今後キャッチアップしていきたいです。

Discussion