👌

【Python】EDINET APIで財務データ自動収集!差分更新で自分だけの最強データベースを作る(後編)

に公開

はじめに:データ収集のその先へ

前編の記事では、PythonとEDINET APIを使って、企業の財務データ(CSV形式)を全自動で収集し、年/四半期ごとに整理して保存するデータ収集基盤を構築しました。

これにより、ローカル(またはGoogle Drive)に最新の財務データが自動で蓄積される環境が手に入りました。しかし、データ分析のたびに数百、数千のZIPファイルやCSVファイルを個別に読み込むのは、依然として非効率です。

そこで後編となるこの記事では、収集した大量のZIPファイルをSQLiteデータベースに取り込み、高速に検索・集計できる本格的な分析基盤へと進化させる方法を徹底解説します。

この記事で実現すること

  • 収集した大量のZIPファイルから、財務データを効率的に抽出する。
  • 抽出したデータをSQLiteデータベースに格納する。
  • 並列処理でデータ取り込みを高速化する。
  • 2回目以降は差分のみを更新する、賢いデータベース維持の仕組みを実装する。
  • 構築したデータベースから、全社横断的なデータ分析を瞬時に行う。

この記事を読み終える頃には、あなたは単なるデータ収集者から、自ら構築したデータ基盤を自在に操るデータ分析者へとステップアップしているはずです。


ステップ1:なぜデータベース化が必要なのか?

フォルダに整理されたファイルも便利ですが、なぜわざわざデータベース(DB)に格納する必要があるのでしょうか?理由は大きく3つあります。

  1. 検索・抽出の高速化:

    • ファイルベース: 「A社の売上高」を知りたい場合、A社のファイルを探し、その中からCSVを読み込み、さらに特定の行を探す必要があります。
    • データベース: SELECT value FROM data WHERE company='A社' AND item='売上高' のような1行のクエリで、数万・数百万のデータの中から瞬時に対象を抽出できます。
  2. 全社横断的な分析が容易に:

    • 「全上場企業の自己資本比率を比較したい」といった分析は、ファイルベースでは非常に困難です。
    • データベースなら、GROUP BY句などを使えば、複雑な集計や企業間比較も簡単に行えます。
  3. データの一元管理と安全性:

    • データがDBに集約されることで、管理が容易になります。
    • トランザクション機能により、データの書き込み中にエラーが発生しても、中途半端な状態でデータが保存されるのを防ぎ、一貫性を保つことができます。

今回は、Pythonに標準で組み込まれており、サーバーも不要で手軽に利用できるSQLiteを選択します。ローカルでの分析基盤としては、まさに最適な選択肢です。


ステップ2:データベースの設計と初期設定

まずは、データを格納する器となるデータベースとテーブルを設計します。

テーブル設計

今回は、データの内容に応じて2つのテーブルを用意します。

  1. reports テーブル:

    • 各レポート(ZIPファイル)のメタデータを格納します。
    • どの企業の、いつの期間の、いつ提出されたレポートなのか、といった情報を管理します。
    • zip_filename主キーとし、各レポートが一意に識別できるようにします。
  2. financial_data テーブル:

    • 各レポートに含まれる具体的な財務データ(売上高、利益など)を一行ずつ格納します。
    • reportsテーブルのzip_filename外部キーとして参照し、どのレポートに由来するデータなのかを紐付けます。

[Image of entity-relationship diagram for reports and financial_data]

このようにテーブルを分ける(正規化する)ことで、データの重複を減らし、管理しやすくしています。

データベースを準備するコード (setup_database)

以下のPythonコードは、指定されたパスにSQLiteデータベースファイルを作成し、上記設計のテーブルと、検索を高速化するためのインデックスを準備します。

import sqlite3
import pandas as pd
import os

DB_PATH = '/content/drive/MyDrive/Colab Notebooks/ALL_NoteBooks/01_株価/02_EDINET_DB/edinet_data_1y.db'

# reportsテーブルのスキーマ定義
REPORTS_TABLE_SCHEMA = """
CREATE TABLE IF NOT EXISTS reports (
    zip_filename TEXT PRIMARY KEY,
    security_code TEXT,
    company_name TEXT,
    period_start_date TEXT,
    period_end_date TEXT,
    filing_date TEXT
);
"""

# financial_dataテーブルのスキーマ定義
FINANCIAL_DATA_TABLE_SCHEMA = """
CREATE TABLE IF NOT EXISTS financial_data (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    report_zip_filename TEXT,
    element_id TEXT,
    item_name TEXT,
    context_id TEXT,
    relative_year TEXT,
    consolidated_individual TEXT,
    period_instant TEXT,
    unit_id TEXT,
    unit TEXT,
    value TEXT,
    FOREIGN KEY (report_zip_filename) REFERENCES reports (zip_filename) ON DELETE CASCADE
);
"""

def setup_database(db_path: str):
    """データベースとテーブル、インデックスを初期化・設定する"""
    print("🚀 データベースのセットアップを開始します...")
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()
        # 外部キー制約を有効にする
        cursor.execute("PRAGMA foreign_keys = ON;")

        # テーブルの作成
        cursor.execute(REPORTS_TABLE_SCHEMA)
        cursor.execute(FINANCIAL_DATA_TABLE_SCHEMA)

        print("🔍 パフォーマンス向上のためのインデックスを作成・確認中...")
        # financial_dataテーブルのインデックス
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_financial_data_report_zip ON financial_data(report_zip_filename);")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_financial_data_item_name ON financial_data(item_name);")
        # reportsテーブルのインデックス
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_reports_filing_date ON reports(filing_date);")

        conn.commit()
    print("✅ データベースのセットアップが完了しました。")

# 実行
setup_database(DB_PATH)

インデックスは、いわば本の索引のようなものです。これを作成しておくことで、特定のitem_name(項目名)やfiling_date(提出日)でのデータ検索が劇的に高速になります。

ステップ3:差分更新ロジックの実装

データベースを常に最新の状態に保つためには、前編と同様の「差分更新」の仕組みが不可欠です。

処理の全体像:

  1. 【理想の状態】 EDINETサマリーCSVから、直近1年分など「DBにあるべきレポート」のリストを取得する。
  2. 【現在の状態】 SQLiteのreportsテーブルから、「現在DBに格納されているレポート」のリストを取得する。
  3. 【差分計算】 上記2つを比較し、「追加すべきレポート」と「削除すべき古いレポート」を特定する。
  4. 【実行】 削除処理と追加処理を順に行う。

差分を計算するコード

Pythonの集合(set)型を使うと、この差分計算を非常にスマートに記述できます。

# 1. EDINETサマリーCSVから「あるべき」IDリストを取得
# (get_target_doc_ids関数はNotebook内のコードを参照)
target_ids = get_target_doc_ids(SUMMARY_CSV_PATH)

# 2. データベースから「現在の」IDリストを取得
db_ids = get_existing_doc_ids_from_db(DB_PATH)
print(f"🗃️ 現在データベースには {len(db_ids)} 件のデータが格納されています。")

# 3. 集合演算で差分を計算
ids_to_add = target_ids - db_ids      # targetにしか存在しない -> 追加対象
ids_to_delete = db_ids - target_ids  # dbにしか存在しない -> 削除対象

print(f"📊 差分計算結果: 追加 {len(ids_to_add)}件, 削除 {len(ids_to_delete)}件")

# 4. 実行
delete_records_from_db(DB_PATH, ids_to_delete)
add_new_records_to_db(DB_PATH, ZIP_DIR, ids_to_add)

このロジックにより、スクリプトを毎日実行しても、常にデータベースを最新の状態に保つことができます。

ステップ4:並列処理による高速データインポート

ETLパイプライン構築における最大の難関は、大量のファイル処理にかかる時間です。数千のZIPファイルを1つずつ処理していては、日が暮れてしまいます。

そこで、CPUのマルチコア性能を最大限に引き出す並列処理を導入します。

並列処理の仕組み (concurrent.futures)
Pythonのconcurrent.futures.ProcessPoolExecutorを使うと、複数のプロセスを立ち上げ、それぞれにファイル処理を分担させることができます。これにより、処理時間をコア数分の1近くまで短縮することが可能です。

ZIP解析とDB追加を並列化するコード
以下のadd_new_records_to_db関数が、並列処理の心臓部です。

import concurrent.futures
from tqdm import tqdm

# CPUのコア数を自動で取得
NUM_WORKERS = os.cpu_count()
# 一度にDBに書き込むファイル数
BATCH_SIZE = 500

def process_zip_file(zip_file_path: str):
    """単一のZIPファイルを処理し、メタデータと財務データフレームを返す(Notebook内のコード)"""
    # (...ZIPファイル内のCSVを読み込み、pandasで解析する処理...)
    # 詳細はNotebookの `process_zip_file` 関数を参照
    pass

def add_new_records_to_db(db_path: str, zip_dir: str, doc_ids_to_add: set):
    """新しいファイルのデータを並列処理でデータベースに追加する"""
    if not doc_ids_to_add:
        print("👍 追加対象の新規ファイルはありません。")
        return

    print(f"🚚 {len(doc_ids_to_add)}件の新規ファイルをデータベースに追加します (Workers: {NUM_WORKERS})...")
    files_to_process = [os.path.join(zip_dir, f"{doc_id}.zip") for doc_id in doc_ids_to_add]
    
    conn = sqlite3.connect(db_path)
    
    try:
        # ProcessPoolExecutorで並列処理のプールを作成
        with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
            
            # executor.mapで各ファイルにprocess_zip_file関数を適用
            # tqdmで進捗を可視化
            results = list(tqdm(executor.map(process_zip_file, files_to_process), total=len(files_to_process), desc="新規データ追加中"))

            # ( ...結果をまとめてバッチでDBに書き込む処理... )
            # 詳細はNotebookの `add_new_records_to_db` 関数を参照
    
    except Exception as e:
        print(f"\\n❌ データベース書き込み中にエラーが発生しました: {e}")
        conn.rollback()
    finally:
        conn.close()
        print(f"\\n✅ 新規データの追加処理が完了しました。")

この実装では、tqdmライブラリを組み合わせることで、処理の進捗状況をプログレスバーでリアルタイムに確認できるようにしています。大量のデータを扱う上で、このような進捗の可視化は非常に重要です。

ステップ5:構築したDBでデータ分析を加速させる!

ついに、自分だけの金融データベースが完成しました!このDBがどれほど強力か、いくつかの分析例で体感してみましょう。

例1:全銘柄の「項目名」出現頻度ランキング

「他の会社はどんな財務データを報告しているんだろう?」と思ったことはありませんか?以下の関数を使えば、データベース内の全データから、項目名(item_name)ごとの登場社数を集計し、ランキング形式で表示できます。

def get_item_counts_from_last_year(db_path: str):
    """項目名ごとのユニークな銘柄数を取得する"""
    sql_query = """
    SELECT
        d.item_name AS '項目名',
        COUNT(DISTINCT r.security_code) AS '銘柄数'
    FROM
        financial_data AS d
    INNER JOIN
        reports AS r ON d.report_zip_filename = r.zip_filename
    WHERE
        r.filing_date >= DATE('now', '-1 year')
    GROUP BY
        d.item_name
    ORDER BY
        '銘柄数' DESC;
    """
    with sqlite3.connect(db_path) as conn:
        df = pd.read_sql_query(sql_query, conn)
    return df

# 実行
item_counts_df = get_item_counts_from_last_year(DB_PATH)
print(item_counts_df.head(10))

実行結果(例):

                           項目名   銘柄数
0             ファンド名称(英語表記)、DEI  3904
1             ファンド名称(日本語表記)、DEI  3904
2                   ファンドコード、DEI  3904
3                XBRL訂正のフラグ、DEI  3904
4                 EDINETコード、DEI  3904
...

これにより、多くの企業で共通して使われている会計基準の項目や、特定の業種だけで使われる珍しい項目などを発見できます。

例2:特定の項目で全社データを瞬時に取得

「全上場企業の 『大株主の状況』 を一覧で見たい!」
こんな要求にも、データベースなら一瞬で応えられます。

def get_df_by_item_name(item_name_to_find: str, db_path: str):
    """指定された項目名に一致するデータを全社分抽出する"""
    sql_query = """
    SELECT
        r.company_name AS '銘柄名',
        r.security_code AS '銘柄コード',
        d.context_id AS 'コンテキストID',
        d.value AS '値'
    FROM
        financial_data AS d
    INNER JOIN
        reports AS r ON d.report_zip_filename = r.zip_filename
    WHERE
        d.item_name = ?; -- ? はプレースホルダー
    """
    with sqlite3.connect(db_path) as conn:
        df = pd.read_sql_query(sql_query, conn, params=(item_name_to_find,))
    return df

# 実行
target_item = "氏名又は名称、大株主の状況"
result_df = get_df_by_item_name(target_item, DB_PATH)
print(result_df)

実行結果(例):

              銘柄名   銘柄コード                                 コンテキストID                             値
0      オーエスジー株式会社   6136   CurrentYearInstant_No1MajorShareholdersMember   日本マスタートラスト信託銀行株式会社(信託口)
1      オーエスジー株式会社   6136   CurrentYearInstant_No2MajorShareholdersMember         株式会社日本カストディ銀行(信託口)
...             ...    ...                                       ...                            ...
39863   株式会社 イチケン   1847  CurrentYearInstant_No10MajorShareholdersMember                     住友不動産株式会社

個別のファイルを一つも開くことなく、4,000社近い企業の筆頭株主から10位までの株主情報(約4万行)が手に入りました。これこそが、データベース化による分析体験の劇的な向上です。

まとめ:あなただけのデータ分析基盤が完成!

前後編にわたり、EDINETから財務データを自動収集し、高速な分析を可能にするデータベースを構築するまでの一連の流れを解説しました。

  • 前編: EDINET APIと差分更新ロジックで、データ収集を自動化した。
  • 後編: 収集したデータをSQLiteに格納し、並列処理でETLを高速化し、いつでも分析できる状態を整えた。

このパイプラインを定期的に実行することで、あなたの手元には常に最新化された金融データベースが維持されます。ここから先は、あなたのアイデア次第で無限の可能性が広がっています。

  • 可視化: 抽出したデータをMatplotlibやPlotlyでグラフ化し、企業の成長性を分析する。
  • 機械学習: 時系列データとして株価予測モデルの入力特徴量に利用する。
  • Webアプリ化: FlaskやFastAPIでAPIを立て、他のアプリケーションからデータを参照できるようにする。

この記事が、あなたのデータドリブンな投資分析や企業研究の第一歩となれば幸いです。Happy Analyzing🚀

Discussion