💨

朝から株野郎?計画3

に公開

前回の記事

https://zenn.dev/rucco/articles/770c27f5c1e3fc

東証プライムの企業一覧を取得

情報はここにある。
https://www.jpx.co.jp/markets/statistics-equities/misc/01.html

ということらしいので、毎月K3の12時ごろデータを取得するようにすればいいのね。
把握!!!

企業リストテーブル作成

そんなわけで、企業リストテーブルを作っていきます。
init_db.pyに以下を追加していきます!

def init_db():
    # データベースに接続(なければ作成)
    conn = sqlite3.connect('stocks.db')
    cursor = conn.cursor()

+    # 企業リストテーブルの作成
+    cursor.execute('''
+        CREATE TABLE IF NOT EXISTS company_list (
+            symbol TEXT PRIMARY KEY,
+            name TEXT NOT NULL,
+            market TEXT NOT NULL,
+            sector TEXT
+        )
+    ''')

    # 企業情報テーブルの作成
    cursor.execute('''
#省略

既存のstocks.dbを削除してから、init_dbをF5で実行。
できたー。

作成したDBに東証上場銘柄一覧を登録

import sqlite3
import pandas as pd
import requests
from io import BytesIO

def update_company_list():
    # 企業リストのURL
    url = "https://www.jpx.co.jp/markets/statistics-equities/misc/tvdivq0000001vg2-att/data_j.xls"  # JPXの企業リストURL

    # URLからExcelをダウンロード
    try:
        response = requests.get(url)
        response.raise_for_status()  # HTTPエラーをチェック
        with open('data_j.xls', 'wb') as file:
            file.write(response.content)
    except requests.RequestException as e:
        print(f"企業リストのダウンロードに失敗しました: {e}")
        return

    # ExcelデータをDataFrameに読み込む
    try:
        df = pd.read_excel(BytesIO(response.content), header=0)  # ヘッダー行を指定
    except Exception as e:
        print(f"Excelデータの読み込みに失敗しました: {e}")
        return

    # 東証プライムだけにフィルタリング
    df_prime = df[df["市場・商品区分"] == "プライム(内国株式)"]

    # データベースに接続
    conn = sqlite3.connect('stocks.db')
    cursor = conn.cursor()

    # 企業リストをデータベースに挿入または更新
    for index, row in df_prime.iterrows():
        company_info = {
            "symbol": row['コード'],
            "name": row['銘柄名'],
            "market": row['市場・商品区分'],
            "sector": row.get('33業種区分', None)  # sectorがない場合はNone
        }
        
        columns = ["symbol", "name", "market", "sector"]
        insert_columns = ', '.join(columns)
        placeholders = ', '.join([f":{col}" for col in columns])
        update_columns = ', '.join([f"{col} = excluded.{col}" for col in columns[1:]])  # 最初の1つはキーなので除外
        # SQL文の作成
        sql = f'''
            INSERT INTO company_list ({insert_columns})
            VALUES ({placeholders})
            ON CONFLICT(symbol) DO UPDATE SET
                {update_columns}
        '''
        # SQL文を実行
        cursor.execute(sql, company_info)

    # 変更を保存して接続を閉じる
    conn.commit()
    conn.close()
    print("企業リストを更新しました。")

if __name__ == "__main__":
    update_company_list()

F5で実行して、stocks.dbのcompany_listテーブルを確認。
データが登録されてる🤗

ここで問題が・・・・

昨日記事にしたupdate_company_info(symbol)とupdate_stock_prices(symbol)に
問題があることがわかりました。
この2つの関数は銘柄を1つずつ指定しながらデータを取得する方法。
東証プライムの企業全部となると1600件以上のリクエストを送ることになるので、
リクエスト送りすぎになってしまうのです。
株価の方は複数銘柄のデータを一括取得できる別の方法があるようなので、修正していきます。

update_stock_prices関数を修正

今までは引数で受け取った1件の企業に対して、yFinanceへリクエストを送り、
受け取ったデータをDBに登録みたいな感じでした。
1件ずつリクエストではなく、一括DLするので、引数はリストに変更します。
リストに格納された企業のデータをyfinanceから一括DL。
DLしたデータを整形してDBにドンと登録という流れにします。

import sqlite3
import yfinance as yf
from datetime import datetime, timedelta
import pandas as pd

def update_strock_prices(symbols:list = None):
    # symbolsが空の場合は終了
    if not symbols:
        print("企業リストが空です。")
        return
    
    # データフレームを使って株価データを一括で取得
    print(f"取得する銘柄数: {len(symbols)}")
    df = yf.download(symbols, period="1d", group_by='ticker')
    
    print(df.head())  # デバッグ用にデータフレームの先頭を表示

    # データフレームが空の場合は終了
    if df.empty:
        print("株価データが取得できませんでした。")
        return
    print("株価データの取得に成功しました。")

    # 一括登録用のリストを作成
    stock_prices = []

    # データ格納用リストに各銘柄のデータを追加
    for symbol in symbols:
        try:
            price_data = df[symbol].iloc[0]  # 最新の株価データを取得
            date_str = price_data.name.strftime("%Y-%m-%d") # 日付を文字列に変換
            # リストに辞書を追加
            stock_prices.append({
                "symbol": symbol,
                "date": date_str,
                "open": price_data['Open'],
                "high": price_data['High'],
                "low": price_data['Low'],
                "close": price_data['Close'],
                "volume": int(price_data['Volume']) if not pd.isna(price_data['Volume']) else None
            })
        except Exception as e:
            print(f"株価データの取得に失敗しました: {symbol} - {e}")
    
    # データベースに接続
    conn = sqlite3.connect('stocks.db')
    cursor = conn.cursor()

    # 一括挿入
    cursor.executemany('''
        INSERT INTO stock_prices (symbol, date, open, high, low, close, volume)
        VALUES (:symbol, :date, :open, :high, :low, :close, :volume)
        ON CONFLICT(symbol, date) DO UPDATE SET 
            open = excluded.open,
            high = excluded.high,
            low = excluded.low,
            close = excluded.close,
            volume = excluded.volume
    ''', stock_prices)

    # 変更を保存して接続を閉じる
    conn.commit()
    conn.close()
    print(f"{len(stock_prices)}件の株価データを更新しました")

if __name__ == "__main__":
    update_strock_prices()

企業一覧から株価と企業情報を更新する関数を作成

新しいファイルupdate_all.pyを作成して、このファイルを実行することで
株価取得と企業一覧を更新します。

まずは、stocks.dbにあるcompany_list(企業リスト)から、銘柄コード(symbol)を
格納するリストを作り、株価取得の関数update_stock_pricesを実行します。

def update_all():
import sqlite3
from update_stock_prices import update_strock_prices

    # データベースに接続
    conn = sqlite3.connect('stocks.db')
    cursor = conn.cursor()

    # 企業リストを取得
    cursor.execute("SELECT symbol FROM company_list")
    symbols = [row[0] + ".T" for row in cursor.fetchall()]

    # 企業情報の更新が完了したら株価データを更新
    update_strock_prices(symbols)

次に企業情報を更新する処理につなげたいのですが、
企業数は全部で1600以上あります。
1件ずつリクエストするとyfinanceさんから叱られます😱
で。企業情報は毎日更新じゃなくて、週に1回となるよう月曜日は250件みたいに
しようかなと考えました。

def update_all():
    # 省略

    # 企業情報を更新
    # 曜日ごとに更新する企業を分ける
    day_ranges = {
        0: symbols[:250],  # 月曜日は最初の250銘柄
        1: symbols[250:500],  # 火曜日は次の250銘柄
        2: symbols[500:750],  # 水曜日は次の250銘柄
        3: symbols[750:1000],  # 木曜日は次の250銘柄
        4: symbols[1000:1250],  # 金曜日は次の250銘柄
        5: symbols[1250:1500],  # 土曜日は次の250銘柄
        6: symbols[1500:]  # 日曜日は残りの銘柄
    }
    today = datetime.now().weekday()  # 今日の曜日を取得
    symbols = day_ranges.get(today, symbols)  # 今日の曜日に対応する銘柄リストを取得
    print(f"今日の曜日: {today}, 更新する銘柄数: {len(symbols)}")

    for symbol in symbols:
        update_company_info(symbol)
        # 1秒ディレイ
        time.sleep(1)  # APIの制限に引っかからないように1秒待機

    # 接続を閉じる
    conn.close()
    print("全ての企業情報と株価データを更新しました。")

if __name__ == "__main__":
    update_all()

ふー。
次は朝6時にこのプログラムを動かすようにします。
自分ちはWindows環境なので、タスクスケジューラを使おうかなと。
そんなわけでまたね👋

Discussion