🙆

FITファイル分析をさらに進化:Pythonスクリプトの改良と新機能

に公開

こんにちは!以前の記事「PythonとfitdecodeでサイクリングのFITファイルを分析してみる」では、fitdecodeライブラリを使ってFITファイルから基本的なデータを抽出し、簡単なグラフを作成する方法をご紹介しました。
もっと詳細な分析や見やすい可視化をしたい!ということで、スクリプトの改良と機能追加を行いました。今回は、そのパワーアップしたバージョンについてご紹介します。

前回のおさらい

前回のスクリプトでは、主に以下のことを行いました。

1. fitdecodeを使ってFITファイルを読み込む。
2. レコードメッセージからタイムスタンプ、距離、高度、速度などのデータを抽出する。
3. pandasでDataFrameに格納する。
4. matplotlibで基本的な時系列グラフ(時間に対する速度や高度など)を描画する。

これはFITファイル分析の第一歩としては十分でしたが、実用性を考えるといくつか改善したい点がありました。

今回の主な改善点と新機能

新しいスクリプトでは、以下の点が進化しています。

1. 堅牢なデータ読み込みと前処理 (read_fit_file):

    ◦ 必須フィールドチェック: レコードごとに必要なフィールド(timestamp, distance, altitude, speed, grade)が揃っているかを確認し、不足しているレコードはスキップするようにしました。これにより、データ欠損によるエラーを防ぎます。
    ◦ 厳密なデータ型変換とNaN処理: pd.to_numericのerrors='coerce'オプションを使い、数値に変換できないデータをNaN(Not a Number)として扱います。その後、必須カラムにNaNを含む行を削除し、警告を表示することで、分析の信頼性を高めました。
    ◦ 差分データ(distance_delta)の計算: 各レコード間の移動距離をdiff()で計算し、distance_deltaとして追加しました。これは後述する距離ベースの分析や統計計算に不可欠です。
    ◦ エラーハンドリング強化: FileNotFoundErrorやfitdecode.FitDecodeErrorに加え、予期せぬエラーも捕捉し、標準エラー出力(sys.stderr)にメッセージを表示して終了するように改善しました。

2. 多彩なグラフ描画機能:

    ◦ 距離ベースのプロット: 時間軸だけでなく、走行距離(km)を横軸にしたグラフ(高度、速度、勾配)を追加しました。コースプロフィールを直感的に把握できます (plot_distance_series)。
    ◦ 2軸グラフ: 距離を横軸に、速度と高度、または速度と勾配のように、単位の異なる2つのデータを1つのグラフに表示できるようにしました (plot_dual_axis_distance)。相関関係を見るのに便利です。
    ◦ 分布グラフ: ヒストグラム (plot_histogram)、箱ひげ図 (plot_box)、バイオリンプロット (plot_violin) を追加し、勾配などのデータの分布状況を可視化できるようにしました。
    ◦ 棒グラフ: 勾配ごとの平均速度や走行距離など、カテゴリ別の集計結果を分かりやすく表示します (plot_bar)。
    ◦ プロットの改善:
        ▪ seabornで見やすいスタイルを設定。
        ▪ 時間軸グラフのX軸ラベルを「時:分:秒」形式に整形 (mdates.DateFormatter)。
        ▪ 各プロットを個別のPNGファイルとして、指定したディレクトリ (fit_analysis_plots) に自動保存。
        ▪ plt.close(fig)でメモリリークを防止。

実際にグラフは以下のようなものが作成され、保存されます。

3. 勾配に関する詳細な統計分析 (calculate_grade_stats):

    ◦ 勾配別集計: 勾配を一定の範囲(例: 0.5%ごと)で丸めてグループ化し、各勾配範囲での平均速度(km/h)と合計走行距離(m)を計算します。どの勾配でどれだけ走り、どの程度の速度が出ていたかが分かります。
    ◦ 距離による重み付き勾配統計: 単純な勾配の平均値だけでなく、各レコード間の移動距離 (distance_delta) で重み付けした平均勾配と標準偏差を計算します。例えば、緩い勾配を長く走った場合、その勾配が全体の平均値により強く影響するようになります。これにより、体感に近い活動全体の勾配特性を数値化できます。

4. コード構成と使いやすさ:

    ◦ 処理を関数(read_fit_file, 各plot_*関数, calculate_grade_stats, main)に分割し、見通しと再利用性を向上させました。
    ◦ 定数(ファイルパス、変換係数など)を冒頭にまとめました。
    ◦ 処理済みの主要データをCSVファイル (output_activity_data.csv) に保存する機能を追加しました。これにより、他のツールでの再分析や共有が容易になります。
    ◦ スクリプトとして直接実行するための if __name__ == "__main__": を使用しています。

実際のコードは以下の通りです。

import fitdecode
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
import sys
import os
from datetime import datetime

# 定数
MPS_TO_KPH = 3.6  # m/s から km/h への変換係数
REQUIRED_COLUMNS = ['timestamp', 'distance', 'altitude', 'speed', 'grade']
FIT_FILE_PATH = '############.fit' # .fitファイルのパスを指定
OUTPUT_CSV_PATH = 'output_activity_data.csv'

# Seaborn スタイル設定
sns.set(style='darkgrid')

def read_fit_file(filepath):
    """FITファイルを読み込み、レコードデータをDataFrameとして返す"""
    datas = []
    try:
        with fitdecode.FitReader(filepath) as fit:
            print(f"Reading FIT file: {filepath}")
            for frame in fit:
                if isinstance(frame, fitdecode.FitDataMessage) and frame.name == 'record':
                    data = {}
                    has_required_fields = True
                    # 必要なフィールドのみを抽出
                    required_fields = {'timestamp', 'distance', 'altitude', 'speed', 'grade'}
                    present_fields = {field.name for field in frame.fields}

                    if required_fields.issubset(present_fields):
                         for field in frame.fields:
                              if field.name in required_fields:
                                   data[field.name] = field.value
                         datas.append(data)
                    # else:
                    #     print(f"Skipping record due to missing fields: {required_fields - present_fields}", file=sys.stderr)

            print(f"Successfully read {len(datas)} records.")

    except FileNotFoundError:
        print(f"Error: File not found at {filepath}", file=sys.stderr)
        sys.exit(1)
    except fitdecode.FitDecodeError as e:
        print(f"Error decoding FIT file: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)

    if not datas:
        print("No record messages found in the FIT file.", file=sys.stderr)
        sys.exit(1)

    df = pd.DataFrame(datas)

    # データ型の変換と検証
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
    else:
        print("Error: 'timestamp' column is missing.", file=sys.stderr)
        sys.exit(1)

    # 他の必須カラムの存在チェック
    missing_cols = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing_cols:
        print(f"Error: Missing required columns: {', '.join(missing_cols)}", file=sys.stderr)
        sys.exit(1)

    # 数値型に変換(エラー時はNaNにする)
    for col in ['distance', 'altitude', 'speed', 'grade']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # NaNを含む行があれば削除または警告(ここでは削除を選択)
    initial_rows = len(df)
    df.dropna(subset=REQUIRED_COLUMNS, inplace=True)
    if len(df) < initial_rows:
        print(f"Warning: Dropped {initial_rows - len(df)} rows containing NaN values in required columns.", file=sys.stderr)

    if df.empty:
        print("Error: DataFrame is empty after cleaning.", file=sys.stderr)
        sys.exit(1)

    # 速度をkm/hに変換
    df['speed_kph'] = df['speed'] * MPS_TO_KPH

    # 距離の差分(各レコード間の移動距離)を計算
    # distanceは累積距離なので、diff()で各レコード間の移動距離を求める
    df['distance_delta'] = df['distance'].diff().fillna(0)
    # 最初のレコードのdistance_deltaが0になるように調整
    # (もし最初のレコードのdistanceが0でない場合、その区間の距離が失われる可能性があるが、
    #  通常は0から始まるため、ここではfillna(0)で対応)
    df.loc[df['distance_delta'] < 0, 'distance_delta'] = 0 # 稀な負の差分を補正

    return df

def plot_time_series(df, y_col, y_label, title, filename):
    """時系列データをプロットし、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(df['timestamp'], df[y_col])
    ax.set_xlabel('Time')
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True)
    # X軸のフォーマット設定
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
    fig.autofmt_xdate() # ラベルが重ならないように自動調整
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_distance_series(df, y_col, y_label, title, filename):
    """距離ベースのデータをプロットし、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(df['distance'] / 1000, df[y_col]) # 距離をkmに変換
    ax.set_xlabel('Distance / km')
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True)
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_xy(df, x_col, y_col, x_label, y_label, title, filename, x_scale=1):
    """X-Yプロットを生成し、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(df[x_col] * x_scale, df[y_col])
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True)
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_dual_axis_distance(df, y1_col, y2_col, y1_label, y2_label, title, filename):
    """距離をX軸とし、2つのY軸を持つプロットを生成し、ファイルに保存"""
    fig, ax1 = plt.subplots(figsize=(12, 6), constrained_layout=True)
    x_km = df['distance'] / 1000 # 距離をkmに変換

    # 1つ目のY軸 (左)
    color1 = 'C0'
    ax1.set_xlabel('Distance / km')
    ax1.set_ylabel(y1_label, color=color1)
    ax1.plot(x_km, df[y1_col], color=color1, label=y1_label.split('/')[0]) # ラベルを短縮
    ax1.tick_params(axis='y', labelcolor=color1)
    ax1.grid(True)

    # 2つ目のY軸 (右)
    ax2 = ax1.twinx()
    color2 = 'C1'
    ax2.set_ylabel(y2_label, color=color2)
    ax2.plot(x_km, df[y2_col], color=color2, label=y2_label.split('/')[0]) # ラベルを短縮
    ax2.tick_params(axis='y', labelcolor=color2)

    fig.suptitle(title)
    # 凡例を結合して表示
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2, loc='best')

    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_histogram(series, x_label, title, filename, bins=30):
    """ヒストグラムをプロットし、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.hist(series.dropna(), bins=bins) # NaN値を除外
    ax.set_xlabel(x_label)
    ax.set_ylabel('Frequency (records)')
    ax.set_title(title)
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_bar(x_data, y_data, x_label, y_label, title, filename):
    """棒グラフをプロットし、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.bar(x_data, y_data)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(title)
    plt.xticks(rotation=45, ha='right') # ラベルが見やすいように回転
    plt.tight_layout()
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_box(data, x_label, y_label, title, filename):
    """箱ひげ図をプロットし、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.boxplot(data)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(title)
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def plot_violin(data, x_label, y_label, title, filename):
    """バイオリンプロットをプロットし、ファイルに保存"""
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.violinplot(data, showmeans=True) # 平均値も表示
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(title)
    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

def calculate_grade_stats(df):
    """勾配ごとの統計情報(平均速度、合計距離)を計算"""
    # 勾配を丸めてグルーピングしやすくする(例:0.5%単位)
    df['grade_rounded'] = (df['grade'] * 2).round() / 2

    # 勾配ごとの平均速度 (km/h)
    avg_speed_per_grade = df.groupby('grade_rounded')['speed_kph'].mean()

    # 勾配ごとの合計距離 (m)
    distance_per_grade = df.groupby('grade_rounded')['distance_delta'].sum()

    # 距離で重み付けした勾配リストを作成
    # distance_deltaが0より大きいレコードのみを対象
    weighted_grades = []
    df_filtered = df[df['distance_delta'] > 0]
    for index, row in df_filtered.iterrows():
        # 移動距離(m)に応じて、そのレコードの勾配を追加
        # 整数に丸めて回数を決める(より正確には重み付き統計を使うべきだが、簡略化)
        count = int(round(row['distance_delta']))
        if count > 0:
            weighted_grades.extend([row['grade']] * count)

    if not weighted_grades:
         print("Warning: No distance data available to calculate weighted grade statistics.", file=sys.stderr)
         avg_grade_weighted = np.nan
         std_dev_grade_weighted = np.nan
    else:
        avg_grade_weighted = np.average(weighted_grades)
        std_dev_grade_weighted = np.std(weighted_grades)

    return avg_speed_per_grade, distance_per_grade, weighted_grades, avg_grade_weighted, std_dev_grade_weighted


def main():
    """メイン処理"""
    # 1. FITファイル読み込みとデータ準備
    df = read_fit_file(FIT_FILE_PATH)

    # 出力ディレクトリ作成
    output_dir = "fit_analysis_plots"
    os.makedirs(output_dir, exist_ok=True)

    # 2. プロット生成
    print("\nGenerating plots...")
    # 時系列プロット
    plot_time_series(df, 'distance', 'Distance / m', 'Distance vs Time', os.path.join(output_dir, 'fig1_distance_time'))
    plot_time_series(df, 'altitude', 'Altitude / m', 'Altitude vs Time', os.path.join(output_dir, 'fig2_altitude_time'))
    plot_time_series(df, 'speed_kph', 'Speed / km/h', 'Speed vs Time', os.path.join(output_dir, 'fig3_speed_time'))
    plot_time_series(df, 'grade', 'Grade / %', 'Grade vs Time', os.path.join(output_dir, 'fig4_grade_time'))

    # 距離ベースのプロット
    plot_distance_series(df, 'altitude', 'Altitude / m', 'Altitude vs Distance', os.path.join(output_dir, 'fig2_1_altitude_distance'))
    plot_distance_series(df, 'speed_kph', 'Speed / km/h', 'Speed vs Distance', os.path.join(output_dir, 'fig3_1_speed_distance'))
    plot_distance_series(df, 'grade', 'Grade / %', 'Grade vs Distance', os.path.join(output_dir, 'fig4_1_grade_distance'))

    # 複合プロット (距離ベース)
    plot_dual_axis_distance(df, 'speed_kph', 'altitude', 'Speed / km/h', 'Altitude / m', 'Speed & Altitude vs Distance', os.path.join(output_dir, 'fig3_2_speed_altitude_distance'))
    plot_dual_axis_distance(df, 'speed_kph', 'grade', 'Speed / km/h', 'Grade / %', 'Speed & Grade vs Distance', os.path.join(output_dir, 'fig4_2_speed_grade_distance'))

    # 分布プロット
    plot_histogram(df['grade'], 'Grade / %', 'Grade Distribution (Frequency)', os.path.join(output_dir, 'fig5_grade_histogram'))

    # 3. 勾配ごとの統計計算とプロット
    print("\nCalculating grade statistics...")
    avg_speed_per_grade, distance_per_grade, weighted_grades, avg_grade_weighted, std_dev_grade_weighted = calculate_grade_stats(df)

    # 結果表示
    print("\n--- Grade Statistics ---")
    print("Average Speed per Grade (km/h):")
    print(avg_speed_per_grade)
    print("\nDistance per Grade (m):")
    print(distance_per_grade)
    if weighted_grades:
        print(f"\nWeighted Average Grade: {avg_grade_weighted:.2f} %")
        print(f"Weighted Standard Deviation of Grade: {std_dev_grade_weighted:.2f} %")
    else:
         print("\nWeighted grade statistics could not be calculated (likely no distance data).")


    # 統計プロット
    if not avg_speed_per_grade.empty:
        plot_bar(avg_speed_per_grade.index.astype(str), avg_speed_per_grade.values, 'Grade / %', 'Average Speed / km/h', 'Average Speed per Grade', os.path.join(output_dir, 'fig6_avg_speed_per_grade'))
    else:
        print("Skipping average speed per grade plot (no data).")

    if not distance_per_grade.empty:
        plot_bar(distance_per_grade.index.astype(str), distance_per_grade.values, 'Grade / %', 'Total Distance / m', 'Distance Distribution per Grade', os.path.join(output_dir, 'fig7_distance_per_grade'))
    else:
         print("Skipping distance per grade plot (no data).")


    if weighted_grades:
        plot_box(weighted_grades, 'Activity', 'Grade / %', 'Grade Distribution (Box Plot, Weighted by Distance)', os.path.join(output_dir, 'fig8_grade_boxplot'))
        plot_violin(weighted_grades, 'Activity', 'Grade / %', 'Grade Distribution (Violin Plot, Weighted by Distance)', os.path.join(output_dir, 'fig9_grade_violinplot'))
    else:
         print("Skipping box and violin plots (no weighted grade data).")


    # 4. CSVファイル保存
    try:
        # 保存するカラムを選択 (元データ + 計算結果の一部)
        columns_to_save = ['timestamp', 'distance', 'altitude', 'speed', 'grade', 'speed_kph', 'distance_delta']
        df_to_save = df[[col for col in columns_to_save if col in df.columns]]
        df_to_save.to_csv(OUTPUT_CSV_PATH, index=False, date_format='%Y-%m-%d %H:%M:%S')
        print(f"\nSuccessfully saved data to {OUTPUT_CSV_PATH}")
    except Exception as e:
        print(f"Error saving data to CSV: {e}", file=sys.stderr)

    print("\nAnalysis complete.")

if __name__ == "__main__":
    main()

コードハイライト

いくつか特徴的な部分を見てみましょう。

データ読み込みと前処理の強化 (read_fit_file)

def read_fit_file(filepath):
    # ... (fitdecodeでの読み込み) ...
    df = pd.DataFrame(datas)

    # データ型の変換と検証
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
    else:
        print("Error: 'timestamp' column is missing.", file=sys.stderr)
        sys.exit(1)

    # 他の必須カラムの存在チェック
    missing_cols = [col for col in REQUIRED_COLUMNS if col not in df.columns]
    if missing_cols:
        print(f"Error: Missing required columns: {', '.join(missing_cols)}", file=sys.stderr)
        sys.exit(1)

    # 数値型に変換(エラー時はNaNにする)
    for col in ['distance', 'altitude', 'speed', 'grade']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # NaNを含む行があれば削除または警告
    initial_rows = len(df)
    df.dropna(subset=REQUIRED_COLUMNS, inplace=True)
    if len(df) < initial_rows:
        print(f"Warning: Dropped {initial_rows - len(df)} rows containing NaN values in required columns.", file=sys.stderr)

    if df.empty:
        print("Error: DataFrame is empty after cleaning.", file=sys.stderr)
        sys.exit(1)

    # 速度をkm/hに変換
    df['speed_kph'] = df['speed'] * MPS_TO_KPH

    # 距離の差分(各レコード間の移動距離)を計算
    df['distance_delta'] = df['distance'].diff().fillna(0)
    df.loc[df['distance_delta'] < 0, 'distance_delta'] = 0 # 稀な負の差分を補正

    return df

エラーチェック、型変換、NaN処理、そしてdistance_deltaの計算を追加しています。

距離ベースの2軸プロット (plot_dual_axis_distance)

def plot_dual_axis_distance(df, y1_col, y2_col, y1_label, y2_label, title, filename):
    fig, ax1 = plt.subplots(figsize=(12, 6), constrained_layout=True)
    x_km = df['distance'] / 1000 # 距離をkmに変換

    # 1つ目のY軸 (左)
    color1 = 'C0'
    ax1.set_xlabel('Distance / km')
    ax1.set_ylabel(y1_label, color=color1)
    ax1.plot(x_km, df[y1_col], color=color1, label=y1_label.split('/')[0])
    ax1.tick_params(axis='y', labelcolor=color1)
    ax1.grid(True)

    # 2つ目のY軸 (右)
    ax2 = ax1.twinx() # ★X軸を共有する第二のY軸を作成
    color2 = 'C1'
    ax2.set_ylabel(y2_label, color=color2)
    ax2.plot(x_km, df[y2_col], color=color2, label=y2_label.split('/')[0])
    ax2.tick_params(axis='y', labelcolor=color2)

    fig.suptitle(title)
    # 凡例を結合して表示
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2, loc='best')

    plt.savefig(f"{filename}.png")
    print(f"Saved plot: {filename}.png")
    plt.close(fig)

ax1.twinx() を使って、同じX軸(距離)を共有する2つのY軸を持つグラフを作成しています。

使い方

ライブラリのインストール:
pip install fitdecode pandas numpy matplotlib seaborn

スクリプトの保存: 上記のPythonコードを fit_analyzer_v2.py のような名前で保存します。

FITファイルの準備: 分析したい .fit ファイルを用意し、スクリプト内の FIT_FILE_PATH = '##########.fit' の部分を実際のファイルパスに書き換えます。
実行: ターミナルでスクリプトを実行します。

python fit_analyzer_v2.py

結果の確認: スクリプトと同じディレクトリに fit_analysis_plots というフォルダが作成され、その中に各種グラフのPNGファイルが保存されます。また、output_activity_data.csv というCSVファイルも生成されます。コンソールには、読み込み状況、保存されたプロットのファイル名、計算された勾配統計が表示されます。

まとめ

今回の改良により、FITファイルの分析スクリプトは、単なるデータ抽出・可視化ツールから、分析を行えるツールへと進化しました。生成されるグラフやCSVデータを活用すれば、特定のコースの攻略法を考えたり、トレーニングの成果を評価したりといった応用も可能かもしれません。
よろしければ、ご自身のサイクリングデータを分析してみてください!コードをさらにカスタマイズして、パワーメーターのデータ分析を追加したり、複数のアクティビティを比較する機能を追加したりするのも面白いかもしれませんね。
Happy Analyzing!

Discussion