🛠️
ROS1でroslibpyで任意のtopicをpub/subする
はじめに
roslibpyを使って任意のtopicをpub/subしたいケースがあったんですが、情報が案外なかったので記事にしました。
ROS1でのお話です。
roslibpy
Pythonのrosbridge clientです。
risbridge_servierを立ち上げて、別ターミナルでroslibpyを使ってROS1のpub/subが出来ます。
roslaunch rosbridge_server rosbridge_websocket.launch
基本的な使い方は公式のExample参照。
publish
公式のExampleみてもstd_msgs/String
を送信するケースしかないのでいくつか試しました。
topicはDict(辞書型)で扱うようになっているので、代入してDictを作っていくことも出来ますが、そのままベタ書きする方が直感的です。
sensor_msgs/Joy
sensor_msgs/Joyでaxes[1]だけ1.0を繰り返し送信するケースです。
def pub_joy():
client_pub.run()
pubtest = roslibpy.Topic(client_pub, '/joy', 'sensor_msgs/Joy')
interval = 0.01
send_time = 15.0
for i in range(int(send_time/interval)):
pubtest.publish(roslibpy.Message({
'axes': [
0,
1.0
]
}))
time.sleep(interval)
pubtest.unadvertise()
client_pub.close()
jsk_recognition_msgs/PolygonArray
もう少し複雑なケースで、rvizのjsk_rviz_pluginsのjsk_recognition_msgs/PolygonArrayの場合はこのようになります。
jsk_rviz_pluginsのPolygonArrayについてはこちら参照。
def pub_polygon():
ros_client = Ros('localhost', 9090)
publisher = Topic(ros_client, '/polygon', 'jsk_recognition_msgs/PolygonArray')
def start_sending():
while True:
if not ros_client.is_connected:
break
publisher.publish(Message({
'header' : {
'frame_id': 'map'
},
'polygons': [
{
'header' : {
'frame_id': 'map'
},
'polygon': {
'points': [
{'x': 1.0, 'y': 1.0,'z': 0},
{'x': -1.0, 'y': 1.0,'z': 0},
{'x': -1.0, 'y': -1.0,'z': 0},
{'x': 1.0, 'y': -1.0,'z': 0}
]
}
}
],
'labels': [0],
'likelihood': [1.0]
}))
time.sleep(0.01)
publisher.unadvertise()
ros_client.on_ready(start_sending, run_in_thread=True)
ros_client.run_forever()
subscribe
nav_msgs/Odometry
nav_msgs/Odometryのtwist,linear,xだけ取り出す場合です。
def rcv_topic(message : t.Dict[str, t.Any]) -> None:
print(message['twist']['twist']['linear']['x'])
return
def rcv_client() -> None:
topic: str = "odom"
type: str = "nav_msgs/Odometry"
client_sub.run()
if not client_sub.is_connected:
print("roslibpy client connect fail")
return
listener = roslibpy.Topic(client_sub, topic, type)
global start_time
start_time = time.time()
listener.subscribe(rcv_topic)
diagnostic_msgs/DiagnosticArray
diagnostic_msgs/DiagnosticArray
callbackしか書かないですが、arrayを取り出したいときもforreachでアクセスする感じです。
def receive_message(message):
for status in message['status']:
for value in status['values']:
print(value)
さいごに
案外情報がなかったのでもし使う方がいましたら参考になればと思います。
Discussion