📈

Google Cloud Run と Python Streamlit を使ってインデックス投資管理ダッシュボードを作ってみる

2022/05/15に公開

architecture

Cloud Run with IAP に可能性を感じて作ったアプリについて書いていきます。

やりたいこと

せっかく作るので、多少は使えそうなものを作ろうと思い、普段投資している ETF の管理ができるアプリを作ることにしました。投資自体は長期投資目的で月イチくらいでしかやらないので、ポートフォリオ全体がどんな状態になっているか可視化してみることにしました。以下、要件 + やってみたいことです。

技術的な観点
  • Cloud Run と IAP で認証付きアプリを作りたい
  • 運用にコストをかけたくないので、なるべく自動化したい
Biz 的な観点
  • ぱぱっと使えるインデックス投資管理アプリがほしい
    → 時間をかけたくないので、最小限の操作と、最低限の情報だけ表示したい
    (TradingView と iPhone の株価アプリの間くらいのイメージ)
  • ETF の直近の値動きとか、投資している ETF の割合を可視化したい
    → お買い得かどうかとか、ポートフォリオの割合とかを見て
    どの銘柄に投資するか判断する材料にしたい
  • 株価データは週 1 回くらいは更新したい
    → 毎日見るものではないけど、データが古すぎてもあれなので、
    とりあえず週 1 回にしておく
  • Google 認証をつけて、自分だけがアクセスできるようにしたい
    → 自分の投資状況を全世界に公開するのはちょっと恥ずかしい

アプリ構成

基本的には、データをインターネットから取ってきたら適当にデータ変換を行い、それを可視化アプリから読み出すような仕組みになります。

architecture

以下、使ったものです。

データ取得・変換・蓄積
  • Cloud Function: データ取得、データ変換のスクリプト実行元
  • Cloud Storage: データ蓄積用、データはここから API で直接取る
  • pandas-datareader: 株価データを取得するために利用
  • gcsfs: Cloud Function からデータを取るために利用。Python ライブラリ
データ可視化
  • Cloud Run: 可視化アプリの実行元
  • Cloud IAP(Identity-Aware Proxy): 簡単に(ノーコードで)Google 認証をかけれる
  • Streamlit: Python だけでまあまあきれいなダッシュボードがつくれる
  • Plotly: Python ライブラリの中ではきれいなグラフがつくれる
開発環境
  • macOS/WSL2
  • VScode
  • pyenv+poetry: 開発環境が複数あるので、Python とライブラリのバージョン統一のために利用
  • Cloud Build: GitHub と連携して、Push したら自動で Cloud Run / Cloud Functions にデプロイするように設定

機能ごとの簡単な説明とコード

データ取得・変換

ここでは主に、pandas-datareader を使って銘柄ごとにデータを保存、その後 pandas でデータ変換を行っています。取得する銘柄はretrieve_target.yamlに、ポートフォリオを構成する銘柄はportfolio.yamlに記載します。datareader では複数のデータソースを使い分ける必要があったので、retrieve_target.yamlには データソースを記載しています。portfolio.yamlには銘柄ごとに通貨、説明、所持数、資産タイプを記載しています。

retrieve_target.yamlの一部
ticker:
  1306.JP:
    source: 'stooq'
  1540.JP:
    source: 'stooq'
  JPY=X:
    source: 'yahoo'
portfolio.yamlの一部
portfolio:
  1306.JP:
    currency: JPY
    detail: NEXT FUNDS TOPIX連動型上場投信
    number_of_hold: 1000
    type: stock
  1540.JP:
    currency: JPY
    detail: 純金上場信託(現物国内保管型)
    number_of_hold: 1500
    type: commodity
  BND:
    currency: USD
    detail: VANGUARD TOTAL BOND MARKET ETF
    number_of_hold: 100
    type: bond

YAML はこちらのスクリプトで読み出します。ここでは、前半でデータ取得&生データ保存、後半でデータマート用のデータ変換をしています。ダッシュボードでは、ポートフォリオ全体の資産推移と、各銘柄・資産タイプが占める割合を表示しようと考えました。そのためデータ変換では、日本円への変換や、所持数を乗算したポートフォリオ全体の価値(日本円換算)計算などを行っています。
表示に使う株データについてですが、データ構造上 NaN を許容しなければならない一方、ダッシュボードでは少数桁が不要(むしろ見ずらいので邪魔)なので、データ型はInt64にしました。データモデリングについては、今後加筆するかもしれないです。
Cloud Functions で使うスクリプトなので、名前がmain.pyになっているのと、巨大な関数になっているのは気になっているポイントです。。

main.py
import pandas_datareader.data as web
from datetime import datetime
import pandas as pd
import numpy as np
import yaml
import os
from dotenv import load_dotenv
import gcsfs
import time


def retrieve_and_etl(event, context):

    # load environment variables for local
    load_dotenv(".env")

    PROJECT_NAME = os.environ.get("PROJECT_NAME")
    fs = gcsfs.GCSFileSystem(project=PROJECT_NAME)

    # load target tickers config
    with fs.open(
        "gcs://invest-analytics/invest-analytics-config/retrieve_target.yaml", "rb"
    ) as file:
        targets = yaml.safe_load(file)

    # set retrieve date
    start = datetime(2017, 1, 1)
    end = datetime.today()

    # retrieve tickers' CSV
    print(f"retrieve from {start.strftime('%Y%m%d')} to {end.strftime('%Y%m%d')}")
    for k, v in targets["ticker"].items():
        # read ticker and source
        ticker = k
        source = v["source"]

        # retrive data
        if source == "stooq":
            stock = web.StooqDailyReader(ticker, start=start, end=end).read()
        elif source == "yahoo":
            stock = web.DataReader(ticker, "yahoo", start=start, end=end)

        # data null check
        if len(stock) == 0:
            raise ValueError("retrieved data is null")

        # save data
        stock = stock.reset_index(drop=False)
        # stock.to_csv(f"../data/{ticker}.csv", index=None)
        with fs.open(f"gcs://invest-analytics/rawdata/{ticker}.csv", "wb") as f:
            stock.to_csv(f, index=False)
        print(f"successfuly retrived: {ticker}")

        # be gentle to data source
        time.sleep(1)

    # ETL
    # create date index
    stc_dt = pd.DataFrame(columns=["Date"])
    for t in targets["ticker"].keys():
        with fs.open(f"gcs://invest-analytics/rawdata/{ticker}.csv", "rb") as f:
            df = pd.read_csv(f)
        stc_dt = pd.concat([stc_dt, df.loc[:, ["Date"]]], axis=0)
    stc_dt = stc_dt.drop_duplicates(subset="Date", keep="first")
    stc_dt = stc_dt.sort_values("Date")

    # create data mart for stocks
    stc = stc_dt
    for t in targets["ticker"].keys():
        with fs.open(f"gcs://invest-analytics/rawdata/{t}.csv", "rb") as f:
            df = pd.read_csv(f)
        df = df.add_suffix(f"_{t}")
        df_unq = f"Date_{t}"
        stc = pd.merge(stc, df, how="left", left_on="Date", right_on=df_unq)

    # exchange usd to jpy
    def usd_to_jpy(x, col):
        if np.isnan(x[col]) or np.isnan(x["Close_JPY=X"]):
            return np.nan
        else:
            return x[col] * x["Close_JPY=X"]

    # load portfolio config
    with fs.open(
        "gcs://invest-analytics/invest-analytics-config/portfolio.yaml", "rb"
    ) as file:
        portfolio = yaml.safe_load(file)

    for k, v in portfolio["portfolio"].items():
        # read ticker and source
        ticker = k
        config = v
        # print(ticker, config)

        # calculate USD -> JPY / if original is JPY then rename
        if config["currency"] == "USD":
            stc[f"Close_{ticker}_JPY"] = stc.apply(
                lambda x: usd_to_jpy(x, f"Close_{ticker}"), axis=1
            )
        elif config["currency"] == "JPY":
            stc[f"Close_{ticker}_JPY"] = stc[f"Close_{ticker}"]
        else:
            raise KeyError(f"Invalid currency: {config['currency']}")

    # calculate portfolio value sum
    def calc_pf(x):
        pf_sum = 0
        for t in portfolio["portfolio"].keys():
            pf_sum += x[f"Close_{t}_JPY"] * portfolio["portfolio"][t]["number_of_hold"]
        return pf_sum

    stc["pf"] = stc.apply(lambda x: calc_pf(x), axis=1)

    # convert to int
    for k in portfolio["portfolio"].keys():
        ticker = k
        stc[f"Close_{ticker}_JPY"] = np.floor(
            pd.to_numeric(stc[f"Close_{ticker}_JPY"], errors="coerce")
        ).astype("Int64")
    stc["pf"] = np.floor(pd.to_numeric(stc["pf"], errors="coerce")).astype("Int64")

    # save stocks datamart
    with fs.open("gcs://invest-analytics/datamart/stocks.pkl", "wb") as f:
        stc.to_pickle(f)

    # calculate most recent value
    recent_sum = 0
    for k in portfolio["portfolio"].keys():
        recent_value = (
            stc.dropna(subset=f"Close_{k}_JPY")[f"Close_{k}_JPY"].values[-1]
            * portfolio["portfolio"][k]["number_of_hold"]
        )
        portfolio["portfolio"][k]["recent_value"] = recent_value
        recent_sum += recent_value

    # calculate recent value percent
    for k in portfolio["portfolio"].keys():
        portfolio["portfolio"][k]["recent_value_percent"] = (
            stc.dropna(subset=f"Close_{k}_JPY")[f"Close_{k}_JPY"].values[-1]
            * portfolio["portfolio"][k]["number_of_hold"]
            / recent_sum
            * 100
        )

    # create datamart for portfolio
    pfdf = pd.concat(
        [
            pd.DataFrame({"ticker": list(portfolio["portfolio"].keys())}),
            pd.DataFrame(list(portfolio["portfolio"].values())),
        ],
        axis=1,
    )

    # convert to int
    pfdf["recent_value"] = pfdf["recent_value"].astype("int")
    pfdf["recent_value_percent"] = pfdf["recent_value_percent"].astype("int")

    # save portfolio datamart
    with fs.open("gcs://invest-analytics/datamart/portfolio.pkl", "wb") as f:
        pfdf.to_pickle(f)

データ可視化

このパートでは、データ変換で作成された 2 つの dataframe を単純に可視化するようにしました。ここではなるべくデータ変換などの処理は行わず、可視化だけに留めるようにしています。

app.py
import streamlit as st
import pandas as pd
import plotly.express as px
import os
from dotenv import load_dotenv
import gcsfs

# from datetime import datetime
load_dotenv(".env")
# load environment variables
PROJECT_NAME = os.environ.get("PROJECT_NAME")
# print(PROJECT_NAME)

if "window_size" not in st.session_state:
    st.session_state.window_size = 365


def read_stock_data_from_gcs():
    """read CSV file from cloud storage and return dataframe"""
    # Instantiates a client
    fs = gcsfs.GCSFileSystem(project=PROJECT_NAME)
    stocks, pf = pd.DataFrame(), pd.DataFrame()
    with fs.open("invest-analytics/datamart/stocks.pkl", "rb") as f:
        stocks = pd.read_pickle(f)
    with fs.open("invest-analytics/datamart/portfolio.pkl", "rb") as f:
        pf = pd.read_pickle(f)
    return stocks, pf


def read_stock_data_from_local():
    stocks = pd.read_pickle("data/stocks.pkl")
    pf = pd.read_pickle("data/portfolio.pkl")
    return stocks, pf


def plot_stock_data(df: pd.DataFrame, window: int, x: str, y: str, title: str) -> None:
    """plot line chart"""
    # drop nan values
    df = df.dropna(subset=y)
    df = df.tail(window)
    fig = px.line(df, x=x, y=y)
    fig.update_xaxes(showgrid=False, zeroline=False)
    fig.update_yaxes(showgrid=False, zeroline=False)
    st.plotly_chart(fig, use_container_width=True)


def layout_plots() -> None:
    """manipulate plots layout"""
    col1, col2 = st.columns(2)

    # read stocks and portfolio
    stocks, pf = read_stock_data_from_gcs()

    # plot portfolio values by time series
    # with col1:
    col0, col1, col2, col3, col4 = st.columns([6, 1, 1, 1, 1])
    with col0:
        st.write("Portfolio Overall Performance")
    with col1:
        if st.button("3Year", key="portfolio"):
            st.session_state.window_size = 1080
    with col2:
        if st.button("Year", key="portfolio"):
            st.session_state.window_size = 360
    with col3:
        if st.button("Quarter", key="portfolio"):
            st.session_state.window_size = 90
    with col4:
        if st.button("Month", key="portfolio"):
            st.session_state.window_size = 30
    plot_stock_data(
        df=stocks,
        window=st.session_state.window_size,
        x="Date",
        y="pf",
        title="Portfolio Transition",
    )

    # plot portfolio share by pie chart
    col1, col2 = st.columns([1, 1])
    with col1:
        fig = px.sunburst(
            pf,
            path=["type", "ticker"],
            values="recent_value",
            title="Portfolio Balance",
        )
        fig.update_xaxes(showgrid=False, zeroline=False)
        fig.update_yaxes(showgrid=False, zeroline=False)
        st.plotly_chart(fig, use_container_width=True)
    with col2:
        st.table(pf.loc[:, ["ticker", "recent_value_percent"]])

    for ticker, detail in zip(pf.ticker.values, pf.detail.values):
        col0, col1, col2, col3, col4 = st.columns([6, 1, 1, 1, 1])
        with col0:
            st.write(f"{ticker}: {detail}")
        with col1:
            if st.button("3Year", key=f"{ticker}"):
                st.session_state.window_size = 1080
        with col2:
            if st.button("Year", key=f"{ticker}"):
                st.session_state.window_size = 360
        with col3:
            if st.button("Quarter", key=f"{ticker}"):
                st.session_state.window_size = 90
        with col4:
            if st.button("Month", key=f"{ticker}"):
                st.session_state.window_size = 30
        plot_stock_data(
            df=stocks,
            window=st.session_state.window_size,
            x="Date",
            y=f"Close_{ticker}_JPY",
            title=f"{ticker}: {detail}",
        )


if __name__ == "__main__":
    # general layout settings
    st.set_page_config(layout="wide")
    st.title("Invest Management Dashboard")

    # layout plots
    layout_plots()

運用について

運用コストは、とにかく最小限に抑えたかったので、基本的には何もしなくて良くなるように構成しました。定期的(現在は週 1 回)に Cloud Scheduler から Pub/Sub 経由で Cloud Functions をキックして、データ取得・変換が自動で走るようにしています。Cloud Run は GCS のデータを参照しているので、データが更新されればそれを表示してくれます。あと定期的な運用で、ポートフォリオの各銘柄の所持数を更新しないといけないのですが、これは YAML を手動更新後 Git Push→Cloud Build が GCS にデータ配置という流れにしています。本当は証券会社からデータ取得したいのですが、API などはなくスクレイピングが必要そうだったので諦めました。。

以下、Cloud Build の設定を置いておきます。やっていることは主に以下の 3 つです。

  • ポートフォリオ設定の YAML を GCS にコピー
  • Cloud Functions デプロイ
  • Cloud Run デプロイ
cloudbuild.yaml
steps:
  # copy configs to GCS
  - name: 'gcr.io/cloud-builders/gsutil'
    args: ['cp', '-r', './invest-analytics-config', 'gs://invest-analytics/']
  # deploy cloud functions
  - name: 'gcr.io/cloud-builders/gcloud'
    args:
      [
        'functions',
        'deploy',
        'Retrieve_And_ETL',
        '--entry-point=retrieve_and_etl',
        '--region=asia-northeast1',
        '--runtime=python39',
        '--memory=256MB',
        '--env-vars-file=invest-analytics-function/env.yaml',
        '--source=invest-analytics-function',
        '--trigger-topic=weekly-trigger',
      ]
  # build and deploy cloud run
  - name: 'gcr.io/cloud-builders/docker'
    args:
      [
        'build',
        '-t',
        'gcr.io/$PROJECT_ID/ui:latest',
        '-f',
        'invest-analytics-ui/Dockerfile',
        './invest-analytics-ui/',
      ]
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/ui:latest']
  - name: 'gcr.io/cloud-builders/gcloud'
    args:
      [
        'run',
        'deploy',
        'ui',
        '--image',
        'gcr.io/$PROJECT_ID/ui:latest',
        '--region=asia-northeast1',
        '--no-allow-unauthenticated',
      ]
options:
  logging: CLOUD_LOGGING_ONLY
参考:フォルダ構成(ソースコード)

運用を楽にするため、モノレポにしています。

    invest-analytics              # working directory
    ├─ invest-analytics-config    # configs to copy to GCS
    │   ├─ portfolio.yaml         #
    │   └─ retrieve_target.yaml   #
    │
    ├─ invest-analytics-function  # cloud function app
    │   ├─ env.yaml               #
    │   ├─ main.py                #
    │   └─ requirements.txt       #
    │
    ├─ invest-analytics-ui        # cloud run app
    │   ├─ src                    #
    │   │   └─ app.py             #
    │   │
    │   ├─ .env                   #
    │   ├─ Dockerfile             #
    │   └─ requirements.txt       #
    │
    └─ cloudbuild.yaml            # cloud build settings
参考:フォルダ構成(Google Cloud Storage)
    invest-analytics             #
    ├─ datamart                  #
    │   ├─ pfdf.pkl              #
    │   └─ stocks.pkl            #
    │
    ├─ invest-analytics-config   #
    │   ├─ portfolio.yaml        #
    │   └─ retrieve_target.yaml  #
    │
    └─ rawdata                   #
        ├─ 1504.JP.csv           #
        └─ VOO.csv               #
        └─ ...                   #

所感

Cloud Run がとにかく使いやすいです!アプリの更新もデプロイすればシームレスにやってくれるし、起動も十分早いです。IAP もポチポチ設定するだけで認証を付けられるのは本当に楽でした。Streamlit は、フロントエンドを自分で作るのに比べると不自由さはあるけど、開発速度が全然早いのでまあこれで良いかなという感じでした。Cloud Build は初見だったこともあり、なかなかうまく設定できず苦戦しましたが、できてしまえば全体の運用コストが下がって大変よいです。Cloud Functions はだいぶ昔にリリースされたプロダクトなこともあり、ちょっと古いな〜という印象を受けました。今だと Cloud Run Jobs が出てきたこともあり、今後はこちらを採用したほうが幸せになれそうな気がします(2022/5 現在まだ Preview 版ですが)。総じて Google Cloud だけで認証付きアプリ+運用まで割と簡単にできてしまうので、個人でクイックに作るのにはおすすめできる構成ではないかと思います。

今後の改善プラン

  • データ変換を BigQuery+dbt でやってみたい
    → データ管理をちゃんとやりたい、簡単にデータ品質チェックできるようにしたい
  • なんらかの予測を VertexAI でやってみたい
    → 今回は予測をうまく使いこなすアイデアがわかなかったのでスルーしてしまった
    データはあるので、うまく投資のサポートになるような予測ができるようにしたい
    (あと技術的に面白そうなので VertexAI で遊んでみたい)

参考にさせていただいた記事

IAP を設定する方法。この記事の通りに設定すればいけたので大変助かりました
https://zenn.dev/ww24/articles/19099c85febe0d

Int64 に変換する方法。NaN ありだけど int にしたいときに参考にしました
https://stackoverflow.com/questions/62899860/how-can-i-resolve-typeerror-cannot-safely-cast-non-equivalent-float64-to-int6

Cloud Scheduler と Pub/Sub の設定方法
https://dev.classmethod.jp/articles/try-cloud-functions-scheduler-pubsub/

Cloud Build の設定に関して。同じ箇所でハマったので助かりました
https://qiita.com/ricemountainer/items/173bafedfd44eefa1ac9

Cloud Build の設定の書き方(公式)
https://cloud.google.com/build/docs/configuring-builds/configure-build-step-order?hl=ja

Cloud Functions の deploy コマンドのオプション(公式)
https://cloud.google.com/sdk/gcloud/reference/functions/deploy

Discussion