🌟

ROS2でPythonノードを作る【ROS2】

2023/03/02に公開

はじめに

こんにちは。初めて記事を書きます。

ここでは、ロボットの開発プラットフォームであるRobot Operating System2(ROS2)でPythonノードを作る方法について書いていきます。
よくある公式チュートリアルではなく、実際に私の研究や開発で学んだことなどを備忘録的な感じで描いていきます。

C++ではノードを作ったことがないので、C++については他の方のサイトを参考にしてください。

あと、githubなどでサンプルコードを公開したかったのですが、開発環境を用意できなかったので、ハンズオンだと思って手を動かしながら読んでいただけると幸いです。

ROS/ROS2とは

ROSはWillowGarage社によって開発・管理されているオープンなロボット開発プラットフォームです。
ロボットを開発・制御・操作するためのサンプルコードやパッケージが数多く公開されているので、必要な機能が検索するだけで見つかったりします。
(バージョン違いで動かなかったりするので,そのまま使えるとは限りませんが)
ROS以外にもロボットを制御する手段はありますが、開発例が多いことから参考になる情報が多いプラットフォームであるといえます。

ROS2はROSの次世代バージョンです。
対応OSの拡張やROSの課題点を解決したバージョンとなっていますが、既存のROSユーザーに仕様変更を強いるのを避けるためにROSとは切り離されて開発されました。
そのため、ROSのライブラリやコード構成をそのままROS2で使うことはできません。
本記事ではROS2でのノードの書き方を説明します。

Publish/Subscribe型通信

コードの書き方を説明する前にROS/ROS2の基本となるPublish/Subscribe型通信についても軽く触れておきます。
知ってる方は読み飛ばして構いません。
ROS/ROS2ではPublish/Subscribe型通信にてプログラム間のデータ通信を行なっています。
Publish/Subscribe型通信は https://itpfdoc.hitachi.co.jp/manuals/link/cosmi_v0970/03Y0760D/EY070390.HTM など、他のサイトで詳しく説明されているので詳しく説明しませんが、大まかにいうと特定の形式のデータを発信する「Publisher」と、特定の形式のデータを受信する「Subscriber」がデータをやり取りする形式です。
自分はマンションの集合ポストをイメージしています。

  • Publisher(配達員)は特定の人の名前の書かれた手紙をその人のポストに入れる
  • Subscriber(住人)は特定のポストに入っている手紙だけを読める

あまり共感されないイメージかもしれないので無視して構いません。
とりあえず、

  • Publisher(データを発信するやつ)
  • Subscriber(データを受信するやつ)

があることを覚えておけば良いです。

メッセージ

メッセージはPublisherが発信するデータ、そしてSubscribeが受信するデータのことです。
メッセージには型があり、ROSではINT型やSTRING型などシンプルなものから、Pose型など構造体みたいな型も存在します。
また、カスタムメッセージという自分で定義した型のメッセージも通信可能です(この記事では紹介しません)。
とりあえず

  • メッセージには型がある

ことを覚えておけば良いです。

トピック

トピックはPublisherがメッセージを送信する場所、そしてSubscriberがメッセージを受信する場所です。
Publisherは指定したトピックにメッセージを送信し、Subscriberは指定したトピックからメッセージを受信します。

説明が難しいのですが、ここで先ほどの集合ポストの例が本領を発揮します。
トピックはポストみたいなものです。
トピックには名前がつけられており、Publisher・Subscriberはそれぞれトピックの名前を指定してメッセージの通信を行います。
部屋番号の書かれたポストに手紙を入れる・取り出すのと同じイメージです。
1つのトピックには1つの形式のメッセージしか送れないので、異なる形式のメッセージを送る場合には別のトピックにメッセージを送る必要があります。

  • トピックは「名前」で区別される
  • 1つのトピックには1つの型のメッセージしか通信できない

を覚えておけば大丈夫です。

ノード

ROSのノードはメッセージをPublishするプログラム、メッセージをSubscribeするプログラム、その両方を行うプログラムなど、Publish/Subscribe機能を持ったプログラムのことです。
本記事ではこれを作ります。

Pub/Subまとめ

以上がROSのPub/Subの簡単な説明です。
詳細な説明については他のサイトで紹介されていますが、上述した内容を把握していればノードは書けると思います。

補足

ROSにはサービスという通信形式もありますが、自分は使ったことがないのでここでは紹介しません。
気になる方は「ROS サービス」などで調べてみてください。

ROS2ノードの作成

この記事では、String型のメッセージをSubscribeし、受け取ったメッセージに「!」マークを加えたString型のメッセージをPublishする"responder"ノードを作ります。

ROS2のノードを実行するには、以下のような流れで作業していく必要があります。

  1. ワークスペースの作成
  2. パッケージの作成
  3. ノードの作成
  4. ビルド
  5. 実行

これらについて順を追って説明していきます。

1. ワークスペースの作成

ワークスペースは、簡単に言ってしまうとディレクトリです。
直訳して作業場なので、ROSの開発作業をするディレクトリと捉えてもらって良いです。
ROS2のチュートリアルなどでは「colcon_ws」というワークスペースが作られていますが、プロジェクトなどに応じて名前を変えても大丈夫です。
私は単一の「colcon_ws」に全てのROSコードを突っ込んでいましたが、ソースの管理が面倒になるので、可能であればプロジェクトごとにワークスペースを作りましょう(例えばテスト用ノードなどはtest_wsなど)。
ワークスペースの中にはsrcディレクトリを作ります。

$ mkdir colcon_ws/src

ワークスペースはプロジェクトごとに作るものなので、既にワークスペースがあれば毎回作る必要はありません。

2. パッケージの作成

ROSではパッケージと呼ばれるコード群からノードを呼び出して実行します。
パッケージは基本的に「機能」ごとに作ります。
例えば、受け取ったメッセージに対して応答する「responderパッケージ」、高度なものだと移動ロボットの経路生成を行う「path_plannningパッケージ」など、機能ごとに命名してパッケージを作ります。
パッケージ名には空白を入れられないので、アンダースコア(_)などで区切ります。

ROS2Pythonチュートリアルでは「py_pubsub」というパッケージを作っていますが、今回は「responder」パッケージを作ります。
ROS2では以下のコマンドで作成可能です。

$ source /opt/ros/galactic/setup.bash
$ cd colcon_ws/src
$ ros2 pkg create responder --node-name responder --build-type ament_python

今回はROS2のバージョンがGalacticなので、/opt/ros/galactic/setup.bashをsourceしていますが、バージョンが異なる場合はそのバージョンに読み替えてください。

--node-nameや--build-typeの引数がなくてもパッケージは作れますが、ここで指定しておくと自動でパッケージを構成してくれるのでオススメです。(1つのパッケージ内に複数のノードを入れる場合やC++とpythonを混在させる場合は指定しなくても良い)

C++でノードを作る場合は--build-typeをament_cppに置き替えてください。

これらのコマンドを実行するとsrcディレクトリの下にresponderディレクトリが生成されます。
responderディレクトリの構成は以下の通りとなっています。

colcon_ws/src/responder/

  • responder(コードフォルダ)
    • __ init __.py(中身は空)
    • responder.py(メインのコード。この後記述していく)
  • resource
    • responder(謎ファイル)
  • test(用途不明)
    • test_copyright.py
    • test_flake8.py
    • test_pep257.py
  • package.xml(--node-name引数でパッケージ作成した場合はノータッチでOK)
  • setup.cfg(--node-name引数でパッケージ作成した場合はノータッチでOK)
  • setup.py(--node-name引数でパッケージ作成した場合はノータッチでOK)

以上でパッケージの作成は完了です。

ノードの作成

ここではトピック名「/base_msg」のメッセージをSubscribeし、トピック名「/ex_msg」のメッセージをPublishする"responder"ノードを作っていきます。
ここでは、パッケージで作成したcolcon_ws/src/responder/responder/responder.pyにコードを書いていきます。

初期状態では以下のようになっています。

responder.py
def main():
    print('Hi from responder.')

if __name__ == '__main__':
    main()

これを以下のように書き換える。

responder.py

import rclpy  # ROS2のPythonモジュール
from rclpy.node import Node
from std_msgs.msg import String # トピック通信に使うStringメッセージ型をインポート

class Responder(Node):
    def __init__(self):
        super().__init__('responder') # ROS1でいうrospy.init_node('node_name')
        self.sub = self.create_subscription(String, '/base_msg', self.msg_callback)# subscriberの宣言
        self.pub = self.create_publisher(String,'/ex_msg', 10)# publisherの宣言

    def msg_callback(self, msg):
        print(msg) # Subscribeしたメッセージをprint
	msg.data = msg.data + "!" # メッセージに「!」を付け加える処理
        self.pub.publish(msg) # 処理したメッセージをPublish

def main():
    rclpy.init()
    node = Responder()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

書き方としては、クラスを定義し、init関数内でSubscriberとPublisher、ノード名を宣言します。

ノード名の宣言は以下の通りです。

super().__init__('responder')

この' '内にノード名を書くことでROS上にノード名が登録されます。
PublisherとSubscriberは以下のように定義します。

self.sub = self.create_subscription(String, '/base_msg', self.msg_callback, 10)
self.pub = self.create_publisher(String,'/ex_msg', 10)

Subscriberはself.create_subscriptionメソッドで宣言でき、引数は「メッセージの型」・「トピック名」・「callback関数」「キューサイズ」となっています。
callback関数はメッセージをSubscribeするたびに実行される関数で、この中にはSubscribe
したメッセージを処理する内容を記述します。
Publisherはself.create_publisherメソッドで宣言でき、引数は「メッセージの型」・「トピック名」・「キューサイズ」となっています。

これらが宣言できたら、次はcallback関数の中身です。
callback関数ではSubscribeしたメッセージを使った処理を行います。
今回はString型ですが、String型の構成は公式ドキュメントのstd_msgs/Stringにある通り、下位の「data」を参照することで中身のstring型のデータに参照できます。
msg.data = msg.data + "!"
型によって構造が決まっているので、扱うデータに応じて必要な項目を参照して処理を行います。

メイン関数については、複数のクラスを作らない限りほぼ上に書いた通りなので、おまじないとして書きましょう。

ビルド

コードを書き終えたらビルドを行います。
cd colcon_wsなどでcolcon_wsに移動して

colcon build --symlink-install --packages-select responder

を実行します。

--packages-selectは特定のパッケージのみをビルドしたい時に行うコマンドです。
なくてもビルドできるが、その場合colcon_ws内のパッケージが全て再ビルドされるので、ワークスペースのパッケージ数が多いと時間がかかります(少なかったらpackages_selectなしでも良い)。

エラーがなく

Summary: 1 package finished

の文字が出たらビルド完了
この後に

$ source colcon_ws/install/setup.bash

を実行することで、ビルドしたノードの環境を反映させてビルド完了です。

実行

1つ目のターミナルで以下を実行します。

$ ros2 run responder responder

ROS2ではros2 run <パッケージ名> <ノード名>で実行するため、今回は上のようになります。
このコマンドで実行ができます。

実行後、responderノードはトピック"/base_msg"にメッセージが届くのを待機する状態になります。
このままではノードが動いているのかを確認できないので、pubコマンドで動作確認をします。
2つ目のターミナルで以下のコマンドを実行します。

ros2 topic pub /base_msg std_msgs/msg/String "{data: 'hello'}"

これで1秒間に1回/base_msgにメッセージがpublishされます。
これでresponderのcallback関数が呼び出されます。
そのため1つ目のターミナルでは"hello"がprintされます。

ここで、responderのpublishした内容を見るためにechoコマンドで動作確認をします。
3つ目のターミナルで以下のコマンドを実行します。

ros2 topic echo /ex_msg

これにより、3つ目のターミナルにてhello!と「!」が付け加えたデータがpublishされていることを確認できます。

以上がresponderノードの作り方でした。

応用1: 複数のトピックをSubscribeするノードの作り方

responderでは1つのメッセージを受け取り、そのメッセージを処理したデータをpublishしていました。
一方で、実際にノードを作ると2種類のメッセージを受け取り、そのメッセージから演算した結果をpublishする場合もあります。(例えば、ノード1から人の名前、ノード2から"hello"や"good bye"などの挨拶データがpublishされるなど)

その場合には以下のようにsubscriberとcallback関数を増やします。

responder2.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class Responder2(Node):
    def __init__(self):
        super().__init__('responder2')
        self.sub1 = self.create_subscription(String, '/name_msg', self.name_callback
	self.sub2 = self.create_subscription(String, '/greet_msg', self.greet_callback
        self.pub = self.create_publisher(String,'/concat_msg', 10)
        self.name = str()
	self.greet = str()

    def name_callback(self, name):
        print(name)
	self.name = name.data
	
    def greet_callback(self, greet):
        print(greet)
	self.greet = greet.data
	msg = String()
	msg.data = self.name + ", " + self.greet
        self.pub.publish(msg)

def main():
    rclpy.init()
    node = Responder2()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

このようにsubscriberとcallback関数を増やすだけで実現できます。

一方で、この書き方では「並列処理」ができないという課題点があります。
そのため、いずれかのcallback関数の処理が重い時、publishする結果に誤りが出る可能性があります。

完全に並列で処理する方法を調べたところ、次のような方法がありました。(もっと調べれば他の解決方法もあるかもしれません)

応用2: 複数のクラスに分けたノードの実行

先ほどは1つのクラスに2つのsubscriberを宣言しましたが、場合によっては2つのクラスに分けたいときがあります。
その場合は以下のように書きます。

responder2.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading

g_name = str()
g_greet = str()

class NameSub(Node):
    def __init__(self):
        super().__init__('name_sub')
        self.sub = self.create_subscription(String, '/name_msg', self.name_callback
	
    def name_callback(self, name):
        global g_name
        print(name)
	g_name = name.data	
	
class GreetSub(Node):
	def __init__(self):
	super().__init__(greet_sub)
	self.sub = self.create_subscription(String, '/greet_msg', self.greet_callback
        self.pub = self.create_publisher(String,'/concat_msg', 10)
	
    def greet_callback(self, greet):
        global g_name, g_greet
        print(greet)
	g_greet = greet.data
	msg = String()
	msg.data = g_name + ", " + g_greet
        self.pub.publish(msg)

def main():
    rclpy.init()
    node1 = NameSub()
    node2 = GreetSub()
    executor = rclpy.executors.MultiThreadedExecutor()
    executor.add_node(node1)
    executor.add_node(node2)
    executor_thread = threading.Thread(target=executor.spin, daemon=True)
    executor_thread.start()
    try:
        while rclpy.ok():
            time.sleep(2)#これなくても良いかも
    except KeyboardInterrupt:
        pass
    rclpy.shutdown()
    executor_thread.join()

if __name__ == '__main__':
    main()

このようにrclpyのMutiThreadExecutor()メソッドとthreadingを使うことで2つのクラスに分けた状態で起動できます。
また、この方法でノードを起動すると、Callback関数を並列に処理できます。
そのため、片方に時間のかかる処理を入れてももう片方のCallbackに遅延が出ません。

ただし、これを使ってノードを起動すると自分のPCではファンがフル回転していたので、内部的にはすごい負荷がかかっている可能性があります。
そのため、上述した「複数のトピックをSubscribeするノードの作り方」でノードを作ることをお勧めします。

応用3: Launchファイルの書き方

ROSではLaunchという指定したノードを一斉に起動するコマンドがあります。
launchファイルは一般的にパッケージの中にlaunchディレクトリを作り、その中に配置します。

そのため以下のように書いていきます。

共通操作

$ cd colcon_ws/src
$ ros2 pkg create responder_launch
$ cd respoder_launch
$ mkdir launch
$ cd launch

XML形式で書く場合

$ touch responder_launch.xml
responder_launch.xml
<launch>
  <node pkg="responder" exec="responder" name="responder"/>
  <node pkg="responder2" exec="responder2" name="responder2"/>
</launch>

Python形式で書く場合

$ touch responder_launch.py
responder_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='responder',
            namespace='responder',
            executable='responder'
        ),
        Node(
            package='responder2',
            namespace='responder2',
            executable='responder2'
        )
    ])


これでresponderノードとresponder2ノードを一斉に起動するlaunchファイルが書けました。
最後に以下を実行してlaunchファイルを起動します。

$ ros2 launch responder_launch responder_launch.xml(or .py)


これで複数のノードを起動するlaunchファイルを書けました。

## ROS2 Dashingの場合
ROS2ではバージョンがDashing → Eloquent → Foxy → Galacticと更新されていきました。
それぞれUbuntuのバージョンごとに対応しており、DashingはUbuntu18.04に対応しています。

Ubuntu18.04で開発されている例は多いので、ROS2 Dashingを使う方もそれなりにいると思いますが、DashingはEloquent以降と異なる点が多いです。
そのうち、最も違うのがlaunchファイルの書き方です。

Dashingでは、なぜかXML形式のlaunchが起動できません。(詳しい人がやれば起動できるのかもしれないが)
そのためPythonファイルでlaunchする必要がありますが、Pythonでの書き方も少しEloquent以降と違っています。
```diff python: responder_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_desctiption():
    return LaunchDescription([
        Node(
         package='responder',
         node_executable='responder',
         node_name='responder'
        ),
        Node(
         package='responder2',
         node_executable='responder2',
         node_name='responder2'
        ),
    )]

その後CMakeLists.txtの該当箇所を変更

CMakeLists.txt
fine_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# futher dependencies manually.
# find_package(<dependency> REQUIRED)

install(DIRECTORY
    launch
    DESTINATION share/${PROJECT_NAME}/
)

if(BUILD_TESTING)
    find_package(ament_lint_auto REQUIRED)

executableがnode_executableになっていたりCMakeLists.txtを書き換えたりと、色々と違うのでDashing環境で起動させる場合には気をつけてください。

おわりに

以上がROS2のノードの作成からLaunchまでの方法でした。
ほとんど自分が大学院で経験したことの備忘録になりますが、参考になっていれば幸いです。

それではまた。

補足:ROS1のノードの作り方

補足としてROS1のノードの作り方についても記しておきます。
大まかな流れはROS2と同じです。

ワークスペースの作成

$ mkdir catkin_ws/src

パッケージの作成

$ cd catkin_ws/src
$ source /opt/ros/melodic/setup.bash
$ catkin_create_pkg responder rospy std_msgs

ノードの作成

$ cd responder
$ mkdir scripts
$ cd scripts
$ touch responder.py
responder.py
import rospy
from std_msgs.msg import String

Class Responder(object):
    def __init__(self):
        self.sub = rospy.Subscriber('base_msg', String, self.msg_callback, queue_size=10)
	self.pub = rospy.Publisher('ex_msg', String, queue_size=10)
    
    def callback(self, msg):
        print(msg)
	msg.data = msg.data + "!"
	self.pub.publish(msg)

if __name__ == '__main__':
    rospy.init_node('responder')
    responder = Responder()
    rospy.spin()

ちなみに、ROS1では複数のクラスでの起動が楽だったり、複数トピックの並列Subscribeが可能(なはず)なので、この辺はROS1の方が便利です。

ビルド

$ cd catkin_ws
$ catkin_make

実行

ターミナル1

$ rosrun responder responder_node

ターミナル2

$ rostopic pub /base_msg std_msgs/msg/String "{data: 'hello'}"

ターミナル3

$ rostopic echo /ex_msg

rosrunros2 runとなるなど、ros2では" "が追加されているので、この辺は紛らわしいですね。

Discussion