🐷

【Python×EDINET API】CSVで財務データを自動取得!差分更新で構築する金融分析データベース

に公開

はじめに:財務データ収集は「CSV」でスマートに始めよう

企業の財務データ分析、興味はあるけど「データの入手が面倒…」と感じていませんか?特に、上場企業の有価証券報告書などが集まるEDINETは情報の宝庫ですが、一つ一つ手作業でダウンロードするのは現実的ではありません。

この記事では、PythonEDINET APIを使って、企業の財務データ(CSV形式)を全自動で収集し、継続的に更新し続けるデータ基盤を構築する方法を、ステップ・バイ・stepで徹底解説します。

この記事で実現できること

  • EDINET APIを叩いて、必要な書類の情報を自動で取得
  • 2回目以降は差分のみを更新する効率的な仕組み
  • ダウンロード対象をCSVが提供されている書類に限定
  • 取得したファイルを年/四半期ごとに自動でフォルダ整理
  • 最終的には、自分だけの金融分析用データベースを構築

完成すれば、一度スクリプトを実行するだけで、指定した期間の財務データがローカル(またはGoogle Drive)に蓄積されていく環境が手に入ります。

なぜ「CSV」で始めるのか?

EDINETでは、同じデータがXBRLやPDF形式でも提供されています。XBRLは機械可読性に優れた素晴らしい形式ですが、解析には専門のライブラリが必要で、少し敷居が高いのも事実です。

一方、CSV形式であれば、データ分析の定番ライブラリである**pandas**を使って、誰でも手軽にデータを読み込み、分析をスタートできます。

import pandas as pd

# たった1行でデータフレームに!
df = pd.read_csv("取得した財務データ.csv")
print(df.head())

この記事は、「まずは手軽に財務データ分析の世界に足を踏み入れたい」という方に最適な入門編となっています。

完成するシステムの全体像

これから構築するシステムの処理フローは以下の通りです。

  1. 【メタデータ取得】 EDINET APIにアクセスし、指定期間の全書類リストを取得・更新する。
  2. 【差分管理】 既存のサマリーファイルと比較し、新しく公開された書類だけを特定する。
  3. 【フィルタリング】 新規書類の中から、「CSVが提供されている」かつ「未ダウンロード」の書類だけを絞り込む。
  4. 【ダウンロード&整理】 対象書類のZIPファイルをダウンロードし、提出年月日に基づいて 年/四半期 フォルダに自動で仕分ける。

この「差分更新」の仕組みが、本システムの最大のキモです。毎日実行しても、APIへの負荷を最小限に抑え、効率的に最新のデータを収集し続けることができます。

それでは早速、環境構築から始めていきましょう!

システム構築の準備をしよう

データ収集を始める前に、まずは開発環境を整えましょう。今回は、無料で利用でき、環境構築も簡単なGoogle ColaboratoryGoogle Driveを連携させる方法を前提に進めます。

1. 必要なライブラリ

基本的にはPythonの標準ライブラリと、以下の主要なライブラリを使用します。Google Colaboratoryにはプリインストールされているものがほとんどですが、もしローカル環境で実行する場合はpipでインストールしてください。

  • requests: EDINET APIにHTTPリクエストを送るために使用します。
  • pandas: 取得した書類リスト(メタデータ)を効率的に管理・操作するために使用します。
  • tqdm: 処理の進捗状況をプログレスバーで分かりやすく表示するために使用します。

2. EDINET APIキーの取得

EDINET APIを利用するには、APIキーが必要です。以下の公式サイトから無料で取得できますので、事前に準備しておきましょう。

フォームに必要事項を記入して申請すると、登録したメールアドレスにAPIキーが送られてきます。

⚠️ APIキーの取り扱いには十分注意してください。
ソースコード内に直接書き込むと、GitHubなどで公開した際に第三者に漏洩する危険があります。Google Colaboratoryのシークレット機能を使って、安全に管理することを強く推奨します。

# Google Colaboratoryのシークレット機能からAPIキーを読み込む
from google.colab import userdata

# 'EDINET_API_KEY'という名前でシークレットを登録した場合
API_KEY = userdata.get('EDINET_API_KEY')

3. Google Driveのマウントとフォルダ構成

収集したデータはGoogle Driveに保存すると便利です。以下のコードを実行し、ご自身のGoogle DriveをColaboratoryにマウント(接続)します。

from google.colab import drive
drive.mount('/content/drive')

実行すると認証を求められるので、画面の指示に従って許可してください。

接続が完了したら、データを保存するためのフォルダを作成しておきましょう。今回は以下のような構成を想定します。

/content/drive/MyDrive/
└── Colab Notebooks/
    └── 02_EDINET_DB/
        ├── 01_zip_files/  # ← ダウンロードしたZIPファイルの保存先
        └── EDINET_Summary_v3.csv  # ← 取得した全書類のリスト(自動生成される)

このパスは後ほどコード内で設定します。ご自身の環境に合わせて適宜変更してください。

これで準備は完了です。次章から、いよいよPythonコードの実装に入ります!

実装①:設定管理とメタデータの差分更新

ここからは、システムの心臓部となるPythonコードを実装していきます。まずは、メンテナンス性を高めるための設定管理と、APIから書類リスト(メタデータ)を取得し、差分を管理する関数を作成します。

Part 1: 全体設計と設定管理 (Configクラス)

スクリプトが大きくなってくると、APIキーや保存先フォルダのパスなどがコードのあちこちに散らばってしまいがちです。これを避けるため、設定値をまとめて管理するConfigクラスを最初に定義しましょう。

import os
import time
import requests
import pandas as pd
from datetime import date, timedelta, datetime
from pathlib import Path
from tqdm.notebook import tqdm
import warnings
from google.colab import userdata

# urllib3のInsecureRequestWarningを非表示にする
warnings.filterwarnings('ignore', category=requests.packages.urllib3.exceptions.InsecureRequestWarning)

# --- 設定項目 ---
class Config:
    """設定を管理するクラス"""
    # Google Driveの基本パス
    BASE_DIR = Path("/content/drive/MyDrive/Colab Notebooks/ALL_NoteBooks/01_株価/02_EDINET_DB/")
    # ダウンロードしたZIPファイルの保存先
    SAVE_FOLDER = BASE_DIR / "01_zip_files/"

    # ColabのシークレットからAPIキーを読み込む
    API_KEY = userdata.get('EDINET_API_KEY')

    # データの信頼性を担保するため、何日分遡ってデータを再取得するか
    RELIABILITY_DAYS = 7
    # 初回実行時に何年分のデータを取得するか
    INITIAL_FETCH_YEARS = 5
    
    # ダウンロード対象の書類タイプコード
    # 120: 有価証券報告書, 140: 四半期報告書, 160: 半期報告書
    TARGET_DOC_TYPE_CODES = ['120', '140', '160']

このように設定をクラスにまとめておくことで、後からパスや取得期間を変更したくなった場合も、このクラスを修正するだけで済み、メンテナンスが非常に楽になります。

Part 2: 書類のメタデータを賢く更新する (update_summary_file関数)

次に、EDINET APIから日次の書類一覧を取得し、CSVファイル(サマリーファイル)に保存・更新する関数を作成します。この関数が、本システムの最も重要な「差分更新」ロジックを担います。

処理の流れ:

  1. サマリーファイル (EDINET_Summary_v3.csv) が既に存在するかチェック。
  2. 存在すれば、そのファイルから最後に取得した日付を読み取る。
  3. APIへの問い合わせ期間を「最後に取得した日付 - RELIABILITY_DAYS」から今日までに設定する。(これにより、データの取りこぼしを防ぎつつ、不要なAPIリクエストを削減)
  4. ファイルが存在しない(初回実行の)場合は、INITIAL_FETCH_YEARSで設定した年数分のデータを取得する。
  5. 取得した新しいデータを既存のサマリーファイルに追記し、重複を除去して保存する。
def update_summary_file(base_dir: Path, api_key: str) -> pd.DataFrame:
    """EDINETから日次の書類一覧を取得し、サマリーファイルを更新する。"""
    summary_path = base_dir / "EDINET_Summary_v3.csv"
    print(f"サマリーファイル '{summary_path.name}' の状態を確認・更新します...")

    today = date.today()
    summary = pd.DataFrame()
    # 初回実行時のデータ取得開始日を設定
    start_day = today - timedelta(days=365 * Config.INITIAL_FETCH_YEARS)

    # サマリーファイルが存在する場合の処理
    if summary_path.exists():
        try:
            dtype_map = {'secCode': str, 'docTypeCode': str, 'xbrlFlag': str, 'csvFlag': str}
            summary = pd.read_csv(summary_path, encoding='utf_8_sig', dtype=dtype_map)
            summary['submitDateTime'] = pd.to_datetime(summary['submitDateTime'], errors='coerce')

            # ファイル内の最新日付を取得し、データ取得開始日を更新
            if not summary.empty and 'submitDateTime' in summary.columns and not summary['submitDateTime'].isnull().all():
                latest_date_in_file = summary['submitDateTime'].max().date()
                # 訂正報告などに対応するため、信頼性期間として数日遡って取得
                start_day = latest_date_in_file - timedelta(days=Config.RELIABILITY_DAYS)
        except Exception as e:
            print(f"サマリーファイルの読み込み中にエラーが発生しました: {e}")
            pass

    end_day = today
    day_term = [start_day + timedelta(days=i) for i in range((end_day - start_day).days + 1)]

    new_docs = []
    # 1日ずつAPIを叩いて書類リストを取得
    for day in tqdm(day_term, desc="APIからメタデータ取得"):
        params = {'date': day, 'type': 2, 'Subscription-Key': api_key}
        try:
            response = requests.get('[https://disclosure.edinet-fsa.go.jp/api/v2/documents.json](https://disclosure.edinet-fsa.go.jp/api/v2/documents.json)', params=params, verify=False, timeout=30)
            response.raise_for_status() # HTTPエラーがあれば例外を発生
            res_json = response.json()
            if res_json.get('results'):
                new_docs.extend(res_json['results'])
        except requests.exceptions.RequestException as e:
            tqdm.write(f"エラー: {day} のデータ取得に失敗 - {e}")
        time.sleep(0.1) # APIへの負荷を考慮

    # 新しく取得したデータがあれば、既存のデータと結合
    if new_docs:
        temp_df = pd.DataFrame(new_docs)
        summary = pd.concat([summary, temp_df], ignore_index=True)

    # 重複を排除し、日付でソートして保存
    summary['submitDateTime'] = pd.to_datetime(summary['submitDateTime'], errors='coerce')
    summary.dropna(subset=['docID'], inplace=True)
    summary = summary.drop_duplicates(subset='docID', keep='last')
    summary = summary.sort_values(by='submitDateTime', ascending=True).reset_index(drop=True)

    summary.to_csv(summary_path, index=False, encoding='utf_8_sig')
    print("\\n✅ サマリーファイルの更新が完了しました!")
    return summary

def step1_create_and_summarize():
    """ステップ①: サマリーを作成し、その概要を出力する。"""
    print("--- ステップ① サマリー作成と概要の表示 ---")
    summary_df = update_summary_file(Config.BASE_DIR, Config.API_KEY)

    if summary_df.empty:
        print("⚠️ サマリーデータの作成に失敗したか、データがありませんでした。")
        return pd.DataFrame()

    print(f"  - データ期間: {summary_df['submitDateTime'].min():%Y-%m-%d}{summary_df['submitDateTime'].max():%Y-%m-%d}")
    print(f"  - 総データ数: {len(summary_df)} 件")
    print(f"  - 銘柄数(ユニーク): {summary_df['secCode'].nunique()} 社")
    print("-" * 40 + "\\n")
    return summary_df

この関数を実行するだけで、ローカル(Google Drive)に最新の書類リストが蓄積されていきます。
次は、このリストを元に、どのファイルをダウンロードすべきかを判断するロジックを実装します。

実装②:ダウンロード対象の絞り込みと状況確認

全書類のメタデータが詰まったサマリーファイルが手に入りました。次はこの中から、本当にダウンロードが必要なファイルだけを賢く絞り込む処理を実装します。

Part 3: ダウンロード状況を確認する (step2_check_download_status関数)

このステップの目的は、以下の2つの情報を照合して、**「これからダウンロードすべきファイルのリスト」**を作成することです。

  1. サマリーファイル: これから収集したい全書類のリスト
  2. ダウンロードフォルダ: 既にダウンロード済みのファイル

処理の流れ:

  1. ダウンロード先のフォルダに存在する全*.zipファイルのリストを取得する。
  2. サマリーファイル(summary_df)から、ダウンロードしたい書類の条件でフィルタリングする。←ここが最重要ポイント!
  3. 上記2つを比較し、「ダウンロードしたいが、まだダウンロードされていない」書類のリストを作成して返す。
def step2_check_download_status(summary_df: pd.DataFrame):
    """
    ステップ②: ダウンロードフォルダの状況を確認し、未ダウンロードの件数などを出力する。
    """
    print("--- ステップ② ダウンロード状況の確認 ---")
    save_folder = Config.SAVE_FOLDER
    save_folder.mkdir(parents=True, exist_ok=True)

    # サブフォルダ内も含めて、既存の全zipファイルを再帰的に検索
    existing_files_path = list(save_folder.rglob('*.zip'))

    print(f"📁 指定フォルダ: {save_folder}")
    if not existing_files_path:
        print("  - 既存のダウンロード済みファイルはありません。")
    else:
        print(f"  - 既存ファイル数: {len(existing_files_path)} 件")

    # --- ▼▼▼ ここが最重要ポイント!▼▼▼ ---
    # ダウンロード対象の条件を定義
    # - csvFlagが'1'である(CSVファイルが提供されている)
    # - secCode(証券コード)が存在する
    # - docTypeCodeが対象(有報、四半期、半期)である
    query_str = (
        "csvFlag == '1' and "  # ← XBRLからCSVに変更!
        "secCode.notna() and secCode != 'None' and "
        f"docTypeCode in {Config.TARGET_DOC_TYPE_CODES}"
    )
    target_docs = summary_df.query(query_str)
    # --- ▲▲▲ ここが最重要ポイント!▲▲▲ ---

    # 既存ファイル名(docID)のセットを作成
    existing_file_stems = {f.stem for f in existing_files_path}
    
    # ダウンロード対象のうち、まだダウンロードされていないものを抽出
    docs_to_download = target_docs[~target_docs['docID'].isin(existing_file_stems)]

    print(f"\\n📊 サマリーと照合した結果:")
    print(f"  - ダウンロード対象の総書類数(CSV提供あり): {len(target_docs)} 件")
    print(f"  - ダウンロードが必要な(未取得の)書類数: {len(docs_to_download)} 件")
    print("-" * 40 + "\\n")

    return docs_to_download

【Point】pandas.queryで条件を自在に操る
このコードの柔軟性の核となっているのがsummary_df.query(query_str)の部分です。

csvFlag == '1': 今回の目的である「CSVファイルが提供されている書類」を絞り込むための条件です。もしXBRLが欲しければ、ここをxbrlFlag == '1'に変えるだけで対応できます。

secCode.notna(): 証券コードがない(上場企業でない)提出物を除外しています。

docTypeCode in [...]: Configクラスで定義した書類種別(有価証券報告書など)のみを対象にしています。

このように、queryメソッドを使うことで、SQLのように直感的かつ柔軟にデータフレームから必要な情報を抽出できます。分析の目的に合わせてこの条件式をカスタマイズすることで、様々な応用が可能になります。

これで、ダウンロードすべきファイルのリスト(docs_to_download)が完成しました。次のステップでは、いよいよこのリストを使って実際のダウンロード処理を実装します。

実装③:ファイルのダウンロードと自動整理

いよいよ最終段階です。前のステップで作成した「ダウンロードすべきファイルリスト」に基づき、実際にEDINET APIからZIPファイルをダウンロードし、後々の管理がしやすいようにフォルダへ自動で整理・保存する処理を実装します。

Part 4: ダウンロードを実行し、フォルダに整理する (step3_execute_download関数)

この関数の役割はシンプルです。

  1. 「ダウンロードすべきファイルリスト」(docs_to_download)を1行ずつループ処理する。
  2. 各行のdocIDを使って、ファイルダウンロード用のAPIエンドポイントを叩く。
  3. 提出日 (submitDateTime) を基に、保存先のフォルダパス(例:.../2025/Q3/)を動的に決定する。
  4. 決定したパスにZIPファイルを保存する。
def step3_execute_download(docs_to_download: pd.DataFrame):
    """
    ステップ③: 実際にファイルのダウンロードを実行し、年/四半期フォルダに保存する。
    """
    print("--- ステップ③ ダウンロードの実行 ---")
    if docs_to_download.empty:
        print("✅ ダウンロード対象の新しいファイルはありません。処理を完了します。")
        print("-" * 40 + "\\n")
        return

    print(f"{len(docs_to_download)}件のファイルのダウンロードを開始します。")

    for _, row in tqdm(docs_to_download.iterrows(), total=len(docs_to_download), desc="ZIPダウンロード進捗"):
        doc_id = row['docID']
        submit_date = row['submitDateTime']

        # --- ▼▼▼ ファイル整理のロジック ▼▼▼ ---
        # 提出日に基づいて年と四半期のフォルダパスを決定
        if pd.isna(submit_date):
            # 日付がないデータは 'unknown_date' フォルダに保存
            target_folder = Config.SAVE_FOLDER / "unknown_date"
        else:
            year = submit_date.year
            quarter = (submit_date.month - 1) // 3 + 1
            target_folder = Config.SAVE_FOLDER / str(year) / f"Q{quarter}"

        # 保存先フォルダが存在しない場合は作成
        target_folder.mkdir(parents=True, exist_ok=True)
        zip_path = target_folder / f"{doc_id}.zip"
        # --- ▲▲▲ ファイル整理のロジック ▲▲▲ ---

        # EDINET API v2 のファイル取得エンドポイント
        url_zip = f"[https://disclosure.edinet-fsa.go.jp/api/v2/documents/](https://disclosure.edinet-fsa.go.jp/api/v2/documents/){doc_id}"
        # type=5 は「提出書類及び監査報告書(ファイル形式:ZIP)」を指定
        params_zip = {"type": 5, 'Subscription-Key': Config.API_KEY}

        try:
            r = requests.get(url_zip, params=params_zip, stream=True, verify=False, timeout=60)
            r.raise_for_status()
            # チャンク形式で書き込み、メモリ効率を向上
            with open(zip_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        except requests.exceptions.RequestException as e:
            tqdm.write(f"ダウンロード失敗: {doc_id}, エラー: {e}")
            # 失敗した場合は中途半端なファイルを削除
            if zip_path.exists():
                zip_path.unlink()
        time.sleep(0.1) # APIへの負荷分散

    print("\\n✅ ダウンロード処理が完了しました。")
    print("-" * 40 + "\\n")

【Point】年/四半期ごとのフォルダ分け

このコードのファイル整理のロジック部分が、数百・数千のファイルを整然と管理するための重要な工夫です。

year = submit_date.year
quarter = (submit_date.month - 1) // 3 + 1 # 月から四半期を計算
target_folder = Config.SAVE_FOLDER / str(year) / f"Q{quarter}"

この数行により、ダウンロードしたファイルが自動的に .../2025/Q1/, .../2025/Q2/ のように整理されます。これにより、後から「2025年第2四半期のデータだけを分析したい」といった場合に、対象ファイルを簡単に見つけ出すことができます。

実行してみよう!
これまで作成した3つのステップを順番に実行するメイン処理を用意しましょう。

# --- メイン実行部 ---
def main():
    """メイン処理を実行する関数"""
    # ステップ①: メタデータのサマリーを作成・更新
    summary_data = step1_create_and_summarize()

    # サマリーが空でなければ次のステップへ
    if not summary_data.empty:
        # ステップ②: ダウンロードが必要なファイルを特定
        files_to_download = step2_check_download_status(summary_data)
        
        # ステップ③: ダウンロードを実行
        step3_execute_download(files_to_download)

    print("全ての処理が完了しました。🎉")

# --- スクリプトとして実行 ---
if __name__ == '__main__':
    main()

このmain()関数を実行すれば、これまで解説してきた全ての処理が連携して動き、あなたのGoogle Driveに必要な財務データが自動で蓄積されていきます。

(発展) 既存の大量ファイルを後から整理する

ここまでの実装で、これからダウンロードするファイルは自動で整理されるようになりました。しかし、もし既に数千・数万のZIPファイルがフォルダ直下に整理されずに存在していたらどうでしょうか?

それらを一度に整理しようとすると、全ファイルパスをメモリに読み込もうとして、環境によってはメモリ不足で処理が止まってしまう可能性があります。

このようなケースに対応するため、ファイルを少しずつ(バッチ処理で)整理するための追加スクリプトを紹介します。これは、ご提供いただいた元のNotebookに含まれていた非常に実践的なテクニックです。

メモリを圧迫しない逐次整理処理

このスクリプトは、フォルダ内のファイルを一度に全てリストアップするのではなく、指定した数(BATCH_SIZE)だけを処理し、終わったら次のバッチへ…という動作を繰り返します。

処理の流れ:

  1. サマリーファイルを読み込み、docIDと提出日の対応表をメモリに保持する。
  2. ダウンロードフォルダをスキャンし、フォルダ直下にあるZIPファイルBATCH_SIZE個だけ取得する。
  3. 取得したファイルのdocIDをキーに、サマリーファイルから提出日を調べる。
  4. 提出日に基づいて正しい移動先フォルダ(年/四半期)を決定し、ファイルを移動させる。
  5. フォルダ直下に整理対象のファイルがなくなるまで、2〜4の処理を繰り返す。
import shutil

class FileOrganizerConfig:
    """ファイル整理用の設定を管理するクラス"""
    SUMMARY_FILE_PATH = Config.BASE_DIR / "EDINET_Summary_v3.csv"
    FILES_ROOT_FOLDER = Config.SAVE_FOLDER
    BATCH_SIZE = 500 # 一度に処理するファイル数

def organize_files_incrementally():
    """
    フォルダ直下のファイルを逐次的にバッチ処理で整理する。
    """
    print("--- 既存ファイルの逐次整理処理を開始します ---")

    # 1. サマリーファイルの読み込み
    summary_path = FileOrganizerConfig.SUMMARY_FILE_PATH
    if not summary_path.exists():
        print(f"❌エラー: サマリーファイルが見つかりません: {summary_path}")
        return

    try:
        # docIDをインデックスにすることで、日付の検索を高速化
        summary_df = pd.read_csv(summary_path, index_col='docID', parse_dates=['submitDateTime'])
    except Exception as e:
        print(f"❌エラー: サマリーファイルの読み込みに失敗しました: {e}")
        return

    # 2. 逐次的なバッチ処理の開始
    root_folder = FileOrganizerConfig.FILES_ROOT_FOLDER
    total_processed_count = 0
    batch_number = 0

    while True:
        batch_number += 1
        print(f"\\n--- バッチ {batch_number}: 整理対象のファイルを探しています... ---")

        # フォルダ直下の.zipファイルをBATCH_SIZE個だけ取得
        batch_files = []
        for item in root_folder.iterdir():
            if item.is_file() and item.suffix.lower() == '.zip':
                batch_files.append(item)
                if len(batch_files) >= FileOrganizerConfig.BATCH_SIZE:
                    break
        
        # 整理対象がなければループを抜ける
        if not batch_files:
            print("✅ フォルダ直下に整理対象のファイルは見つかりませんでした。")
            break

        print(f"📂 {len(batch_files)} 件のファイルを発見。整理処理を開始します。")

        # バッチ内のファイルを1つずつ移動
        for file_path in tqdm(batch_files, desc=f"整理中 (バッチ {batch_number})"):
            doc_id = file_path.stem
            dest_folder = None # 移動先フォルダを初期化

            try:
                submit_date = summary_df.loc[doc_id, 'submitDateTime']
                if pd.isna(submit_date):
                    dest_folder = root_folder / "_unknown_date" # 日付不明
                else:
                    year = submit_date.year
                    quarter = (submit_date.month - 1) // 3 + 1
                    dest_folder = root_folder / str(year) / f"Q{quarter}"
            except KeyError:
                dest_folder = root_folder / "_unclassified" # サマリーに情報なし

            # フォルダを作成してファイルを移動
            dest_folder.mkdir(parents=True, exist_ok=True)
            shutil.move(str(file_path), str(dest_folder))
            total_processed_count += 1
    
    print(f"\\n--- 全ての整理処理が完了しました! 🎉 ---")
    print(f"合計 {total_processed_count} 件のファイルを整理しました。")

# --- 実行 ---
# if __name__ == '__main__':
#     organize_files_incrementally()

このスクリプトは、データ収集基盤を長期的に運用していく上で非常に役立つテクニックです。メインの収集スクリプトとは別に、必要に応じて実行することで、ファイルシステムを常にクリーンな状態に保つことができます。

活用編:CSVデータを取り出して分析の一歩を踏み出す

さて、データ収集基盤が完成し、年/四半期ごとに整理されたZIPファイルが手元に集まりました。ここからは、これらのファイルからCSVデータを抽出し、pandasを使って分析する第一歩を踏み出してみましょう。

ZIPファイルの中身を覗いてみよう

EDINETからダウンロードしたZIPファイルには、複数のファイルが含まれています。CSVデータは、通常 CSV/jpcrp... といったパスに格納されています。Pythonのzipfileライブラリを使えば、これを簡単に扱うことができます。

import zipfile
from pathlib import Path

# 例として、ダウンロードしたZIPファイルのパスを指定
zip_path = Path("/content/drive/MyDrive/Colab Notebooks/ALL_NoteBooks/01_株価/02_EDINET_DB/01_zip_files/2025/Q2/S100XXXX.zip")

# ZIPファイルを開く
with zipfile.ZipFile(zip_path, 'r') as z:
    # ZIP内の全ファイルリストを表示
    print("--- ZIP内のファイル一覧 ---")
    z.printdir()

    # CSVファイルだけをリストアップ
    csv_files = [f for f in z.namelist() if f.lower().endswith('.csv')]
    print("\\n--- CSVファイル ---")
    print(csv_files)

CSVをpandasで読み込んで可視化する

目的のCSVファイル名が分かれば、あとはzipfileとpandasを組み合わせて直接データフレームに読み込むだけです。

ここでは例として、トヨタ自動車(7203)の有価証券報告書から「売上高」や「営業利益」といった主要な財務データを抽出し、簡単なグラフを作成してみましょう。

※実際のCSVファイル名や構造は提出書類によって異なります。ここでは一般的な財務諸表(損益計算書)を想定したサンプルコードを示します。

import pandas as pd
import matplotlib.pyplot as plt
import japanize_matplotlib # 日本語表示のためのライブラリ

def analyze_finance_data_from_zip(zip_path: Path):
    """
    ZIPファイルから財務諸表CSVを読み込み、簡単な分析と可視化を行う。
    """
    target_csv_name = None
    with zipfile.ZipFile(zip_path, 'r') as z:
        # 一般的に主要財務諸表は PublicDoc/jpcrp_cor... のようなパスにあることが多い
        for file_name in z.namelist():
            if 'jpcrp_cor' in file_name and file_name.lower().endswith('.csv'):
                target_csv_name = file_name
                break
    
    if not target_csv_name:
        print("目的の財務諸表CSVが見つかりませんでした。")
        return

    print(f"読み込み対象: {target_csv_name}")

    # ZIPファイル内のCSVを直接pandasで読み込む
    with zipfile.ZipFile(zip_path, 'r') as z:
        with z.open(target_csv_name) as f:
            # Shift-JISでエンコードされていることが多い
            df = pd.read_csv(f, encoding='cp932', header=None)

    # --- データクレンジングと加工(書類の形式によって調整が必要) ---
    # ここでは例として、特定のキーワードで主要項目を抽出する
    df_finance = df[df[2].isin(['売上高', '営業利益', '経常利益', '当期純利益'])]
    
    # 必要な列だけを抽出(4列目が当期、5列目が前期の数値など)
    df_result = df_finance[[2, 4, 5]].copy()
    df_result.columns = ['項目', '当期連結', '前期連結']
    df_result.set_index('項目', inplace=True)
    
    # 数値をfloatに変換
    df_result = df_result.apply(pd.to_numeric, errors='coerce').dropna()
    print("\\n--- 抽出データ ---")
    print(df_result)

    # --- データの可視化 ---
    df_result.T.plot(kind='bar', figsize=(10, 6), grid=True)
    plt.title(f'主要財務データ比較 ({zip_path.stem})')
    plt.ylabel('金額(単位は書類に依存)')
    plt.xticks(rotation=0)
    plt.show()

# 実行例
# analyze_finance_data_from_zip(Path(".../S100XXXX.zip"))

Discussion