🚶

Pythonで人流トラジェクトリーデータを縦持ちに変換し、移動距離を計算する方法

に公開

はじめに

今回は「人流トラジェクトリーデータ」を取り扱い、会場内の人の動きを可視化する方法をご紹介します。
人流データをどうやって見せるか考えた結果、
会場のヒートマップを作って、その上に人の動きの線を重ねてみることにしました。

生データはトラジェクトリーデータ(時間軸に沿った「横持ち」形式)だったため、
まずは扱いやすい「縦持ち」形式に変換する処理を行いました。

人流データ(サンプル用に作成したもの)

今回使うデータのサンプルは以下のような構造です。

カラム名 説明
検知数 0.1sec間にセンサーが検知した人数
time_step 時間を表すタイムスタンプ
id センサーが認識した1人ひとりに付けられたユニークID
x 座標のX値
y 座標のY値

環境

python3.x

フォルダ構成

├── 1_flow/
│   └── swap_rows_cols.py       # 横持ちデータを縦持ちデータに変換するスクリプト
├── 2_data/
│   └── 人流データ.csv           # 解析に使う横持ちの人流データ(CSVなど)
├── 3_output/
│   ├── データ縦持ち整形.csv      # 横持ちデータを縦持ちに変換した結果のデータ
│   └── データ縦持ち整形_距離追加.csv  # 縦持ちデータに距離情報を追加した結果のデータ

コード解説

swap_rows_cols.py

import pandas as pd
import numpy as np
import os

#=============================================
# ファイル・フォルダ情報の設定
#=============================================
INPUT_FILENAME = "人流データ.csv"
INPUT_FOLDER = "2_data"
OUTPUT_FILENAME = "データ縦持ち整形.csv"
OUTPUT_WITH_DIST_FILENAME = "データ縦持ち整形_距離追加.csv"
OUTPUT_FOLDER = "3_output"

#=============================================
# パスの取得
#=============================================
# カレントパス
current_dir = os.getcwd()
# パレントパス
parent_dir = os.path.dirname(current_dir)
# 入力データのCSVファイルのフルパス
input_path = os.path.join(parent_dir, INPUT_FOLDER, INPUT_FILENAME)

# 出力フォルダのパス
output_fol_path = os.path.join(parent_dir, OUTPUT_FOLDER)
os.makedirs(output_fol_path, exist_ok=True)

# 加工済みデータの出力ファイルパス
output_path = os.path.join(parent_dir, OUTPUT_FOLDER, OUTPUT_FILENAME)
# 加工済みデータに距離情報を追加したファイルの出力パス
output_WITH_path = os.path.join(parent_dir, OUTPUT_FOLDER, OUTPUT_WITH_DIST_FILENAME)

パスの組み立て

os.getcwd() で現在の作業ディレクトリを取得し、親ディレクトリを取得しています。
os.path.join()を使うことで、Windowsの \、Mac/Linuxの / といったパス区切り文字の違いを気にせず、安全にパスを組み立てることができます。


#=============================================
#  CSV読み込み
#=============================================
df = pd.read_csv(input_path, encoding="utf-8-sig")

#=============================================
# 横持を縦持ちへ
#=============================================
rows = []

for _, row in df.iterrows():
    detect_num = int(row["検知数"])
    time_step = row["time_step"]
    # 検知数分だけ展開
    for i in range(detect_num):
        # 1つ目の検知データは id, x, y
        # 2つ目以降は id_1, x_1, y_1 のように番号付きの列名
        id_col = "id" if i == 0 else f"id_{i}"
        x_col  = "x"  if i == 0 else f"x_{i}"
        y_col  = "y"  if i == 0 else f"y_{i}"
        rows.append([
            detect_num,
            time_step,
            row[id_col],
            row[x_col],
            row[y_col]
        ])

# 抽出したデータをDataFrame化
long_df = pd.DataFrame(rows, columns=["検知数", "time_step", "id", "x", "y"])
# CSVファイルとして保存
long_df.to_csv(output_path, index=False, encoding="utf-8-sig")

縦持ち形式への変換についての解説

  • df.iterrows() で元のDataFrameを1行ずつ処理。
  • 各行の「検知数」分だけループを回し、1人ずつの検知情報 (id, x, y)を抽出。
  • 2つ目以降の検知データは `"id_1", "x_1", "y_1" のように番号が付いた列名になっているため、ループ変数 i を使って列名を動的に指定しています。
  • 取り出したデータはリスト rows に追加していきます。
  • 最終的に rows を使って縦持ち(long形式)のDataFrameを作成し、CSVに保存しています。

#======================================================
# idのデータを取り出して、time_stepソート、移動距離追加
#======================================================  
df = long_df

rows = []
for _, r in df.iterrows():
    detect_num = int(r["検知数"])
    ts = r["time_step"]
    for i in range(detect_num):
        id_col = "id" if i == 0 else f"id_{i}"
        x_col  = "x"  if i == 0 else f"x_{i}"
        y_col  = "y"  if i == 0 else f"y_{i}"
        if id_col in df.columns and pd.notna(r.get(id_col, np.nan)):
            rows.append([
                r["検知数"],
                ts,
                r[id_col],
                r.get(x_col, np.nan),
                r.get(y_col, np.nan),
            ])

long_df = pd.DataFrame(rows, columns=["検知数", "time_step", "id", "x", "y"])

# 数値型に変換(エラーはNaNに置換)
long_df["time_step"] = pd.to_numeric(long_df["time_step"], errors="coerce")
long_df["id"] = pd.to_numeric(long_df["id"], errors="coerce")
long_df["x"] = pd.to_numeric(long_df["x"], errors="coerce")
long_df["y"] = pd.to_numeric(long_df["y"], errors="coerce")

# idごとにグループ化しtime_step順にソート後、距離計算
def calc_distance(g):
    g = g.sort_values("time_step")
    g["distance"] = np.sqrt((g["x"] - g["x"].shift())**2 + (g["y"] - g["y"].shift())**2)
    g["distance"] = g["distance"].fillna(0)
    return g

# 計算結果をCSV出力
long_df = long_df.groupby("id").apply(calc_distance).reset_index(drop=True)

# 出力フォルダをエクスプローラーで開く(Windows環境)
long_df.to_csv(output_WITH_path, index=False, encoding="utf-8-sig")

os.startfile(os.path.realpath(output_fol_path))

移動距離の計算

-id ごとにグループ化しtime_step 順にソート。

  • 各時点の座標 (x, y) と一つ前の座標との差を使ってユークリッド距離を計算し、distance列として追加。
  • 最初の座標には比較対象がないため距離は NaN となりますが、fillna(0) で0に置き換える。
  • 加工済みデータをCSVファイルに保存し、Windowsのエクスプローラーで出力フォルダを開く。

まとめ

今回はデータの縦持ち変換と移動距離の計算までを解説しました。
次はいよいよ、ヒートマップを作成し、その上に動線を重ねて可視化していきます。
https://zenn.dev/swatchp/articles/ba0bcf2ecd0ab7

参考リンク・素材について

GitHubリポジトリ

本記事で紹介したコードやサンプルデータはこちらのリポジトリで公開しています。
 https://github.com/iwakazusuwa/human-flow-preprocessing/tree/main

Discussion