🛠️

ROS1でroslibpyで任意のtopicをpub/subする

2022/02/20に公開

はじめに

roslibpyを使って任意のtopicをpub/subしたいケースがあったんですが、情報が案外なかったので記事にしました。
ROS1でのお話です。

roslibpy

Pythonのrosbridge clientです。

https://roslibpy.readthedocs.io/en/latest/

risbridge_servierを立ち上げて、別ターミナルでroslibpyを使ってROS1のpub/subが出来ます。

roslaunch rosbridge_server rosbridge_websocket.launch  

基本的な使い方は公式のExample参照。

publish

公式のExampleみてもstd_msgs/Stringを送信するケースしかないのでいくつか試しました。
topicはDict(辞書型)で扱うようになっているので、代入してDictを作っていくことも出来ますが、そのままベタ書きする方が直感的です。

sensor_msgs/Joy

sensor_msgs/Joyでaxes[1]だけ1.0を繰り返し送信するケースです。

def pub_joy():
    client_pub.run()

    pubtest = roslibpy.Topic(client_pub, '/joy', 'sensor_msgs/Joy')

    interval = 0.01
    send_time = 15.0

    for i in range(int(send_time/interval)):
        pubtest.publish(roslibpy.Message({
            'axes': [
                0,
                1.0
            ]
            }))
        time.sleep(interval)
    pubtest.unadvertise()
    client_pub.close()

jsk_recognition_msgs/PolygonArray

もう少し複雑なケースで、rvizのjsk_rviz_pluginsのjsk_recognition_msgs/PolygonArrayの場合はこのようになります。

jsk_rviz_pluginsのPolygonArrayについてはこちら参照。

def pub_polygon():
    ros_client = Ros('localhost', 9090)

    publisher = Topic(ros_client, '/polygon', 'jsk_recognition_msgs/PolygonArray')

    def start_sending():
        while True:
            if not ros_client.is_connected:
                break

            publisher.publish(Message({
                'header' : {
                    'frame_id': 'map'
                },
                'polygons': [
                    {
                        'header' : {
                            'frame_id': 'map'
                        },
                        'polygon': {
                            'points': [
                                {'x': 1.0, 'y': 1.0,'z': 0},
                                {'x': -1.0, 'y': 1.0,'z': 0},
                                {'x': -1.0, 'y': -1.0,'z': 0},
                                {'x': 1.0, 'y': -1.0,'z': 0}
                            ]
                        }
                    }
                ],
                'labels': [0],
                'likelihood': [1.0]
            }))

            time.sleep(0.01)
        publisher.unadvertise()

    ros_client.on_ready(start_sending, run_in_thread=True)
    ros_client.run_forever()


subscribe

nav_msgs/Odometryのtwist,linear,xだけ取り出す場合です。


def rcv_topic(message : t.Dict[str, t.Any]) -> None:
    print(message['twist']['twist']['linear']['x'])
    return

def rcv_client() -> None:
    topic: str = "odom"
    type: str = "nav_msgs/Odometry"
    client_sub.run()
    if not client_sub.is_connected:
        print("roslibpy client connect fail")
        return
    listener = roslibpy.Topic(client_sub, topic, type)
    global start_time
    start_time = time.time()
    listener.subscribe(rcv_topic)

diagnostic_msgs/DiagnosticArray

diagnostic_msgs/DiagnosticArray

callbackしか書かないですが、arrayを取り出したいときもforreachでアクセスする感じです。


def receive_message(message):
    for status in message['status']:
        for value in status['values']:
            print(value)

さいごに

案外情報がなかったのでもし使う方がいましたら参考になればと思います。

Discussion