ROS2: PyQt5+Qt Desingerでmsg可視化アプリを試作する
海洋ロボコンをやってた人です。
今回は、PyQt5+Qt DesingerでGUIプログラムしたアプリをROS2対応させ、msg可視化アプリを作ったときのメモおよびプログラムたちを備忘録としてまとめました。
他にも、「ROSboard」や「JSK Visualization」といったROSの強力な可視化ツールがたくさん揃っているので、車輪の再発明な点もありますが、Qt「キュート」の勉強も兼ねています。
※2023/02/26追記
本記事の「ROS2」表記、正しくは「ROS 2」です。
1: ROS2: PyQt5+Qt Desingerでmsg可視化アプリを試作する
この記述を読むことで
のように、msg可視化の簡易アプリを試作できます。
個人的なポイントとしては
- ROS2の接続/非接続の可視化
- Rviz同様、トピックのON/OFF切り替え
- Rviz同様、トピック名の変更にも対応
- アプリのアイコンも作成
という点です。
使用環境は以下です。
- Laptop PC
- Ubuntu 20.04 Foxy
1.1: 本記事のプログラム
この記事で扱うプログラムのコードはGithubよりクローンしてご活用ください。
必要なライブラリは以下です。
sudo apt install pyqt5-dev-tools
1.2: Qt DesingerでGUI部分を作る
まずQt「キュート」について、Qtはクロスプラットホームでアプリケーション開発できるフレームワークになります。
基本的にはC++で開発されており、Qt Desinger, Qt Creator といった開発ツールも整っています。
商用利用する場合には、商用ライセンスの購入が必要ですが、趣味での開発ならオープンソースライセンスで問題ないかと思います。
また、RvizのPanelなどはQtライブラリで実装もされており、身近なところで実は使われています。
Qt Desingerで装飾する
基本的なパーツを使って、装飾していきます。
左側の「Widget Box」から以下をドラッグして、パーツをメインウィンドウ内に設置します。
- Buttons
- Check Box
- Input Widgets
- Line Edit
- Containers
- Tab Widget
- Display Widgets
- Label
パーツは全選択後、ヘッダーパネル部の「Layout Horizontal」や「Layout Vertical」でまとめます。
各、ラベルやタイトルは任意の名前に変更してください。
また、Pythonプログラム側で、各ラベルを区別したいので
QObject > objectName
にもオブジェクトの名称をつけておきます。
すべてのラベルやテキストの準備が完了したら保存し、.uiファイルを.pyファイルへ変更します。
pyuic5 ui_LabelTest01.ui -o ui_LabelTest01.py
上記はui_LabelTest01
というファイルの変換ですが、任意の名称に変えてもらえば変換できます。
1.3: python側でROS2通信部を作る
続いて、UI部分のプログラムと同じ階層に、mainのプログラムを記述していきます。
まずはコンストラクタに
- デフォルトのtopic名やiconのパスの設定
- 各シグナルとスロットの設定
を記述します。
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super(MainWindow, self).__init__(parent)
self.float_topic_name = '/micro_ros_arduino_node_publisher'
self.icon_path = "image/qt_ros_logo.png"
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
self.setWindowIcon(QIcon(self.icon_path))
self.create_menubars()
self.show()
self.ui.checkBox_float_01.stateChanged.connect(self.connect_ros_float)
self.ui.checkBox_float_02.stateChanged.connect(self.update_ros_float)
self.ui.label_topic_setting_float.setText(self.float_topic_name)
self.ui.lineEdit_topic_float.editingFinished.connect(self.change_float_topic)
続いて、ROSの接続/非接続判断としてrclpy.spin_oneにtimeout_set
を与え、タイムアウト時間を超えたら、非接続に、そうでない場合は接続されるような関数を記述します。
def connect_ros_float(self, state):
if (Qt.Checked == state):
try:
# ROS2 init
rclpy.init(args=None)
self.node = Node('Qt_view_node')
self.pub = self.node.create_subscription(
Float64,
self.float_topic_name,
self.sub_float_callback,
10,
)
# spin once, timeout_sec 5[s]
timeout_sec_rclpy = 5
timeout_init = time.time()
rclpy.spin_once(self.node, timeout_sec=timeout_sec_rclpy)
timeout_end = time.time()
ros_connect_time = timeout_end - timeout_init
# Error Handle for rclpy timeout
if ros_connect_time >= timeout_sec_rclpy:
self.ui.label_ros2_state_float.setText("Couldn't Connect")
self.ui.label_ros2_state_float.setStyleSheet(
"color: rgb(255,255,255);"
"background-color: rgb(255,0,51);"
"border-radius:5px;"
)
else:
self.ui.label_ros2_state_float.setText("Connected")
self.ui.label_ros2_state_float.setStyleSheet(
"color: rgb(255,255,255);"
"background-color: rgb(18,230,95);"
"border-radius:5px;"
)
except:
pass
else:
self.node.destroy_node()
rclpy.shutdown()
あとは
- データをアップデートする
update_ros_float
関数 - 編集完了(Enterキー)したときのイベントを示す
change_float_topic
関数 - 指定したトピック名でcallbackする
sub_float_callback
類の関数
を定義します。
def update_ros_float(self, state):
if (Qt.Checked == state):
# create timer
self.timer = QTimer(self)
self.timer.timeout.connect(self.timer_float_update)
self.timer.start(10)
else:
self.timer.stop()
def change_float_topic(self):
self.float_topic_name = self.ui.lineEdit_topic_float.text()
self.ui.label_topic_setting_float.setText(self.float_topic_name)
### ROS2 Data Updater
def sub_float_callback(self, msg):
self.number = round(msg.data, 2)
# print(self.number)
self.update_float_data_label()
def update_float_data_label(self):
self.ui.label_data_num_float.setText(str(self.number))
self.show()
def timer_float_update(self):
rclpy.spin_once(self.node)
self.update_float_data_label()
self.show()
self.timer.start(10)
よりアプリケーションっぽくするためにQMenu
やQAction
などで装飾します。
### QMenu
def create_menubars(self):
menuBar = self.menuBar()
# Creating menus using a QMenu object
fileMenu = QMenu("&File", self)
fileMenu.addAction(self.exit_action())
fileMenu.addMenu(self.prefer_action())
menuBar.addMenu(fileMenu)
# Creating menus using a title
editMenu = menuBar.addMenu("&Edit")
editMenu.addMenu("Undo")
helpMenu = menuBar.addMenu("&Help")
helpMenu.addMenu("Get Started")
def prefer_action(self):
preferMenu = QMenu('Preferences', self)
preferAct = QAction(QIcon('image/setting.jpg'),'Setting', self)
preferMenu.addAction(preferAct)
return preferMenu
def exit_action(self):
# Exit Action, connect
exitAction = QAction(self.style().standardIcon(QStyle.SP_DialogCancelButton),
'&Exit', self)
exitAction.setShortcut('Ctrl+Q')
exitAction.setStatusTip('Exit application')
exitAction.triggered.connect(qApp.quit)
self.statusBar()
return exitAction
プログラムが完成したらターミナルを開き、mainのpythonプログラムを実行してみましょう。
python3 main_pyqt_ros2.py
表示できたら、ROS2のデモノードを起動させ
ros2 run demo_nodes_cpp talker_loaned_message
Topic Changed
に/chatter_pod
を入力後、Enterキーで反映。
ROS2 Connection
とFloat64 msg
のチェックボックスにチェックを入れて、アプリケーションの動作を確認します。
データをリアルタイムで可視化できていればOKです。
以上
Reference
PyQt5とpython3によるGUIプログラミング:実践編[0]
Qt Creator でのGUIアプリケーション開発 5:Signal/slot editor を用いたイベント処理
ゼッツプログラTV: Qt5/Qt6入門 #6 Qt Creatorの使い方1
QtDesignerを使用しない場合は以下のような形(2024年度現在はChatGPT生成も可能)
import sys
import os
import json
import time
import cv2
import numpy as np
from datetime import datetime
from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QFileDialog, QLabel, QInputDialog, QTextEdit)
from PyQt5.QtCore import QThread, pyqtSignal
from pynput import mouse, keyboard
class RecordingThread(QThread):
update_signal = pyqtSignal(str)
def __init__(self):
super().__init__()
self.recording = False
self.stop_recording = False
self.events = []
self.start_time = None
def on_move(self, x, y):
if self.recording:
self.events.append(('move', x, y, time.time() - self.start_time))
def on_click(self, x, y, button, pressed):
if self.recording:
self.events.append(('click', x, y, button.name, pressed, time.time() - self.start_time))
def on_scroll(self, x, y, dx, dy):
if self.recording:
self.events.append(('scroll', x, y, dx, dy, time.time() - self.start_time))
def on_press(self, key):
if key == keyboard.Key.esc:
self.stop_recording = True
def run(self):
self.recording = True
self.stop_recording = False
self.events = []
self.start_time = time.time()
self.update_signal.emit("Status: Recording... Press Esc or Stop to stop.")
mouse_listener = mouse.Listener(on_move=self.on_move, on_click=self.on_click, on_scroll=self.on_scroll)
keyboard_listener = keyboard.Listener(on_press=self.on_press)
mouse_listener.start()
keyboard_listener.start()
while not self.stop_recording:
time.sleep(0.1)
mouse_listener.stop()
keyboard_listener.stop()
self.recording = False
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'{timestamp}.json'
os.makedirs('recordings', exist_ok=True)
with open(f'recordings/{filename}', 'w') as f:
json.dump(self.events, f)
with open('recordings/manifest.json', 'a') as manifest_file:
manifest_entry = {'timestamp': timestamp, 'filename': filename}
json.dump(manifest_entry, manifest_file)
manifest_file.write('\n')
self.update_signal.emit(f"Status: Recording saved as {filename}")
class ReplayThread(QThread):
update_signal = pyqtSignal(str)
def __init__(self, filename, repeat, image_path=None):
super().__init__()
self.filename = filename
self.repeat = repeat
self.stop_replay = False
self.pause_replay = False
self.image_path = image_path
self.template = None
if image_path:
self.template = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
self.template_w, self.template_h = self.template.shape[::-1]
def detect_image(self, screen_image):
if self.template is None:
return False
screen_gray = cv2.cvtColor(screen_image, cv2.COLOR_BGR2GRAY)
result = cv2.matchTemplate(screen_gray, self.template, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
return max_val >= 0.8
def capture_screen(self):
# Replace this with actual screen capture code for your OS
screen = np.array(cv2.imread('screenshot.png')) # Placeholder for screen capture
return screen
def on_press(self, key):
if key == keyboard.Key.esc:
self.pause_replay = not self.pause_replay
if self.pause_replay:
self.update_signal.emit("Replay paused. Press Esc again to resume or Stop to end.")
else:
self.update_signal.emit("Replay resumed.")
def run(self):
if not os.path.exists(self.filename):
self.update_signal.emit(f"File {self.filename} not found.")
return
with open(self.filename, 'r') as f:
events = json.load(f)
mouse_controller = mouse.Controller()
keyboard_listener = keyboard.Listener(on_press=self.on_press)
keyboard_listener.start()
for repeat_count in range(1, self.repeat + 1):
if self.stop_replay:
self.update_signal.emit("Replay stopped.")
keyboard_listener.stop()
return
self.update_signal.emit(f"Replaying {repeat_count}/{self.repeat}...")
start_time = 0
for event in events:
if self.stop_replay:
self.update_signal.emit("Replay stopped.")
keyboard_listener.stop()
return
while self.pause_replay:
time.sleep(0.1)
if self.image_path:
screen_image = self.capture_screen()
if self.detect_image(screen_image):
self.update_signal.emit("Error image detected, stopping replay.")
keyboard_listener.stop()
return
event_type = event[0]
if event_type == 'move':
_, x, y, event_time = event
sleep_time = event_time - start_time
time.sleep(max(sleep_time, 0))
mouse_controller.position = (x, y)
start_time = event_time
elif event_type == 'click':
_, x, y, button, pressed, event_time = event
sleep_time = event_time - start_time
time.sleep(max(sleep_time, 0))
mouse_controller.position = (x, y)
if pressed:
mouse_controller.press(mouse.Button.left if button == 'left' else mouse.Button.right)
else:
mouse_controller.release(mouse.Button.left if button == 'left' else mouse.Button.right)
start_time = event_time
elif event_type == 'scroll':
_, x, y, dx, dy, event_time = event
sleep_time = event_time - start_time
time.sleep(max(sleep_time, 0))
mouse_controller.position = (x, y)
mouse_controller.scroll(dx, dy)
start_time = event_time
keyboard_listener.stop()
self.update_signal.emit("Replay complete.")
class MouseRecorderApp(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Mouse Recorder")
self.setGeometry(300, 300, 500, 350)
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.layout = QVBoxLayout()
self.central_widget.setLayout(self.layout)
self.record_button = QPushButton("Start Recording")
self.record_button.clicked.connect(self.start_recording)
self.layout.addWidget(self.record_button)
self.stop_button = QPushButton("Stop Recording")
self.stop_button.clicked.connect(self.stop_recording)
self.layout.addWidget(self.stop_button)
self.replay_button = QPushButton("Replay")
self.replay_button.clicked.connect(self.replay_events)
self.layout.addWidget(self.replay_button)
self.stop_replay_button = QPushButton("Stop Replay")
self.stop_replay_button.clicked.connect(self.stop_replay)
self.layout.addWidget(self.stop_replay_button)
self.info_label = QLabel("Status: Idle")
self.layout.addWidget(self.info_label)
self.record_display = QTextEdit()
self.record_display.setReadOnly(True)
self.layout.addWidget(self.record_display)
self.record_thread = None
self.replay_thread = None
# Initialize button states
self.update_button_states()
def start_recording(self):
if self.record_thread and self.record_thread.recording:
self.info_label.setText("Already recording.")
return
self.record_thread = RecordingThread()
self.record_thread.update_signal.connect(self.update_status)
self.record_thread.start()
self.update_button_states()
def stop_recording(self):
if self.record_thread and self.record_thread.recording:
self.record_thread.stop_recording = True
self.update_button_states()
def replay_events(self):
options = QFileDialog.Options()
filename, _ = QFileDialog.getOpenFileName(self, "Select JSON File", "", "JSON Files (*.json)", options=options)
if filename:
repeat, ok = QInputDialog.getInt(self, "Repeat Count", "Enter number of repeats:", min=1)
if ok:
image_path, _ = QFileDialog.getOpenFileName(self, "Select Error Image (Optional)", "", "Image Files (*.png *.jpg *.bmp)", options=options)
self.replay_thread = ReplayThread(filename, repeat, image_path if image_path else None)
self.replay_thread.update_signal.connect(self.update_status)
self.replay_thread.start()
self.update_button_states()
def stop_replay(self):
if self.replay_thread:
self.replay_thread.stop_replay = True
self.update_button_states()
def update_status(self, message):
self.info_label.setText(message)
if self.record_thread and not self.record_thread.recording:
self.display_recording()
if self.replay_thread and self.replay_thread.stop_replay:
self.info_label.setText("Replay stopped.")
self.update_button_states()
def display_recording(self):
if self.record_thread:
self.record_display.clear()
for event in self.record_thread.events:
self.record_display.append(str(event))
def update_button_states(self):
# Update button states based on current recording/replay status
if self.record_thread and self.record_thread.recording:
self.record_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.replay_button.setEnabled(False)
self.stop_replay_button.setEnabled(False)
else:
self.record_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.replay_button.setEnabled(True)
self.stop_replay_button.setEnabled(self.replay_thread is not None and not self.replay_thread.stop_replay)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MouseRecorderApp()
window.show()
sys.exit(app.exec_())
Discussion