Open2

ros2 bag の mcap 形式のファイルを読み取って各メッセージのフレームレートを算出する

PINTOPINTO
calc_frame_ratio.py
"""
pip install mcap==1.1.1
"""

import numpy as np
from argparse import ArgumentParser
from mcap_ros2.decoder import DecoderFactory
from mcap.reader import make_reader
from collections import defaultdict
from tqdm import tqdm

def calc_topic_frame_ratio(mcap_file_path: str):
    # トピックごとのタイムスタンプを格納する辞書
    timestamps = defaultdict(list)

    # Mcap ファイルを開く
    print("")
    print(f"#### During data decoding...")
    with open(mcap_file_path, "rb") as f:
        reader = make_reader(f, decoder_factories=[DecoderFactory()])
        for schema, channel, message, ros_msg in tqdm(reader.iter_decoded_messages(), dynamic_ncols=True):
            timestamps[channel.topic].append(message.log_time)

    # トピックごとのフレームレートを計算
    frame_rates = {}
    print("")
    print(f"#### Calculating frame rate...")
    for topic, times in tqdm(timestamps.items(), dynamic_ncols=True):
        if len(times) > 1:
            # メッセージ間の平均時間を計算してフレームレートを求める
            intervals = np.diff(times)
            mean_interval = np.mean(intervals)
            frame_rate = 1e9 / mean_interval if mean_interval > 0 else 0
            frame_rates[topic] = frame_rate

    # 結果を表示
    for topic, rate in frame_rates.items():
        print(f"Topic: {topic}, Frame rate: {rate:.2f} Hz")


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument(
        '-m',
        '--mcap_file',
        type=str,
        required=True,
    )
    args = parser.parse_args()
    mcap_file: str = args.mcap_file
    calc_topic_frame_ratio(mcap_file)