🏦

StreamlitでEDINETから有価証券報告書をダウンロードして分析するWEBアプリをサクっとつくろう

2024/08/17に公開

Pythonで簡単にWEBアプリケーションが作成できるStreamlitを使ってEDINETから有価証券報告書などをダウンロードしてグラフ化するWEBアプリケーションを作成してみました。

作ったアプリ

仕組みの解説

技術要素

今回のアプリはEDINET APIを主に使用するのですが以下の組み合わせでアプリを作成します

取得対象 取得方法 解説
EDINETコードリスト(会社リスト) APIなし 直接ダウンロード ※1 中身はEdinetcodeDlInfo.csv
書類一覧 書類一覧API ※2 会社指定が不可能なので日付を指定してダウンロード
有価証券報告書等 書類取得API ※2 書類一覧APIで取得したdoc_idが必要

参考文献

①会社名の選択

まず会社一覧及び、会社のEDINETコードが必要になってきます。
これについてはAPIで取得する方法はなく公式サイトからZIPを落としてくるか
ここからプログラム的に自動でダウンロードする必要があります。
今回は手動であらかじめダウンロードしたものを使います。
公式サイトからダウンロードすると毎回リンクが変わる、上記の直接リンクだと固定という謎仕様のようです(ドキュメントにもそうかいてある)
ZIPを展開するとShift-JISのCSVが手に入ります。文字コードに注意しましょう。EDINETからダウンロードするCSVはUTF16なのにこっちはShiftJISなのです。

中身は上記のようなもになっています。
末尾に0がついているものの証券コードも入っているのでStooqなどと組み合わせて有価証券報告書と株価推移をあわせてみるアプリもつくれそうです。

②書類一覧の取得と選択

会社一覧が取得できたら今度はどの会社の書類一覧(有価証券報告書や四半期報告書など)を入手する必要があります。これに関してはEDINET APIを使います。(なんで会社一覧はAPIでないのかは謎すぎる😅

API仕様書を読むとリクエストパラメーターにAPIキーを乗せる平成仕様ですが、それはともかくリクエストパラメーターにEDINETコードを指定できません、日付のみ指定できます。
つまり基本的に1年間や1ヶ月といったまとまった一覧をバッチで回して取得し、取得した一覧の中から自分でEDINETをキーに検索する必要があります。

レスポンスのJSONをすべてSQLiteに保存しておきます。

テーブルの中にdoc_idとedinet_codeがあるのがわかります

レスポンスのdoc_descroptionに書類名がありますので会社名を選択したあと、EDINETコードでこれをプルダウンに表示します

③書類のダウンロード

②で取得したdoc_idを使って種類取得APIを実行します。
リクエストパラメータにtype(1:XBRLか、2:pdfか、5:CSVかなどを渡します)

PDFの場合の例

XBRLの場合の例

EDINET解析を難解たらしめる悪名高きXBRL
これを分析するには骨がおれるので今回はこっちは使いません。

CSVの場合の例

EDINET API2から取得可能になったCSV。XBRLをCSVにしてくれています。

まだこれでもパースするのには骨が折れそうですがXBRLに比べたら10000倍マシです。
Office開きながら視覚的に探したい値やパースすべきカラムを探せます。
今回の分析ではこちらを使います。

準備

EDINET API仕様書 Version2 を参考にAPIキーを発行してください。
恐ろしいことにドキュメントにブラウザのポップアップを許可してくれと書いてあります。
APIキーには期限はないようなので一回発行するだけでいいようです。

キーはサイドバーで入力できるようにしておきます

# サイドバーにAPIキーの入力欄と保存ボタンを追加
api_key_input = st.sidebar.text_input(
    "APIキーを入力してください", value=api_key, type="password"
)

Streamlitではこれだけでサイドバーが実装できてしまいます

実装

書類一覧APIの実行

書類一覧の取得処理は、コマンドでバッチ処理する方法と画面から取得する方法両方に対応しておきます

document_list_batch.py
import os
import sys
import sqlite3
import requests
import json
import argparse
from datetime import datetime, timedelta
from pathlib import Path
import time

# 定数の設定
DATABASE_NAME = "edinet_info.db"
API_BASE_URL = "https://api.edinet-fsa.go.jp/api/v2/documents.json"
JSON_SAVE_DIR = "documentListAPI-JSON"
KEY_FILE_PATH = "Subscription-Key.txt"
API_SLEEP_TIME = 1


def create_database():
    conn = sqlite3.connect(DATABASE_NAME)
    c = conn.cursor()

    # documents_metadata テーブルの作成
    c.execute(
        """
        CREATE TABLE IF NOT EXISTS documents_metadata (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT,
            date DATE UNIQUE,
            type TEXT,
            result_count INTEGER,
            process_date_time TEXT,
            status TEXT,
            message TEXT
        )
    """
    )

    # documents_results テーブルの作成
    c.execute(
        """
        CREATE TABLE IF NOT EXISTS documents_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            metadata_id INTEGER,
            seq_number INTEGER,
            doc_id TEXT,
            edinet_code TEXT,
            sec_code TEXT,
            jcn TEXT,
            filer_name TEXT,
            fund_code TEXT,
            ordinance_code TEXT,
            form_code TEXT,
            doc_type_code TEXT,
            period_start TEXT,
            period_end TEXT,
            submit_date_time TEXT,
            doc_description TEXT,
            issuer_edinet_code TEXT,
            subject_edinet_code TEXT,
            subsidiary_edinet_code TEXT,
            current_report_reason TEXT,
            parent_doc_id TEXT,
            ope_date_time TEXT,
            withdrawal_status INTEGER,
            doc_info_edit_status INTEGER,
            disclosure_status INTEGER,
            xbrl_flag INTEGER,
            pdf_flag INTEGER,
            attach_doc_flag INTEGER,
            english_doc_flag INTEGER,
            csv_flag INTEGER,
            legal_status INTEGER,
            FOREIGN KEY (metadata_id) REFERENCES documents_metadata (id)
        )
    """
    )

    conn.commit()
    conn.close()


create_database()


def load_api_key():
    if os.path.exists(KEY_FILE_PATH):
        with open(KEY_FILE_PATH, "r") as file:
            return file.read().strip()
    else:
        print(
            "APIキーが設定されていません。Subscription-Key.txtファイルを作成してください。"
        )
        sys.exit(1)


def fetch_and_save_document_list(api_key, start_date, end_date):
    conn = sqlite3.connect(DATABASE_NAME)
    c = conn.cursor()

    # JSON保存ディレクトリの作成
    Path(JSON_SAVE_DIR).mkdir(parents=True, exist_ok=True)

    current_date = start_date
    while current_date <= end_date:
        print(current_date)
        formatted_date = current_date.strftime("%Y-%m-%d")
        params = {"date": formatted_date, "type": "2", "Subscription-Key": api_key}

        response = requests.get(API_BASE_URL, params=params)
        if response.status_code == 200:
            json_data = response.json()

            # JSONデータをファイルに保存
            json_filename = f'{JSON_SAVE_DIR}/{current_date.strftime("%Y%m%d")}.json'
            with open(json_filename, "w", encoding="utf-8") as f:
                json.dump(json_data, f, ensure_ascii=False, indent=4)

            # metadataの処理
            metadata = json_data.get("metadata", {})
            title = metadata.get("title", "")
            parameter = metadata.get("parameter", {})
            date = parameter.get("date", "")
            mtype = parameter.get("type", "")
            result_count = metadata.get("resultset", {}).get("count", 0)
            process_date_time = metadata.get("processDateTime", "")
            status = metadata.get("status", "")
            message = metadata.get("message", "")

            # date を日付型に変換
            date_obj = datetime.strptime(date, "%Y-%m-%d").date() if date else None

            # documents_metadata テーブルに保存
            metadata_id = None
            try:
                c.execute(
                    """
                    INSERT INTO documents_metadata (title, date, type, result_count, process_date_time, status, message)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                    (
                        title,
                        date_obj,
                        mtype,
                        result_count,
                        process_date_time,
                        status,
                        message,
                    ),
                )
                metadata_id = c.lastrowid
            except sqlite3.IntegrityError:
                print(f"Failed to insert metadata for {formatted_date}")
                continue

            # resultsの処理
            results = json_data.get("results", [])
            for i, doc in enumerate(results):
                try:
                    c.execute(
                        """
                        INSERT INTO documents_results (
                            metadata_id, seq_number, doc_id, edinet_code, sec_code, jcn, filer_name, fund_code,
                            ordinance_code, form_code, doc_type_code, period_start, period_end, submit_date_time,
                            doc_description, issuer_edinet_code, subject_edinet_code, subsidiary_edinet_code,
                            current_report_reason, parent_doc_id, ope_date_time, withdrawal_status, doc_info_edit_status,
                            disclosure_status, xbrl_flag, pdf_flag, attach_doc_flag, english_doc_flag, csv_flag, legal_status
                        )
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                        (
                            metadata_id,
                            i + 1,
                            doc.get("docID", ""),
                            doc.get("edinetCode", ""),
                            doc.get("secCode", ""),
                            doc.get("JCN", ""),
                            doc.get("filerName", ""),
                            doc.get("fundCode", ""),
                            doc.get("ordinanceCode", ""),
                            doc.get("formCode", ""),
                            doc.get("docTypeCode", ""),
                            doc.get("periodStart", ""),
                            doc.get("periodEnd", ""),
                            doc.get("submitDateTime", ""),
                            doc.get("docDescription", ""),
                            doc.get("issuerEdinetCode", ""),
                            doc.get("subjectEdinetCode", ""),
                            doc.get("subsidiaryEdinetCode", ""),
                            doc.get("currentReportReason", ""),
                            doc.get("parentDocID", ""),
                            doc.get("opeDateTime", ""),
                            doc.get("withdrawalStatus", None),
                            doc.get("docInfoEditStatus", None),
                            doc.get("disclosureStatus", None),
                            doc.get("xbrlFlag", None),
                            doc.get("pdfFlag", None),
                            doc.get("attachDocFlag", None),
                            doc.get("englishDocFlag", None),
                            doc.get("csvFlag", None),
                            doc.get("legalStatus", None),
                        ),
                    )
                except sqlite3.IntegrityError:
                    print(f"Failed to insert result for docID {doc.get('docID')}")
                    continue

        else:
            print(f"Failed to fetch data for {formatted_date}: {response.status_code}")

        current_date += timedelta(days=1)
        # リクエストごとに指定された秒数待機
        time.sleep(API_SLEEP_TIME)

    conn.commit()
    conn.close()


def main():
    # コマンドライン引数の処理
    parser = argparse.ArgumentParser(description="EDINET書類一覧取得バッチ")
    parser.add_argument("-s", "--start", type=str, help="開始日 (YYYYMMDD)")
    parser.add_argument("-e", "--end", type=str, help="終了日 (YYYYMMDD)")
    args = parser.parse_args()

    # 日付の設定
    if args.start:
        start_date = datetime.strptime(args.start, "%Y%m%d")
    else:
        start_date = datetime.now()

    if args.end:
        end_date = datetime.strptime(args.end, "%Y%m%d")
    else:
        end_date = start_date

    # APIキーの読み込み
    api_key = load_api_key()

    # データベースの初期化
    create_database()

    # 書類一覧の取得と保存
    fetch_and_save_document_list(api_key, start_date, end_date)


if __name__ == "__main__":
    main()

コマンドラインから実行する場合の例 2024/1/1から2024/5/31までの一覧を取得

python3 document_list_batch.py -s 20240101 -e 20240531

画面の実装

画面処理の他に書類のダウンロード処理、グラフ処理を担います

app.py

import streamlit as st
import sqlite3
from datetime import datetime, timedelta
import os
import pandas as pd
import requests
import base64
from xbrl_to_csv_analysis import xbrl_to_csv_analysis

# fetch_and_save_document_list をインポート
from document_list_batch import fetch_and_save_document_list
import io
import plotly.graph_objects as go

# データベースファイルとテーブル名
DATABASE_NAME = "edinet_info.db"
KEY_FILE_PATH = "Subscription-Key.txt"
# CSVファイルのパス
CSV_FILE_PATH = "EdinetcodeDlInfo.csv"
DOWNLOAD_DIR = "download"


class AccountingStandards:
    JAPAN_GAAP = "Japan GAAP"
    US_GAAP = "US GAAP"
    IFRS = "IFRS"


# CSVを読み込む関数(Shift-JISエンコーディングに対応)
def load_company_data():
    try:
        df = pd.read_csv(CSV_FILE_PATH, encoding="cp932", header=1)
        return df
    except Exception as e:
        st.error(f"会社名の読み込みに失敗しました: {e}")
        return pd.DataFrame()  # 空のデータフレームを返す


# APIキーを読み込む関数
def load_api_key():
    if os.path.exists(KEY_FILE_PATH):
        with open(KEY_FILE_PATH, "r") as file:
            return file.read().strip()
    return ""


# 最終更新日を取得する関数
def get_last_update_date():
    if os.path.exists(DATABASE_NAME):
        conn = sqlite3.connect(DATABASE_NAME)
        c = conn.cursor()
        c.execute("SELECT MAX(date) FROM documents_metadata")
        result = c.fetchone()
        conn.close()
        if result and result[0]:
            return result[0]  # 最終更新日を返す
        else:
            return None
    else:
        return None


# ファイルをダウンロードするボタンの作成
def generate_download_button(file_path, file_name):
    with open(file_path, "rb") as file:
        file_data = file.read()
        b64 = base64.b64encode(file_data).decode()
        href = f'<a href="data:file/{file_name.split(".")[-1]};base64,{b64}" download="{file_name}">こちらをクリックしてファイルをダウンロード</a>'
        st.markdown(href, unsafe_allow_html=True)


# 書類一覧を更新する関数(最終更新日から現在まで)
def update_document_list(api_key):
    last_update_date = get_last_update_date()
    if last_update_date:
        start_date = datetime.strptime(last_update_date, "%Y-%m-%d").date() + timedelta(
            days=1
        )
    else:
        start_date = datetime.now().date() - timedelta(
            days=30
        )  # 初回取得時は過去30日分
    end_date = datetime.now().date()
    fetch_and_save_document_list(api_key, start_date, end_date)


def load_documents(edinet_code):
    conn = sqlite3.connect(DATABASE_NAME)
    c = conn.cursor()
    c.execute(
        """
        SELECT doc_id, doc_description, submit_date_time, doc_type_code 
        FROM documents_results 
        WHERE edinet_code = ? 
        ORDER BY submit_date_time DESC
    """,
        (edinet_code,),
    )
    results = c.fetchall()
    conn.close()

    # doc_type_codeも含めてメモリに保持
    documents = []
    for row in results:
        documents.append(
            {
                "doc_id": row[0],
                "doc_description": f"{row[1]} ({row[2]})",
                "doc_type_code": row[3],
            }
        )

    return documents


def download_document(api_key, document_id, doc_type):
    # ダウンロードフォルダが存在しなければ作成
    if not os.path.exists(DOWNLOAD_DIR):
        os.makedirs(DOWNLOAD_DIR)

    # 書類取得APIのURLとパラメータ
    api_url = f"https://api.edinet-fsa.go.jp/api/v2/documents/{document_id}"
    params = {"type": doc_type, "Subscription-Key": api_key}
    print(api_url)
    # APIリクエストを送信
    response = requests.get(api_url, params=params)

    if response.status_code == 200:
        # Content-Typeヘッダーをチェック
        content_type = response.headers.get("Content-Type")

        if content_type == "application/octet-stream":
            # ZIP形式でファイルを保存
            file_extension = "zip"
        elif content_type == "application/pdf":
            # PDF形式でファイルを保存
            file_extension = "pdf"
        else:
            st.error(f"対応していないファイル形式です: {content_type}")
            return None

        # ファイル名を指定
        file_name = f"{document_id}_{doc_type}.{file_extension}"
        file_path = os.path.join(DOWNLOAD_DIR, file_name)

        # ファイルを保存
        with open(file_path, "wb") as file:
            file.write(response.content)

        return file_path
    else:
        st.error(f"書類取得に失敗しました。ステータスコード: {response.status_code}")
        return None


# Streamlitアプリケーションの設定
st.title("EDINET 情報取得アプリ")

# 画面ロード時にAPIキーを読み込む
api_key = load_api_key()

# サイドバーにAPIキーの入力欄と保存ボタンを追加
api_key_input = st.sidebar.text_input(
    "APIキーを入力してください", value=api_key, type="password"
)

if st.sidebar.button("保存"):
    with open(KEY_FILE_PATH, "w") as f:
        f.write(api_key_input)
    st.sidebar.success("APIキーが保存されました。")

# 最終更新日の取得
last_update_date = get_last_update_date()

# 書類一覧を更新ボタンと最終更新日を横に配置
col1, col2 = st.columns([1, 2])

with col1:
    if st.button("書類一覧を更新"):
        if api_key_input:
            update_document_list(api_key_input)
            st.success("書類一覧が更新されました。")
            # 更新後に再度最終更新日を取得して表示
            last_update_date = get_last_update_date()
        else:
            st.error("APIキーが設定されていません。")

with col2:
    if last_update_date:
        st.write(f"最終更新日: {last_update_date}")
    else:
        st.write("最終更新日: なし")

# 会社名データの読み込み
company_data = load_company_data()

# 会社名のプルダウンメニューを追加
company_names = company_data["提出者名"].tolist() if not company_data.empty else []
selected_company = st.selectbox("会社名を選択してください", company_names)

# 選択された会社の情報を表示
if selected_company:
    company_info = company_data[company_data["提出者名"] == selected_company]
    if not company_info.empty:
        st.write("選択された会社の情報")
        st.dataframe(company_info[["EDINETコード", "資本金", "証券コード"]])

        # EDINETコードを取得
        edinet_code = company_info.iloc[0]["EDINETコード"]

        # 書類のプルダウンメニューを表示
        documents = load_documents(edinet_code)

        # doc_descriptionを表示し、doc_idとdoc_type_codeを保持するプルダウンを作成
        document_options = {
            doc["doc_description"]: (doc["doc_id"], doc["doc_type_code"])
            for doc in documents
        }
        selected_document_desc = st.selectbox(
            "書類を選択してください", list(document_options.keys())
        )
        selected_document_id, selected_doc_type_code = document_options[
            selected_document_desc
        ]

        # 取得方法のプルダウンメニューを表示
        doc_types = {
            "1": "提出本文書及び監査報告書",
            "2": "PDF",
            "3": "代替書面・添付文書",
            "4": "英文ファイル",
            "5": "CSV",
        }
        selected_type = st.selectbox(
            "取得方法を選択してください", list(doc_types.values())
        )

        # ダウンロードボタンの配置
        if st.button("ダウンロード"):
            # doc_typeを取得
            doc_type_key = list(doc_types.keys())[
                list(doc_types.values()).index(selected_type)
            ]

            # 書類をダウンロード
            file_path = download_document(
                api_key_input, selected_document_id, doc_type_key
            )

            if file_path:
                st.success("書類がダウンロードされました。")
                file_name = os.path.basename(file_path)
                generate_download_button(file_path, file_name)

                # ダウンロードしたファイルのパスをセッションステートに保存
                st.session_state["downloaded_file_path"] = file_path
                st.session_state["selected_doc_type_code"] = (
                    selected_doc_type_code  # doc_type_codeも保存
                )
                # CSVファイルを含むZIPファイルの場合のみボタンを表示
                if doc_type_key == "5":
                    st.button("XBRL_TO_CSVを分析", key="analyze_csv")

        # XBRL_TO_CSV分析ボタンの処理
        if st.session_state.get("analyze_csv"):
            file_path = st.session_state["downloaded_file_path"]
            selected_doc_type_code = st.session_state.get("selected_doc_type_code")
            error_message, extracted_data = xbrl_to_csv_analysis(
                file_path, selected_doc_type_code
            )

            if error_message:
                st.error(error_message)
            else:
                if extracted_data:
                    st.write("XBRL_TO_CSV解析結果")

                    # カスタムデザインのCSSを定義
                    st.markdown(
                        """
                        <style>
                        .custom-table {
                            width: 100%;
                            border-collapse: collapse;
                            margin: 25px 0;
                            font-size: 18px;
                            text-align: left;
                        }
                        .custom-table th, .custom-table td {
                            padding: 12px 15px;
                            border: 1px solid #ddd;
                        }
                        .custom-table th {
                            background-color: #f4f4f4;
                        }
                        .custom-table tr:nth-of-type(even) {
                            background-color: #f9f9f9;
                        }
                        </style>
                    """,
                        unsafe_allow_html=True,
                    )

                    # テーブルをHTMLで表示
                    st.markdown(
                        """
                    <table class="custom-table">
                    <thead>
                    <tr>
                    <th>項目名</th>
                    <th>値</th>
                    </tr>
                    </thead>
                    <tbody>
                    """
                        + "".join(
                            f"<tr><td>{item[0]}</td><td>{item[1]}</td></tr>"
                            for item in extracted_data
                        )
                        + """
                    </tbody>
                    </table>
                    """,
                        unsafe_allow_html=True,
                    )

                    # 必要な項目名のリスト
                    required_items = None

                    # extracted_dataから「会計基準、DEI」の項目を探す
                    accounting_standard = None
                    for item_name, value in extracted_data:
                        if item_name == "会計基準、DEI":
                            accounting_standard = value.strip()
                            break

                    # 会計基準に基づいて required_items を設定
                    if accounting_standard == AccountingStandards.JAPAN_GAAP:
                        required_items = {
                            "売上高": "売上高",
                            "営業利益又は営業損失": "営業利益",
                            "経常利益又は経常損失": "経常利益",
                            "親会社株主に帰属する当期純利益又は親会社株主に帰属する当期純損失": "当期純利益",
                        }
                    elif accounting_standard == AccountingStandards.IFRS:
                        required_items = {}
                        # extracted_data の中に売上高関連のキーが存在するかを確認し、それに基づいて設定
                        sales_key = None
                        for key in ["売上高(IFRS)", "売上収益(IFRS)"]:
                            if any(key == item_name for item_name, _ in extracted_data):
                                sales_key = key
                                required_items[sales_key] = "売上高"
                                break  # 見つかった時点でループを抜ける

                        # 営業利益の設定
                        required_items["営業利益(△損失)(IFRS)"] = "営業利益"

                        # 当期純利益の設定
                        required_items["親会社の所有者、当期利益(△損失)(IFRS)"] = (
                            "当期純利益"
                        )

                    else:
                        st.error(f"未対応の会計基準です: {accounting_standard}")

                    # グラフに表示するデータを格納
                    graph_data = {}

                    for item_name, value in extracted_data:
                        # 項目名から末尾の「(△)」を削除して一致判定
                        cleaned_item_name = item_name.replace("(△)", "")
                        if cleaned_item_name in required_items:
                            # 単位を100万円に換算し、表示する項目名に変更
                            amount_in_million = (
                                float(value.split()[0].replace(",", "")) / 1_000_000
                            )
                            graph_data[required_items[cleaned_item_name]] = (
                                amount_in_million
                            )

                    # 必要なすべての項目が揃っている場合にグラフを表示
                    if all(item in graph_data for item in required_items.values()):
                        # 営業利益率を計算
                        operating_income_rate = (
                            graph_data["営業利益"] / graph_data["売上高"]
                        ) * 100

                        # グラフを作成
                        fig = go.Figure(
                            data=[
                                go.Bar(
                                    x=list(graph_data.keys()),
                                    y=list(graph_data.values()),
                                    text=[
                                        f"{value:,.0f}" for value in graph_data.values()
                                    ],
                                    textposition="auto",
                                )
                            ]
                        )

                        # グラフのタイトルとレイアウトを設定
                        fig.update_layout(
                            title=f"{selected_company} 営業利益率: {operating_income_rate:.2f}%",
                            xaxis_title="項目",
                            yaxis_title="金額 (百万円)",
                            yaxis=dict(tickformat=",.0f"),
                            showlegend=False,
                        )

                        # グラフをStreamlitに表示
                        st.plotly_chart(fig, use_container_width=True)
                else:
                    st.write("該当するデータが見つかりませんでした。")

    else:
        st.write("該当する会社情報が見つかりませんでした。")

ダウンロードしたXBRLtoCSVのパース

基本的に画面でしか使いませんがデバック用にこちらもコマンドラインから実行できるようにしておきます

xbrl_to_csv_analysis.py
import zipfile
import os
import pandas as pd
import sys


def xbrl_to_csv_analysis(zip_file_path, doc_type_code):
    # doc_type_codeに基づいて設定ファイルを特定
    select_column_file_path = f"docTypeCode{doc_type_code}_SelectColumn.txt"

    # 定義ファイルの存在を確認
    if not os.path.exists(select_column_file_path):
        return (
            f"docTypeCode{doc_type_code}を分析する定義ファイル「{select_column_file_path}」がありません",
            None,
        )

    # ZIPファイル名を取得して、拡張子を除去してフォルダ名を作成
    base_name = os.path.basename(zip_file_path)
    extract_dir = os.path.join(
        os.path.dirname(zip_file_path), base_name.replace(".zip", "")
    )

    # ZIPファイルを展開
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(extract_dir)

    # 検索対象の報告書略語をdoc_type_codeに基づいて設定
    if doc_type_code == "120":
        search_patterns = ["-asr-"]  # 有価証券報告書
    elif doc_type_code == "140":
        search_patterns = [
            "-q1r-",
            "-q2r-",
            "-q3r-",
            "-q4r-",
            "-q5r-",
        ]  # 四半期報告書(第1~第5期)

    else:
        return (
            f"doc_type_code {doc_type_code} に対応する検索パターンが定義されていません",
            None,
        )

    # XBRL_TO_CSV フォルダ内の CSV ファイルを検索
    csv_dir = os.path.join(extract_dir, "XBRL_TO_CSV")
    csv_files = []
    for pattern in search_patterns:
        csv_files.extend(
            [f for f in os.listdir(csv_dir) if pattern in f and f.endswith(".csv")]
        )

    if not csv_files:
        return (
            f"doc_type_code {doc_type_code} に対応するファイルが見つかりませんでした",
            None,
        )

    # CSVファイルをロード (UTF-16エンコーディングとタブ区切りを指定)
    csv_file_path = os.path.join(csv_dir, csv_files[0])
    df = pd.read_csv(csv_file_path, encoding="utf-16", sep="\t")

    # 定義ファイルの読み込み
    with open(select_column_file_path, "r") as file:
        lines = file.readlines()

    selected_columns = []
    for line in lines:
        if not line.startswith("#"):  # コメント行は無視
            element_id, context_id = line.split()[0], line.split()[1]
            selected_columns.append((element_id, context_id))

    # 抽出したい要素IDとコンテキストIDに基づいてデータをフィルタリング
    extracted_data = []
    for element_id, context_id in selected_columns:
        filtered_df = df[
            (df["要素ID"] == element_id) & (df["コンテキストID"] == context_id)
        ]
        if not filtered_df.empty:
            # 該当する行が見つかった場合、値と単位を抽出
            value = filtered_df["値"].values[0]
            unit = (
                filtered_df["単位"].values[0] if "単位" in filtered_df.columns else ""
            )

            # 値が数値かどうかを確認し、数値でない場合にはそのまま表示
            try:
                value = float(value)
                formatted_value = f"{value:,.0f} {unit}"  # 整数のカンマ区切り
            except ValueError:
                formatted_value = f"{value}"  # 数値でない場合そのまま表示

            extracted_data.append((filtered_df["項目名"].values[0], formatted_value))

    return None, extracted_data


def main(zip_file_path, doc_type_code):
    # 分析関数を呼び出す
    error_message, extracted_data = xbrl_to_csv_analysis(zip_file_path, doc_type_code)

    # 結果を表示
    if error_message:
        print(error_message)
    else:
        if extracted_data:
            print("XBRL_TO_CSV解析結果:")
            for item in extracted_data:
                print(f"|{item[0]}| {item[1]}|")
        else:
            print("該当するデータが見つかりませんでした。")


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(
            "使用方法: python xbrl_to_csv_analysis.py <ZIPファイルパス> <doc_type_code>"
        )
    else:
        zip_file_path = sys.argv[1]
        doc_type_code = sys.argv[2]
        main(zip_file_path, doc_type_code)

期末有価証券報告書の解析

 python3 xbrl_to_csv_analysis.py ./download/S100TOS4_5.zip 120

四半期ZIPの解析

python3 xbrl_to_csv_analysis.py ./download/S100RO4X_5.zip 140

第二引数はdoc_type_codeの値(API仕様書に一覧あり)

CSVからの抽出カラム設定ファイル

ここが骨が折れる作業です、会社によってカラム構成が違ったり会計基準(JAPAN GAAP、IFRS)の違いなどでCSVから抽出すべきカラムが変わってくるので中身を見ながら汎用的に作り込む必要があります。

120 有価証券報告書の例

docTypeCode120_SelectColumn.txt
jppfs_cor:NetSales CurrentYearDuration
jppfs_cor:OperatingIncome CurrentYearDuration
jppfs_cor:OrdinaryIncome CurrentYearDuration
jppfs_cor:ProfitLossAttributableToOwnersOfParent CurrentYearDuration
# 会計基準
jpdei_cor:AccountingStandardsDEI FilingDateInstant
# IFRS
jpigp_cor:NetSalesIFRS CurrentYearDuration #売上高
jpigp_cor:RevenueIFRS CurrentYearDuration #売上収益
jpigp_cor:OperatingProfitLossIFRS CurrentYearDuration #営業利益	
jpigp_cor:ProfitLossAttributableToOwnersOfParentIFRS CurrentYearDuration #当期利益

140 四半期報告書の例

docTypeCode140_SelectColumn.txt
jppfs_cor:NetSales CurrentYTDDuration #売上
jppfs_cor:OperatingIncome CurrentYTDDuration #利益
jppfs_cor:OrdinaryIncome CurrentYTDDuration
jppfs_cor:ProfitLossAttributableToOwnersOfParent CurrentYTDDuration
# 会計基準
jpdei_cor:AccountingStandardsDEI FilingDateInstant
# IFRS
jpigp_cor:NetSalesIFRS CurrentYTDDuration
jpigp_cor:OperatingProfitLossIFRS CurrentYTDDuration #営業利益(△損失)(IFRS)	
jpigp_cor:ProfitLossAttributableToOwnersOfParentIFRS CurrentYTDDuration #親会社の所有者、当期利益(△損失)(IFRS)

実行

streamlit run app.py

まとめ

いままでこういったものはサーバー処理とクライアント処理、両方を実装するのが手間でコンソールアプリケーションやIPythonでやる手法が多かったですが、Streamlitの登場とChatgptの支援でクライアントサイドはほとんど手間かけずに実装できるようになりました。今回コードは1行も自分でかかずにすべてChatgptに指示して書きました。

ChatGPTにあたえた画面仕様

Discussion