Google Cloud Run と Python Streamlit を使ってインデックス投資管理ダッシュボードを作ってみる
Cloud Run with IAP に可能性を感じて作ったアプリについて書いていきます。
やりたいこと
せっかく作るので、多少は使えそうなものを作ろうと思い、普段投資している ETF の管理ができるアプリを作ることにしました。投資自体は長期投資目的で月イチくらいでしかやらないので、ポートフォリオ全体がどんな状態になっているか可視化してみることにしました。以下、要件 + やってみたいことです。
技術的な観点
- Cloud Run と IAP で認証付きアプリを作りたい
- 運用にコストをかけたくないので、なるべく自動化したい
Biz 的な観点
- ぱぱっと使えるインデックス投資管理アプリがほしい
→ 時間をかけたくないので、最小限の操作と、最低限の情報だけ表示したい
(TradingView と iPhone の株価アプリの間くらいのイメージ) - ETF の直近の値動きとか、投資している ETF の割合を可視化したい
→ お買い得かどうかとか、ポートフォリオの割合とかを見て
どの銘柄に投資するか判断する材料にしたい - 株価データは週 1 回くらいは更新したい
→ 毎日見るものではないけど、データが古すぎてもあれなので、
とりあえず週 1 回にしておく - Google 認証をつけて、自分だけがアクセスできるようにしたい
→ 自分の投資状況を全世界に公開するのはちょっと恥ずかしい
アプリ構成
基本的には、データをインターネットから取ってきたら適当にデータ変換を行い、それを可視化アプリから読み出すような仕組みになります。
以下、使ったものです。
データ取得・変換・蓄積
- Cloud Function: データ取得、データ変換のスクリプト実行元
- Cloud Storage: データ蓄積用、データはここから API で直接取る
- pandas-datareader: 株価データを取得するために利用
- gcsfs: Cloud Function からデータを取るために利用。Python ライブラリ
データ可視化
- Cloud Run: 可視化アプリの実行元
- Cloud IAP(Identity-Aware Proxy): 簡単に(ノーコードで)Google 認証をかけれる
- Streamlit: Python だけでまあまあきれいなダッシュボードがつくれる
- Plotly: Python ライブラリの中ではきれいなグラフがつくれる
開発環境
- macOS/WSL2
- VScode
- pyenv+poetry: 開発環境が複数あるので、Python とライブラリのバージョン統一のために利用
- Cloud Build: GitHub と連携して、Push したら自動で Cloud Run / Cloud Functions にデプロイするように設定
機能ごとの簡単な説明とコード
データ取得・変換
ここでは主に、pandas-datareader を使って銘柄ごとにデータを保存、その後 pandas でデータ変換を行っています。取得する銘柄はretrieve_target.yaml
に、ポートフォリオを構成する銘柄はportfolio.yaml
に記載します。datareader では複数のデータソースを使い分ける必要があったので、retrieve_target.yaml
には データソースを記載しています。portfolio.yaml
には銘柄ごとに通貨、説明、所持数、資産タイプを記載しています。
ticker:
1306.JP:
source: 'stooq'
1540.JP:
source: 'stooq'
JPY=X:
source: 'yahoo'
portfolio:
1306.JP:
currency: JPY
detail: NEXT FUNDS TOPIX連動型上場投信
number_of_hold: 1000
type: stock
1540.JP:
currency: JPY
detail: 純金上場信託(現物国内保管型)
number_of_hold: 1500
type: commodity
BND:
currency: USD
detail: VANGUARD TOTAL BOND MARKET ETF
number_of_hold: 100
type: bond
YAML はこちらのスクリプトで読み出します。ここでは、前半でデータ取得&生データ保存、後半でデータマート用のデータ変換をしています。ダッシュボードでは、ポートフォリオ全体の資産推移と、各銘柄・資産タイプが占める割合を表示しようと考えました。そのためデータ変換では、日本円への変換や、所持数を乗算したポートフォリオ全体の価値(日本円換算)計算などを行っています。
表示に使う株データについてですが、データ構造上 NaN を許容しなければならない一方、ダッシュボードでは少数桁が不要(むしろ見ずらいので邪魔)なので、データ型はInt64
にしました。データモデリングについては、今後加筆するかもしれないです。
Cloud Functions で使うスクリプトなので、名前がmain.py
になっているのと、巨大な関数になっているのは気になっているポイントです。。
import pandas_datareader.data as web
from datetime import datetime
import pandas as pd
import numpy as np
import yaml
import os
from dotenv import load_dotenv
import gcsfs
import time
def retrieve_and_etl(event, context):
# load environment variables for local
load_dotenv(".env")
PROJECT_NAME = os.environ.get("PROJECT_NAME")
fs = gcsfs.GCSFileSystem(project=PROJECT_NAME)
# load target tickers config
with fs.open(
"gcs://invest-analytics/invest-analytics-config/retrieve_target.yaml", "rb"
) as file:
targets = yaml.safe_load(file)
# set retrieve date
start = datetime(2017, 1, 1)
end = datetime.today()
# retrieve tickers' CSV
print(f"retrieve from {start.strftime('%Y%m%d')} to {end.strftime('%Y%m%d')}")
for k, v in targets["ticker"].items():
# read ticker and source
ticker = k
source = v["source"]
# retrive data
if source == "stooq":
stock = web.StooqDailyReader(ticker, start=start, end=end).read()
elif source == "yahoo":
stock = web.DataReader(ticker, "yahoo", start=start, end=end)
# data null check
if len(stock) == 0:
raise ValueError("retrieved data is null")
# save data
stock = stock.reset_index(drop=False)
# stock.to_csv(f"../data/{ticker}.csv", index=None)
with fs.open(f"gcs://invest-analytics/rawdata/{ticker}.csv", "wb") as f:
stock.to_csv(f, index=False)
print(f"successfuly retrived: {ticker}")
# be gentle to data source
time.sleep(1)
# ETL
# create date index
stc_dt = pd.DataFrame(columns=["Date"])
for t in targets["ticker"].keys():
with fs.open(f"gcs://invest-analytics/rawdata/{ticker}.csv", "rb") as f:
df = pd.read_csv(f)
stc_dt = pd.concat([stc_dt, df.loc[:, ["Date"]]], axis=0)
stc_dt = stc_dt.drop_duplicates(subset="Date", keep="first")
stc_dt = stc_dt.sort_values("Date")
# create data mart for stocks
stc = stc_dt
for t in targets["ticker"].keys():
with fs.open(f"gcs://invest-analytics/rawdata/{t}.csv", "rb") as f:
df = pd.read_csv(f)
df = df.add_suffix(f"_{t}")
df_unq = f"Date_{t}"
stc = pd.merge(stc, df, how="left", left_on="Date", right_on=df_unq)
# exchange usd to jpy
def usd_to_jpy(x, col):
if np.isnan(x[col]) or np.isnan(x["Close_JPY=X"]):
return np.nan
else:
return x[col] * x["Close_JPY=X"]
# load portfolio config
with fs.open(
"gcs://invest-analytics/invest-analytics-config/portfolio.yaml", "rb"
) as file:
portfolio = yaml.safe_load(file)
for k, v in portfolio["portfolio"].items():
# read ticker and source
ticker = k
config = v
# print(ticker, config)
# calculate USD -> JPY / if original is JPY then rename
if config["currency"] == "USD":
stc[f"Close_{ticker}_JPY"] = stc.apply(
lambda x: usd_to_jpy(x, f"Close_{ticker}"), axis=1
)
elif config["currency"] == "JPY":
stc[f"Close_{ticker}_JPY"] = stc[f"Close_{ticker}"]
else:
raise KeyError(f"Invalid currency: {config['currency']}")
# calculate portfolio value sum
def calc_pf(x):
pf_sum = 0
for t in portfolio["portfolio"].keys():
pf_sum += x[f"Close_{t}_JPY"] * portfolio["portfolio"][t]["number_of_hold"]
return pf_sum
stc["pf"] = stc.apply(lambda x: calc_pf(x), axis=1)
# convert to int
for k in portfolio["portfolio"].keys():
ticker = k
stc[f"Close_{ticker}_JPY"] = np.floor(
pd.to_numeric(stc[f"Close_{ticker}_JPY"], errors="coerce")
).astype("Int64")
stc["pf"] = np.floor(pd.to_numeric(stc["pf"], errors="coerce")).astype("Int64")
# save stocks datamart
with fs.open("gcs://invest-analytics/datamart/stocks.pkl", "wb") as f:
stc.to_pickle(f)
# calculate most recent value
recent_sum = 0
for k in portfolio["portfolio"].keys():
recent_value = (
stc.dropna(subset=f"Close_{k}_JPY")[f"Close_{k}_JPY"].values[-1]
* portfolio["portfolio"][k]["number_of_hold"]
)
portfolio["portfolio"][k]["recent_value"] = recent_value
recent_sum += recent_value
# calculate recent value percent
for k in portfolio["portfolio"].keys():
portfolio["portfolio"][k]["recent_value_percent"] = (
stc.dropna(subset=f"Close_{k}_JPY")[f"Close_{k}_JPY"].values[-1]
* portfolio["portfolio"][k]["number_of_hold"]
/ recent_sum
* 100
)
# create datamart for portfolio
pfdf = pd.concat(
[
pd.DataFrame({"ticker": list(portfolio["portfolio"].keys())}),
pd.DataFrame(list(portfolio["portfolio"].values())),
],
axis=1,
)
# convert to int
pfdf["recent_value"] = pfdf["recent_value"].astype("int")
pfdf["recent_value_percent"] = pfdf["recent_value_percent"].astype("int")
# save portfolio datamart
with fs.open("gcs://invest-analytics/datamart/portfolio.pkl", "wb") as f:
pfdf.to_pickle(f)
データ可視化
このパートでは、データ変換で作成された 2 つの dataframe を単純に可視化するようにしました。ここではなるべくデータ変換などの処理は行わず、可視化だけに留めるようにしています。
import streamlit as st
import pandas as pd
import plotly.express as px
import os
from dotenv import load_dotenv
import gcsfs
# from datetime import datetime
load_dotenv(".env")
# load environment variables
PROJECT_NAME = os.environ.get("PROJECT_NAME")
# print(PROJECT_NAME)
if "window_size" not in st.session_state:
st.session_state.window_size = 365
def read_stock_data_from_gcs():
"""read CSV file from cloud storage and return dataframe"""
# Instantiates a client
fs = gcsfs.GCSFileSystem(project=PROJECT_NAME)
stocks, pf = pd.DataFrame(), pd.DataFrame()
with fs.open("invest-analytics/datamart/stocks.pkl", "rb") as f:
stocks = pd.read_pickle(f)
with fs.open("invest-analytics/datamart/portfolio.pkl", "rb") as f:
pf = pd.read_pickle(f)
return stocks, pf
def read_stock_data_from_local():
stocks = pd.read_pickle("data/stocks.pkl")
pf = pd.read_pickle("data/portfolio.pkl")
return stocks, pf
def plot_stock_data(df: pd.DataFrame, window: int, x: str, y: str, title: str) -> None:
"""plot line chart"""
# drop nan values
df = df.dropna(subset=y)
df = df.tail(window)
fig = px.line(df, x=x, y=y)
fig.update_xaxes(showgrid=False, zeroline=False)
fig.update_yaxes(showgrid=False, zeroline=False)
st.plotly_chart(fig, use_container_width=True)
def layout_plots() -> None:
"""manipulate plots layout"""
col1, col2 = st.columns(2)
# read stocks and portfolio
stocks, pf = read_stock_data_from_gcs()
# plot portfolio values by time series
# with col1:
col0, col1, col2, col3, col4 = st.columns([6, 1, 1, 1, 1])
with col0:
st.write("Portfolio Overall Performance")
with col1:
if st.button("3Year", key="portfolio"):
st.session_state.window_size = 1080
with col2:
if st.button("Year", key="portfolio"):
st.session_state.window_size = 360
with col3:
if st.button("Quarter", key="portfolio"):
st.session_state.window_size = 90
with col4:
if st.button("Month", key="portfolio"):
st.session_state.window_size = 30
plot_stock_data(
df=stocks,
window=st.session_state.window_size,
x="Date",
y="pf",
title="Portfolio Transition",
)
# plot portfolio share by pie chart
col1, col2 = st.columns([1, 1])
with col1:
fig = px.sunburst(
pf,
path=["type", "ticker"],
values="recent_value",
title="Portfolio Balance",
)
fig.update_xaxes(showgrid=False, zeroline=False)
fig.update_yaxes(showgrid=False, zeroline=False)
st.plotly_chart(fig, use_container_width=True)
with col2:
st.table(pf.loc[:, ["ticker", "recent_value_percent"]])
for ticker, detail in zip(pf.ticker.values, pf.detail.values):
col0, col1, col2, col3, col4 = st.columns([6, 1, 1, 1, 1])
with col0:
st.write(f"{ticker}: {detail}")
with col1:
if st.button("3Year", key=f"{ticker}"):
st.session_state.window_size = 1080
with col2:
if st.button("Year", key=f"{ticker}"):
st.session_state.window_size = 360
with col3:
if st.button("Quarter", key=f"{ticker}"):
st.session_state.window_size = 90
with col4:
if st.button("Month", key=f"{ticker}"):
st.session_state.window_size = 30
plot_stock_data(
df=stocks,
window=st.session_state.window_size,
x="Date",
y=f"Close_{ticker}_JPY",
title=f"{ticker}: {detail}",
)
if __name__ == "__main__":
# general layout settings
st.set_page_config(layout="wide")
st.title("Invest Management Dashboard")
# layout plots
layout_plots()
運用について
運用コストは、とにかく最小限に抑えたかったので、基本的には何もしなくて良くなるように構成しました。定期的(現在は週 1 回)に Cloud Scheduler から Pub/Sub 経由で Cloud Functions をキックして、データ取得・変換が自動で走るようにしています。Cloud Run は GCS のデータを参照しているので、データが更新されればそれを表示してくれます。あと定期的な運用で、ポートフォリオの各銘柄の所持数を更新しないといけないのですが、これは YAML を手動更新後 Git Push→Cloud Build が GCS にデータ配置という流れにしています。本当は証券会社からデータ取得したいのですが、API などはなくスクレイピングが必要そうだったので諦めました。。
以下、Cloud Build の設定を置いておきます。やっていることは主に以下の 3 つです。
- ポートフォリオ設定の YAML を GCS にコピー
- Cloud Functions デプロイ
- Cloud Run デプロイ
steps:
# copy configs to GCS
- name: 'gcr.io/cloud-builders/gsutil'
args: ['cp', '-r', './invest-analytics-config', 'gs://invest-analytics/']
# deploy cloud functions
- name: 'gcr.io/cloud-builders/gcloud'
args:
[
'functions',
'deploy',
'Retrieve_And_ETL',
'--entry-point=retrieve_and_etl',
'--region=asia-northeast1',
'--runtime=python39',
'--memory=256MB',
'--env-vars-file=invest-analytics-function/env.yaml',
'--source=invest-analytics-function',
'--trigger-topic=weekly-trigger',
]
# build and deploy cloud run
- name: 'gcr.io/cloud-builders/docker'
args:
[
'build',
'-t',
'gcr.io/$PROJECT_ID/ui:latest',
'-f',
'invest-analytics-ui/Dockerfile',
'./invest-analytics-ui/',
]
- name: 'gcr.io/cloud-builders/docker'
args: ['push', 'gcr.io/$PROJECT_ID/ui:latest']
- name: 'gcr.io/cloud-builders/gcloud'
args:
[
'run',
'deploy',
'ui',
'--image',
'gcr.io/$PROJECT_ID/ui:latest',
'--region=asia-northeast1',
'--no-allow-unauthenticated',
]
options:
logging: CLOUD_LOGGING_ONLY
参考:フォルダ構成(ソースコード)
運用を楽にするため、モノレポにしています。
invest-analytics # working directory
├─ invest-analytics-config # configs to copy to GCS
│ ├─ portfolio.yaml #
│ └─ retrieve_target.yaml #
│
├─ invest-analytics-function # cloud function app
│ ├─ env.yaml #
│ ├─ main.py #
│ └─ requirements.txt #
│
├─ invest-analytics-ui # cloud run app
│ ├─ src #
│ │ └─ app.py #
│ │
│ ├─ .env #
│ ├─ Dockerfile #
│ └─ requirements.txt #
│
└─ cloudbuild.yaml # cloud build settings
参考:フォルダ構成(Google Cloud Storage)
invest-analytics #
├─ datamart #
│ ├─ pfdf.pkl #
│ └─ stocks.pkl #
│
├─ invest-analytics-config #
│ ├─ portfolio.yaml #
│ └─ retrieve_target.yaml #
│
└─ rawdata #
├─ 1504.JP.csv #
└─ VOO.csv #
└─ ... #
所感
Cloud Run がとにかく使いやすいです!アプリの更新もデプロイすればシームレスにやってくれるし、起動も十分早いです。IAP もポチポチ設定するだけで認証を付けられるのは本当に楽でした。Streamlit は、フロントエンドを自分で作るのに比べると不自由さはあるけど、開発速度が全然早いのでまあこれで良いかなという感じでした。Cloud Build は初見だったこともあり、なかなかうまく設定できず苦戦しましたが、できてしまえば全体の運用コストが下がって大変よいです。Cloud Functions はだいぶ昔にリリースされたプロダクトなこともあり、ちょっと古いな〜という印象を受けました。今だと Cloud Run Jobs が出てきたこともあり、今後はこちらを採用したほうが幸せになれそうな気がします(2022/5 現在まだ Preview 版ですが)。総じて Google Cloud だけで認証付きアプリ+運用まで割と簡単にできてしまうので、個人でクイックに作るのにはおすすめできる構成ではないかと思います。
今後の改善プラン
- データ変換を BigQuery+dbt でやってみたい
→ データ管理をちゃんとやりたい、簡単にデータ品質チェックできるようにしたい - なんらかの予測を VertexAI でやってみたい
→ 今回は予測をうまく使いこなすアイデアがわかなかったのでスルーしてしまった
データはあるので、うまく投資のサポートになるような予測ができるようにしたい
(あと技術的に面白そうなので VertexAI で遊んでみたい)
参考にさせていただいた記事
IAP を設定する方法。この記事の通りに設定すればいけたので大変助かりました
Int64 に変換する方法。NaN ありだけど int にしたいときに参考にしました
Cloud Scheduler と Pub/Sub の設定方法
Cloud Build の設定に関して。同じ箇所でハマったので助かりました
Cloud Build の設定の書き方(公式)
Cloud Functions の deploy コマンドのオプション(公式)
Discussion