🛣️

自動運転AIチャレンジ 自作AIモデルで車両を制御する(データ収集編)

2024/01/09に公開

背景

自動運転AIチャレンジ2023(シミュレーション)に参加しており、自作AIの構築とAutoware環境への組み込みにチャレンジしたいと考えております。予測した経路に対して車両を制御するAIを目指していきます。今回の記事ではAIの訓練に必要なデータを収集するノードの作成方法について記載します。

環境

以下のaichallenge2023-racingのリポジトリを使用しております。

https://github.com/AutomotiveAIChallenge/aichallenge2023-racing

方針

本大会のベースのフレームワークの一つとなる Autoware microを使用し、走行中に発信されたトピックを収集していきます。
※ Autoware microの詳細についてはこちらをご参照ください

image.png

経路情報と制御コマンドのトピック

今回は走行中に生成された経路情報(Trajectory)から車両の制御コマンド(AckermannControlCommand)を推定するモデルを作ることが目標なため、走行中に発信された以下のトピックを保存します。

経路情報

トピック名 送信元ノード
"/planning/scenario_planning/trajectory" path_to_trajectory autoware_auto_planning_msgs.msg.Trajectory

制御コマンド

トピック名 送信元ノード
"/control/command/control_cmd" simple_pure_pursuit autoware_auto_control_msgs.msg.AckermannControlCommand

Trajectory や AckermannControlCommand などのAutowareで定義されている変数の型はこちらに記載されているようです。

Trajectory型

Trajectory型は以下のようになり、TrajectoryPoint といTrajectoryPoint 型の配列の変数を持っています。

Trajectory 定義
struct Trajectory {
    std_msgs::msg::Header header;
    sequence<autoware_auto_planning_msgs::msg::TrajectoryPoint, 10000> points;
    };

次に、TrajectoryPoint型は以下のように定義されています。

TrajectoryPoint 定義
struct TrajectoryPoint {
      builtin_interfaces::msg::Duration time_from_start;

      geometry_msgs::msg::Pose pose;

      @default (value=0.0)
      float longitudinal_velocity_mps;

      @default (value=0.0)
      float lateral_velocity_mps;

      @default (value=0.0)
      float acceleration_mps2;

      @default (value=0.0)
      float heading_rate_rps;

      @default (value=0.0)
      float front_wheel_angle_rad;

      @default (value=0.0)
      float rear_wheel_angle_rad;
    };

Pose型の変数 pose と float型の変数 longitudinal_velocity_mps, acceleration_mps2 を持っています。つまり、Trajectory 型のデータでは通過したい座標(point)とその座標での目標速度、アクセル(longitudinal_velocity_mps, acceleration_mps2)を経路点分保持していることになります。
※ 詳細は割愛しますが、Pose型では車両位置のxyz座標の値と車両姿勢のクォータニオンの値が格納されています。

AckermannControlCommand型

AckermannControlCommand型は以下のように定義されています。変数 longitudinal に進行方向への制御値が格納されており、変数 lateral に横方向への制御値が格納されています。

AckermannControlCommand 定義
struct AckermannControlCommand {
	builtin_interfaces::msg::Time stamp;
	autoware_auto_control_msgs::msg::AckermannLateralCommand lateral;
	autoware_auto_control_msgs::msg::LongitudinalCommand longitudinal;
};

LongitudinalCommand型は以下のようになります。

LongitudinalCommand 定義
    struct LongitudinalCommand {
      builtin_interfaces::msg::Time stamp;

      @verbatim (language="comment", text=
        " Desired platform speed in meters per second."
        " A positive value indicates movement in the positive X direction of the vehicle "
        " while a negative value indicates movement in the negative X direction of the vehicle.")
      @default (value=0.0)
      float speed;

      @verbatim (language="comment", text=
        " Desired platform acceleration in meters per second squared."
        " A positive value indicates acceleration while a negative value indicates deceleration.")
      @default (value=0.0)
      float acceleration;

      @verbatim (language="comment", text=
        " Desired platform jerk in meters per second cubed")
      @default (value=0.0)
      float jerk;
    };

また、AckermannLateralCommand型は以下のようになります。

AckermannLateralCommand 定義
struct AckermannLateralCommand {
	builtin_interfaces::msg::Time stamp;

	@verbatim (language="comment", text=
	" Desired angle of the steering tire in radians left (positive)"
	" or right (negative) of center (0.0)")
	@default (value=0.0)
	float steering_tire_angle;

	@verbatim (language="comment", text=
	" Desired rate of change of the steering tire angle in radians per second")
	@default (value=0.0)
	float steering_tire_rotation_rate;
};

LongitudinalCommand型の変数 speedに走行速度が格納されており、AckermannLateralCommand型の変数 steering_tire_angle にステアリングの角度が格納されています。

その他のトピック

上記で示したトピック以外に、今後のAIモデルの拡張も考え、以下のトピックも収集するようにします。

コース情報(経路情報)

トピック名 送信元ノード
"/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id" behavior_path_planner autoware_auto_planning_msgs.msg.PathWithLaneId

自己位置情報(オドメトリ情報)

トピック名 送信元ノード
"/awsim/ground_truth/localization/kinematic_state" Odometry Sensor(AWSIM) nav_msgs.msg.Odometry

検知物体情報

トピック名 送信元ノード
"/awsim/ground_truth/perception/object_recognition/detection/objects" Object Detection Sensor(AWSIM) autoware_auto_perception_msgs.msg.DetectedObjects

補足ですが、"/awsim/ground_truth/localization/kinematic_state" と "/awsim/ground_truth/perception/object_recognition/detection/objects"はシミュレータ(AWSIM)から直接的に自己位置情報と検知物体情報を受け取るため、それぞれの真値になります。

実装

今回は収集したデータをnumpy の .npy ファイルとして保存していきます。実装したスクリプトを以下より説明していきます。

ROSノードの作成

まず、ROSノードのクラスを宣言します。クラス内に今回収集したい経路情報と制御コマンド情報を受信するsubscriberを宣言します。

class DriveDataCollector(Node):
    def __init__(self):
        super().__init__('drive_data_collector')
        # Trajectory のsubscriber
        self.create_subscription(Trajectory, 
                                 "/planning/scenario_planning/trajectory", 
                                 self.onTrajectory, 
                                 1)
        # 制御コマンド のsubscriber
        self.create_subscription(AckermannControlCommand, 
                                 "/control/command/control_cmd", 
                                 self.onControl, 
                                 1)

次に以下の行でデータを保存するフォルダを作成しています。

        # 保存するディレクトリを作成
        dt_now = datetime.datetime.now()
        self.save_dir = "collected_data/" +  dt_now.strftime('%Y-%m-%d-%H-%M-%S')

        os.makedirs(self.save_dir, exist_ok=True)
        os.makedirs(self.save_dir + "/path/", exist_ok=True)
        os.makedirs(self.save_dir + "/trajectory/", exist_ok=True)
        os.makedirs(self.save_dir + "/control/", exist_ok=True)
        os.makedirs(self.save_dir + "/pose/", exist_ok=True)
        os.makedirs(self.save_dir + "/objects/", exist_ok=True)

保存を実行する関数を宣言します。今回はタイマーコールバック関数で周期的に保存を実行することとしました。同じようなデータが大量に保存されても嬉しくないので、保存する周期を0.25sとします。

	timer_period = 0.25
	self.timer = self.create_timer(timer_period, self.timer_callback)

その他のトピックについても同様にコールバック関数を宣言します。

        # PathWithLaneID のsubscriber
        self.create_subscription(PathWithLaneId, 
                                 "/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id", 
                                 self.onPathWithLaneId, 
                                 1)
				 
        # 自己位置、実速度 のsubscriber
        self.create_subscription(Odometry, 
                                 "/awsim/ground_truth/localization/kinematic_state", 
                                 self.onOdometry, 
                                 1)
	# 検知物体のsubscriber
        self.create_subscription(DetectedObjects,
                                 "/awsim/ground_truth/perception/object_recognition/detection/objects",
                                 self.onDetectObjects,
                                 1)

データ保持用のコールバック関数の作成

次にコールバック関数を作成します。データを保存するタイマーコールバック関数以外は基本的に最新のデータに更新するのみを行います。オドメトリ情報からは実速度、自己位置情報、姿勢情報のみを抽出します。

    def onTrajectory(self, msg):
        self.trajectory = msg

    def onControl(self, msg):
        self.control = msg
    def onOdometry(self, msg):
        self.twist_linear = msg.twist.twist.linear
        self.position = msg.pose.pose.position
        self.orientation = msg.pose.pose.orientation
    def onPathWithLaneId(self, msg):
        self.path = msg

検知物体情報は物体のクラスラベル、存在確率、位置座標、姿勢、移動速度を辞書型の変数に格納します。

    def onDetectObjects(self, msg):
        
        for obj in msg.objects:
            data = {}
            data["label"] =  obj.classification[0].label
            data["probability"] = obj.classification[0].probability
            data["existance_probability"] = obj.existence_probability
            data["position"] = np.array([obj.kinematics.pose_with_covariance.pose.position.x,
                            obj.kinematics.pose_with_covariance.pose.position.y,
                            obj.kinematics.pose_with_covariance.pose.position.z])
            data["orientation"] = np.array([obj.kinematics.pose_with_covariance.pose.orientation.x,
                            obj.kinematics.pose_with_covariance.pose.orientation.y,
                            obj.kinematics.pose_with_covariance.pose.orientation.z,
                            obj.kinematics.pose_with_covariance.pose.orientation.w])
            data["twist"] = np.array([obj.kinematics.twist_with_covariance.twist.linear.x])
            self.objects.append(data)

データ保存用のコールバック関数の作成

次にデータを保存するコールバック関数を作成します。それぞれのデータをnumpyの型に変換してから保存していきます。まずはnumpyのarray型へ変換する関数を準備します。

    def _PointList2Numpy(self, points):
        k = []
        for i in range(len(points)):
            k.append([points[i].x, points[i].y])
        return np.array(k)
    def _Trajectory2Numpy(self, trajectory):
        points_vel_list = []
        points_pose_list = []
        for p in trajectory.points:
            points_vel_list.append(p.longitudinal_velocity_mps)
            points_pose_list.append([p.pose.position.x, p.pose.position.y])
        return np.array(points_pose_list), np.array(points_vel_list)

上記の _PointList2Numpy ではPoint型の配列を受け取り、格要素からxy座標の値を抽出して,
Nx2のnumpy.array型として返します。同様に、_Trajectory2Numpy はTrajectory型のデータを受け取り、TrajectoryPoints からxy座標の値と目標速度の値を抜き出して、numpy.array型のデータを2つ返します。
最後に保存する部分を記載していきます。

if (self.path is not None) and \
            (self.control is not None) and \
            (self.trajectory is not None) and \
            (self.position is not None) and \
            (self.orientation is not None):

            # 生成された経路(Trajectory)をnumpyy形式に変更
            trj_points, trj_vels = self._Trajectory2Numpy(self.trajectory)

            # 制御コマンドをnumpyy形式に変更
            speed = math.ceil(self.control.longitudinal.speed * 3.6)
            accel = self.control.longitudinal.acceleration
            steering_angle = self.control.lateral.steering_tire_angle
            control_cmd = np.array([speed, accel, steering_angle])
            ids_str = '{0:07}'.format(self.ids)
               
            # ファイル名を設定
            control_cmd_file = "{}/{}.npy".format(self.save_dir + "/control/", ids_str)
            trj_file = "{}/{}.npy".format(self.save_dir + "/trajectory/", ids_str)
            # npy 形式で保存
            np.save(control_cmd_file, control_cmd)
            np.save(trj_file, trj_points)

上記では制御コマンド、経路情報を保存しています。制御コマンドは、[speed, accel, steering_angle] の3次元のベクトルとして保存しています

スクリプト全体

開発したスクリプトの全体は以下になります。コースの境界線情報、自己位置情報、検知物体情報を保存する行も追加しています。

drivedata_collector_node.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import time
import rclpy
import numpy as np
from rclpy.node import Node
import math
import datetime
import os
from autoware_auto_planning_msgs.msg import Trajectory, PathWithLaneId
from autoware_auto_control_msgs.msg import AckermannControlCommand
from nav_msgs.msg import Odometry
from autoware_auto_perception_msgs.msg import DetectedObjects

class DriveDataCollector(Node):
    def __init__(self):
        super().__init__('drive_data_collector')
        timer_period = 0.25
        # PathWithLaneID のsubscriber
        self.create_subscription(PathWithLaneId, 
                                 "/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id", 
                                 self.onPathWithLaneId, 
                                 1)
        # Trajectory のsubscriber
        self.create_subscription(Trajectory, 
                                 "/planning/scenario_planning/trajectory", 
                                 self.onTrajectory, 
                                 1)
        # 制御コマンド のsubscriber
        self.create_subscription(AckermannControlCommand, 
                                 "/control/command/control_cmd", 
                                 self.onControl, 
                                 1)
        # 自己位置、実速度 のsubscriber
        self.create_subscription(Odometry, 
                                 "/awsim/ground_truth/localization/kinematic_state", 
                                 self.onOdometry, 
                                 1)
        self.create_subscription(DetectedObjects,
                                 "/awsim/ground_truth/perception/object_recognition/detection/objects",
                                 self.onDetectObjects,
                                 1)

    
        self.timer = self.create_timer(timer_period, self.timer_callback)

        self.path = None
        self.trajectory = None
        self.control = None
        self.position = None
        self.orientation = None
        self.objects = []

        # 保存するディレクトリを作成
        dt_now = datetime.datetime.now()
        self.save_dir = "collected_data/" +  dt_now.strftime('%Y-%m-%d-%H-%M-%S')

        os.makedirs(self.save_dir, exist_ok=True)
        os.makedirs(self.save_dir + "/path/", exist_ok=True)
        os.makedirs(self.save_dir + "/trajectory/", exist_ok=True)
        os.makedirs(self.save_dir + "/control/", exist_ok=True)
        os.makedirs(self.save_dir + "/pose/", exist_ok=True)
        os.makedirs(self.save_dir + "/objects/", exist_ok=True)

        self.ids = 0
    
    def onPathWithLaneId(self, msg):
        self.path = msg
        #self.get_logger().info("{}".format(msg))

    def onTrajectory(self, msg):
        self.trajectory = msg
        #self.get_logger().info("{}".format(msg))

    def onControl(self, msg):
        self.control = msg
        #self.get_logger().info("{}".format(msg))
    
    def onOdometry(self, msg):
        self.twist_linear = msg.twist.twist.linear
        self.position = msg.pose.pose.position
        self.orientation = msg.pose.pose.orientation
        #self.get_logger().info("{}".format(msg))

    def onDetectObjects(self, msg):
        
        for obj in msg.objects:
            data = {}
            data["label"] =  obj.classification[0].label
            data["probability"] = obj.classification[0].probability
            data["existance_probability"] = obj.existence_probability
            data["position"] = np.array([obj.kinematics.pose_with_covariance.pose.position.x,
                            obj.kinematics.pose_with_covariance.pose.position.y,
                            obj.kinematics.pose_with_covariance.pose.position.z])
            data["orientation"] = np.array([obj.kinematics.pose_with_covariance.pose.orientation.x,
                            obj.kinematics.pose_with_covariance.pose.orientation.y,
                            obj.kinematics.pose_with_covariance.pose.orientation.z,
                            obj.kinematics.pose_with_covariance.pose.orientation.w])
            data["twist"] = np.array([obj.kinematics.twist_with_covariance.twist.linear.x])
            self.objects.append(data)
    
            #self.get_logger().info("{}".format(data))

    def timer_callback(self):
        if (self.path is not None) and \
            (self.control is not None) and \
            (self.trajectory is not None) and \
            (self.position is not None) and \
            (self.orientation is not None):
            # コースの境界線をnumpy形式に変更
            left_bound = self._PointList2Numpy(self.path.left_bound)
            right_bound = self._PointList2Numpy(self.path.right_bound)

            # 生成された経路(Trajectory)をnumpyy形式に変更
            trj_points, trj_vels = self._Trajectory2Numpy(self.trajectory)

            # 制御コマンドをnumpyy形式に変更
            speed = math.ceil(self.control.longitudinal.speed * 3.6)
            accel = self.control.longitudinal.acceleration
            steering_angle = self.control.lateral.steering_tire_angle
            control_cmd = np.array([speed, accel, steering_angle])
            ids_str = '{0:07}'.format(self.ids)

            # Odometory の値をnumpy 形式に変更
            odometry_speed = float(self.twist_linear.x * 3.6)
            position_and_orientation = np.array([self.position.x, 
                                                self.position.y, 
                                                self.position.z,
                                                self.orientation.x,
                                                self.orientation.y,
                                                self.orientation.z,
                                                self.orientation.w])                
            # ファイル名を設定
            control_cmd_file = "{}/{}.npy".format(self.save_dir + "/control/", ids_str)
            trj_file = "{}/{}.npy".format(self.save_dir + "/trajectory/", ids_str)
            path_left_file = "{}/{}_left.npy".format(self.save_dir + "/path/", ids_str)
            path_right_file = "{}/{}_right.npy".format(self.save_dir + "/path/", ids_str)
            pose_file = "{}/{}.npy".format(self.save_dir + "/pose/", ids_str)
            
            # npy 形式で保存
            np.save(control_cmd_file, control_cmd)
            np.save(trj_file, trj_points)
            np.save(path_left_file, left_bound)
            np.save(path_right_file, right_bound)
            np.save(pose_file, position_and_orientation)


            for obj_id, obj in enumerate(self.objects):
                fname = "{}/objects/{}_{}_classify.npy".format(self.save_dir, ids_str, obj_id)
                np.save(fname, np.array([obj["label"],
                                         obj["probability"],
                                         obj["existance_probability"]]))
                
                fname = "{}/objects/{}_{}_pose.npy".format(self.save_dir, ids_str, obj_id)
                np.save(fname, obj["position"])

                fname = "{}/objects/{}_{}_orientation.npy".format(self.save_dir, ids_str, obj_id)
                np.save(fname, obj["orientation"])
                
                fname = "{}/objects/{}_{}_twist.npy".format(self.save_dir, ids_str, obj_id)
                np.save(fname, obj["twist"])

            self.ids += 1
            self.get_logger().info("save {}".format(ids_str))

            # 同じ値を保存しないように初期化
            self.trajectory = None
            self.control = None
            self.trajectory = None
            self.position = None
            self.orientation = None
            self.objects = []

    def _PointList2Numpy(self, points):
        k = []
        for i in range(len(points)):
            k.append([points[i].x, points[i].y])
        return np.array(k)
    
    def _Trajectory2Numpy(self, trajectory):
        points_vel_list = []
        points_pose_list = []
        for p in trajectory.points:
            points_vel_list.append(p.longitudinal_velocity_mps)
            points_pose_list.append([p.pose.position.x, p.pose.position.y])
        return np.array(points_pose_list), np.array(points_vel_list)

def main(args=None):
    print('Hi from drive_data_collector')

    rclpy.init(args=args)
    
    node = DriveDataCollector()
    rclpy.spin(node)

    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

まとめ

今回はAIチャレンジ2023(シミュレーション)のAI構築のために訓練データを収集する方法を記載しました。今後は収集したデータを用いてAIのモデルを訓練させていきます。

Discussion