🍃

ROS2: PyQt5+Qt Desingerでmsg可視化アプリを試作する

2022/11/15に公開

海洋ロボコンをやってた人です。
今回は、PyQt5+Qt DesingerでGUIプログラムしたアプリをROS2対応させ、msg可視化アプリを作ったときのメモおよびプログラムたちを備忘録としてまとめました。

他にも、「ROSboard」や「JSK Visualization」といったROSの強力な可視化ツールがたくさん揃っているので、車輪の再発明な点もありますが、Qt「キュート」の勉強も兼ねています。


※2023/02/26追記

本記事の「ROS2」表記、正しくは「ROS 2」です。

1: ROS2: PyQt5+Qt Desingerでmsg可視化アプリを試作する

この記述を読むことで

https://twitter.com/tasada038/status/1592464497918480384

のように、msg可視化の簡易アプリを試作できます。


個人的なポイントとしては

  • ROS2の接続/非接続の可視化
  • Rviz同様、トピックのON/OFF切り替え
  • Rviz同様、トピック名の変更にも対応
  • アプリのアイコンも作成

という点です。


使用環境は以下です。

  • Laptop PC
    • Ubuntu 20.04 Foxy

1.1: 本記事のプログラム

この記事で扱うプログラムのコードはGithubよりクローンしてご活用ください。

https://github.com/tasada038/pyqt_ros2_app

必要なライブラリは以下です。

sudo apt install pyqt5-dev-tools

1.2: Qt DesingerでGUI部分を作る

まずQt「キュート」について、Qtはクロスプラットホームでアプリケーション開発できるフレームワークになります。

基本的にはC++で開発されており、Qt Desinger, Qt Creator といった開発ツールも整っています。

商用利用する場合には、商用ライセンスの購入が必要ですが、趣味での開発ならオープンソースライセンスで問題ないかと思います。

また、RvizのPanelなどはQtライブラリで実装もされており、身近なところで実は使われています。

Qt Desingerで装飾する

基本的なパーツを使って、装飾していきます。

左側の「Widget Box」から以下をドラッグして、パーツをメインウィンドウ内に設置します。

  • Buttons
    • Check Box
  • Input Widgets
    • Line Edit
  • Containers
    • Tab Widget
  • Display Widgets
    • Label

パーツは全選択後、ヘッダーパネル部の「Layout Horizontal」や「Layout Vertical」でまとめます。

各、ラベルやタイトルは任意の名前に変更してください。

また、Pythonプログラム側で、各ラベルを区別したいので
QObject > objectName
にもオブジェクトの名称をつけておきます。

すべてのラベルやテキストの準備が完了したら保存し、.uiファイルを.pyファイルへ変更します。

pyuic5 ui_LabelTest01.ui -o ui_LabelTest01.py

上記はui_LabelTest01というファイルの変換ですが、任意の名称に変えてもらえば変換できます。

1.3: python側でROS2通信部を作る

続いて、UI部分のプログラムと同じ階層に、mainのプログラムを記述していきます。

まずはコンストラクタに

  • デフォルトのtopic名やiconのパスの設定
  • 各シグナルとスロットの設定

を記述します。

main_pyqt_ros2.py
class MainWindow(QMainWindow):
    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)
        self.float_topic_name = '/micro_ros_arduino_node_publisher'
        self.icon_path = "image/qt_ros_logo.png"
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        self.setWindowIcon(QIcon(self.icon_path))
        self.create_menubars()
        self.show()

        self.ui.checkBox_float_01.stateChanged.connect(self.connect_ros_float)
        self.ui.checkBox_float_02.stateChanged.connect(self.update_ros_float)
        self.ui.label_topic_setting_float.setText(self.float_topic_name)
        self.ui.lineEdit_topic_float.editingFinished.connect(self.change_float_topic)

続いて、ROSの接続/非接続判断としてrclpy.spin_oneにtimeout_setを与え、タイムアウト時間を超えたら、非接続に、そうでない場合は接続されるような関数を記述します。

main_pyqt_ros2.py
    def connect_ros_float(self, state):
        if (Qt.Checked == state):
            try:
                # ROS2 init
                rclpy.init(args=None)
                self.node = Node('Qt_view_node')
                self.pub = self.node.create_subscription(
                    Float64,
                    self.float_topic_name,
                    self.sub_float_callback,
                    10,
                )
                # spin once, timeout_sec 5[s]
                timeout_sec_rclpy = 5
                timeout_init = time.time()
                rclpy.spin_once(self.node, timeout_sec=timeout_sec_rclpy)
                timeout_end = time.time()
                ros_connect_time = timeout_end - timeout_init

                # Error Handle for rclpy timeout
                if ros_connect_time >= timeout_sec_rclpy:
                    self.ui.label_ros2_state_float.setText("Couldn't Connect")
                    self.ui.label_ros2_state_float.setStyleSheet(
                        "color: rgb(255,255,255);"
                        "background-color: rgb(255,0,51);"
                        "border-radius:5px;"
                    )
                else:
                    self.ui.label_ros2_state_float.setText("Connected")
                    self.ui.label_ros2_state_float.setStyleSheet(
                        "color: rgb(255,255,255);"
                        "background-color: rgb(18,230,95);"
                        "border-radius:5px;"
                    )
            except:
                pass
        else:
            self.node.destroy_node()
            rclpy.shutdown()

あとは

  • データをアップデートするupdate_ros_float関数
  • 編集完了(Enterキー)したときのイベントを示すchange_float_topic関数
  • 指定したトピック名でcallbackするsub_float_callback類の関数

を定義します。

main_pyqt_ros2.py
    def update_ros_float(self, state):
        if (Qt.Checked == state):
            # create timer
            self.timer = QTimer(self)
            self.timer.timeout.connect(self.timer_float_update)
            self.timer.start(10)
        else:
            self.timer.stop()        

    def change_float_topic(self):
        self.float_topic_name = self.ui.lineEdit_topic_float.text()
        self.ui.label_topic_setting_float.setText(self.float_topic_name)

    ### ROS2 Data Updater
    def sub_float_callback(self, msg):
        self.number = round(msg.data, 2)
        # print(self.number)
        self.update_float_data_label()

    def update_float_data_label(self):
        self.ui.label_data_num_float.setText(str(self.number))
        self.show()

    def timer_float_update(self):
        rclpy.spin_once(self.node)
        self.update_float_data_label()
        self.show()
        self.timer.start(10)

よりアプリケーションっぽくするためにQMenuQActionなどで装飾します。

main_pyqt_ros2.py
    ### QMenu
    def create_menubars(self):
        menuBar = self.menuBar()
        # Creating menus using a QMenu object
        fileMenu = QMenu("&File", self)
        fileMenu.addAction(self.exit_action())
        fileMenu.addMenu(self.prefer_action())

        menuBar.addMenu(fileMenu)
        # Creating menus using a title
        editMenu = menuBar.addMenu("&Edit")
        editMenu.addMenu("Undo")
        helpMenu = menuBar.addMenu("&Help")
        helpMenu.addMenu("Get Started")

    def prefer_action(self):
        preferMenu = QMenu('Preferences', self)
        preferAct = QAction(QIcon('image/setting.jpg'),'Setting', self)
        preferMenu.addAction(preferAct)

        return preferMenu

    def exit_action(self):
       # Exit Action, connect
        exitAction = QAction(self.style().standardIcon(QStyle.SP_DialogCancelButton),
                             '&Exit', self)       
        exitAction.setShortcut('Ctrl+Q')
        exitAction.setStatusTip('Exit application')
        exitAction.triggered.connect(qApp.quit)
        self.statusBar()
        return exitAction

プログラムが完成したらターミナルを開き、mainのpythonプログラムを実行してみましょう。

python3 main_pyqt_ros2.py

表示できたら、ROS2のデモノードを起動させ

ros2 run demo_nodes_cpp talker_loaned_message

Topic Changed/chatter_podを入力後、Enterキーで反映。
ROS2 ConnectionFloat64 msgのチェックボックスにチェックを入れて、アプリケーションの動作を確認します。

データをリアルタイムで可視化できていればOKです。

以上

Reference

PyQt5とpython3によるGUIプログラミング:実践編[0]

Ar-Ray-code/ROS2PyQt5-example

Qt Creator でのGUIアプリケーション開発 5:Signal/slot editor を用いたイベント処理

ゼッツプログラTV: Qt5/Qt6入門 #6 Qt Creatorの使い方1


QtDesignerを使用しない場合は以下のような形(2024年度現在はChatGPT生成も可能)

import sys
import os
import json
import time
import cv2
import numpy as np
from datetime import datetime
from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QFileDialog, QLabel, QInputDialog, QTextEdit)
from PyQt5.QtCore import QThread, pyqtSignal
from pynput import mouse, keyboard

class RecordingThread(QThread):
    update_signal = pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self.recording = False
        self.stop_recording = False
        self.events = []
        self.start_time = None

    def on_move(self, x, y):
        if self.recording:
            self.events.append(('move', x, y, time.time() - self.start_time))

    def on_click(self, x, y, button, pressed):
        if self.recording:
            self.events.append(('click', x, y, button.name, pressed, time.time() - self.start_time))

    def on_scroll(self, x, y, dx, dy):
        if self.recording:
            self.events.append(('scroll', x, y, dx, dy, time.time() - self.start_time))

    def on_press(self, key):
        if key == keyboard.Key.esc:
            self.stop_recording = True

    def run(self):
        self.recording = True
        self.stop_recording = False
        self.events = []
        self.start_time = time.time()
        self.update_signal.emit("Status: Recording... Press Esc or Stop to stop.")

        mouse_listener = mouse.Listener(on_move=self.on_move, on_click=self.on_click, on_scroll=self.on_scroll)
        keyboard_listener = keyboard.Listener(on_press=self.on_press)
        mouse_listener.start()
        keyboard_listener.start()

        while not self.stop_recording:
            time.sleep(0.1)

        mouse_listener.stop()
        keyboard_listener.stop()
        self.recording = False

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'{timestamp}.json'
        os.makedirs('recordings', exist_ok=True)
        with open(f'recordings/{filename}', 'w') as f:
            json.dump(self.events, f)

        with open('recordings/manifest.json', 'a') as manifest_file:
            manifest_entry = {'timestamp': timestamp, 'filename': filename}
            json.dump(manifest_entry, manifest_file)
            manifest_file.write('\n')

        self.update_signal.emit(f"Status: Recording saved as {filename}")

class ReplayThread(QThread):
    update_signal = pyqtSignal(str)

    def __init__(self, filename, repeat, image_path=None):
        super().__init__()
        self.filename = filename
        self.repeat = repeat
        self.stop_replay = False
        self.pause_replay = False
        self.image_path = image_path
        self.template = None
        if image_path:
            self.template = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            self.template_w, self.template_h = self.template.shape[::-1]

    def detect_image(self, screen_image):
        if self.template is None:
            return False
        screen_gray = cv2.cvtColor(screen_image, cv2.COLOR_BGR2GRAY)
        result = cv2.matchTemplate(screen_gray, self.template, cv2.TM_CCOEFF_NORMED)
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
        return max_val >= 0.8

    def capture_screen(self):
        # Replace this with actual screen capture code for your OS
        screen = np.array(cv2.imread('screenshot.png'))  # Placeholder for screen capture
        return screen

    def on_press(self, key):
        if key == keyboard.Key.esc:
            self.pause_replay = not self.pause_replay
            if self.pause_replay:
                self.update_signal.emit("Replay paused. Press Esc again to resume or Stop to end.")
            else:
                self.update_signal.emit("Replay resumed.")

    def run(self):
        if not os.path.exists(self.filename):
            self.update_signal.emit(f"File {self.filename} not found.")
            return

        with open(self.filename, 'r') as f:
            events = json.load(f)

        mouse_controller = mouse.Controller()
        keyboard_listener = keyboard.Listener(on_press=self.on_press)
        keyboard_listener.start()

        for repeat_count in range(1, self.repeat + 1):
            if self.stop_replay:
                self.update_signal.emit("Replay stopped.")
                keyboard_listener.stop()
                return

            self.update_signal.emit(f"Replaying {repeat_count}/{self.repeat}...")
            start_time = 0
            for event in events:
                if self.stop_replay:
                    self.update_signal.emit("Replay stopped.")
                    keyboard_listener.stop()
                    return

                while self.pause_replay:
                    time.sleep(0.1)

                if self.image_path:
                    screen_image = self.capture_screen()
                    if self.detect_image(screen_image):
                        self.update_signal.emit("Error image detected, stopping replay.")
                        keyboard_listener.stop()
                        return

                event_type = event[0]
                if event_type == 'move':
                    _, x, y, event_time = event
                    sleep_time = event_time - start_time
                    time.sleep(max(sleep_time, 0))
                    mouse_controller.position = (x, y)
                    start_time = event_time
                elif event_type == 'click':
                    _, x, y, button, pressed, event_time = event
                    sleep_time = event_time - start_time
                    time.sleep(max(sleep_time, 0))
                    mouse_controller.position = (x, y)
                    if pressed:
                        mouse_controller.press(mouse.Button.left if button == 'left' else mouse.Button.right)
                    else:
                        mouse_controller.release(mouse.Button.left if button == 'left' else mouse.Button.right)
                    start_time = event_time
                elif event_type == 'scroll':
                    _, x, y, dx, dy, event_time = event
                    sleep_time = event_time - start_time
                    time.sleep(max(sleep_time, 0))
                    mouse_controller.position = (x, y)
                    mouse_controller.scroll(dx, dy)
                    start_time = event_time

        keyboard_listener.stop()
        self.update_signal.emit("Replay complete.")

class MouseRecorderApp(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("Mouse Recorder")
        self.setGeometry(300, 300, 500, 350)

        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        self.layout = QVBoxLayout()
        self.central_widget.setLayout(self.layout)

        self.record_button = QPushButton("Start Recording")
        self.record_button.clicked.connect(self.start_recording)
        self.layout.addWidget(self.record_button)

        self.stop_button = QPushButton("Stop Recording")
        self.stop_button.clicked.connect(self.stop_recording)
        self.layout.addWidget(self.stop_button)

        self.replay_button = QPushButton("Replay")
        self.replay_button.clicked.connect(self.replay_events)
        self.layout.addWidget(self.replay_button)

        self.stop_replay_button = QPushButton("Stop Replay")
        self.stop_replay_button.clicked.connect(self.stop_replay)
        self.layout.addWidget(self.stop_replay_button)

        self.info_label = QLabel("Status: Idle")
        self.layout.addWidget(self.info_label)

        self.record_display = QTextEdit()
        self.record_display.setReadOnly(True)
        self.layout.addWidget(self.record_display)

        self.record_thread = None
        self.replay_thread = None

        # Initialize button states
        self.update_button_states()

    def start_recording(self):
        if self.record_thread and self.record_thread.recording:
            self.info_label.setText("Already recording.")
            return

        self.record_thread = RecordingThread()
        self.record_thread.update_signal.connect(self.update_status)
        self.record_thread.start()
        self.update_button_states()

    def stop_recording(self):
        if self.record_thread and self.record_thread.recording:
            self.record_thread.stop_recording = True
        self.update_button_states()

    def replay_events(self):
            options = QFileDialog.Options()
            filename, _ = QFileDialog.getOpenFileName(self, "Select JSON File", "", "JSON Files (*.json)", options=options)

            if filename:
                repeat, ok = QInputDialog.getInt(self, "Repeat Count", "Enter number of repeats:", min=1)
                if ok:
                    image_path, _ = QFileDialog.getOpenFileName(self, "Select Error Image (Optional)", "", "Image Files (*.png *.jpg *.bmp)", options=options)
                    self.replay_thread = ReplayThread(filename, repeat, image_path if image_path else None)
                    self.replay_thread.update_signal.connect(self.update_status)
                    self.replay_thread.start()
                    self.update_button_states()

    def stop_replay(self):
        if self.replay_thread:
            self.replay_thread.stop_replay = True
        self.update_button_states()

    def update_status(self, message):
        self.info_label.setText(message)
        if self.record_thread and not self.record_thread.recording:
            self.display_recording()
        if self.replay_thread and self.replay_thread.stop_replay:
            self.info_label.setText("Replay stopped.")
            self.update_button_states()

    def display_recording(self):
        if self.record_thread:
            self.record_display.clear()
            for event in self.record_thread.events:
                self.record_display.append(str(event))

    def update_button_states(self):
        # Update button states based on current recording/replay status
        if self.record_thread and self.record_thread.recording:
            self.record_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.replay_button.setEnabled(False)
            self.stop_replay_button.setEnabled(False)
        else:
            self.record_button.setEnabled(True)
            self.stop_button.setEnabled(False)
            self.replay_button.setEnabled(True)
            self.stop_replay_button.setEnabled(self.replay_thread is not None and not self.replay_thread.stop_replay)

if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MouseRecorderApp()
    window.show()
    sys.exit(app.exec_())

Discussion