📷

OPC UAを介してUSBカメラ映像を可視化してみた

2025/02/23に公開

はじめに

OPC UA (OPC Unified Architecture) は、産業用の機器やシステム間でデータ交換を行うためのプロトコルです。本記事では、Python の python-opcua ライブラリを用いて USB カメラから取得した映像を OPC UA 経由で送受信し、リアルタイムに可視化してみた例を紹介します。

全体の流れ

  1. カメラから映像を取得
    cv2.VideoCapture() を使い、カメラデバイスからフレームを取得します。
  2. JPEGエンコード→Base64エンコード
    OPC UA はバイナリデータを直接やり取りできますが、今回は簡便のため JPEG にエンコードしたフレームをさらに Base64 に変換して OPC UA のノードに書き込みます。
  3. OPC UA サーバーに送信
    client.get_node("<node_id>") でノードを取得し、set_value() により Base64エンコード後の JPEG データを送信します。
  4. 別のスクリプトで OPC UA サーバーから取得
    同じノードから get_value() でデータを取得し、Base64デコード→ JPEGデコード→ 画像表示 (cv2.imshow()) を行います。

必要な環境

  • Python 3.7 以降 (例では Python 3.9 以上を想定)
  • OpenCV (pip install opencv-python)
  • python-opcua (pip install opcua または pip install python-opcua)
  • USB カメラ (例: PC 内蔵カメラでも可)

スクリプト1: カメラ映像をOPC UAに送信する

以下のコードは、USBカメラの映像をフレームごとに取得し、OPC UA ノードに対して Base64エンコードした JPEG データを送信するものです。

test_opcua_frame.py
from datetime import datetime
from opcua import Client, Node, ua
import cv2
import sys
import base64
import time
import numpy as np

def capture_and_send_frames():
    try:
        # OPC UAクライアントの設定と接続
        client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer")
        client.connect()
        print("OPC UAサーバーに接続しました")

        # 既存のノードを取得
        node = client.get_node("ns=3;s=plc/image")
        print(f"ノードに接続: {node.nodeid}")

        # カメラのセットアップ
        cap = cv2.VideoCapture(1)
        if not cap.isOpened():
            raise Exception("カメラデバイスにアクセスできません")

        print(f"カメラの解像度: {cap.get(cv2.CAP_PROP_FRAME_WIDTH)}x{cap.get(cv2.CAP_PROP_FRAME_HEIGHT)}")

        while True:
            ret, frame = cap.read()
            if not ret:
                raise Exception("フレームの取得に失敗しました")

            # フレームをJPEGにエンコード
            _, buffer = cv2.imencode('.jpg', frame)
            jpeg_data = base64.b64encode(buffer)  # バイナリデータとして保持

            # OPC UAサーバーのノードに値を書き込み
            node.set_value(jpeg_data)

            # 処理状況を表示
            print(f"フレームを更新しました: {datetime.now().isoformat()}")

            # 100ms待機
            time.sleep(0.1)

    except Exception as e:
        print(f"エラーが発生しました: {str(e)}", file=sys.stderr)
        raise

    finally:
        if 'cap' in locals():
            cap.release()
            print("カメラを解放しました")
        if 'client' in locals():
            client.disconnect()
            print("OPC UAサーバーから切断しました")

if __name__ == "__main__":
    try:
        capture_and_send_frames()
    except KeyboardInterrupt:
        print("\nプログラムを終了します")
    except Exception as e:
        sys.exit(1)

スクリプト2: OPC UA サーバーから取得した映像を表示する

次に、送信されているカメラ映像を OPC UA サーバーから取得し、表示するスクリプトです。

test_opcua_get_frame.py
from opcua import Client
import cv2
import numpy as np
import base64

def display_image_from_opcua():
    try:
        # OPC UAクライアントの設定と接続
        client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer")
        client.connect()
        print("OPC UAサーバーに接続しました")

        # 画像ノードを取得
        node = client.get_node("ns=3;s=plc/image")
        print(f"ノードに接続: {node.nodeid}")

        while True:
            # ノードから画像データを取得
            jpeg_data = node.get_value()
            
            # Base64デコード
            img_data = base64.b64decode(jpeg_data)
            
            # バイナリデータからnumpy配列に変換
            img_array = np.frombuffer(img_data, np.uint8)
            
            # JPEGデータをデコード
            frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
            
            # 画像を表示
            cv2.imshow('OPC UA Camera Feed', frame)
            
            # qキーで終了
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    except Exception as e:
        print(f"エラーが発生しました: {str(e)}")
        raise

    finally:
        cv2.destroyAllWindows()
        if 'client' in locals():
            client.disconnect()
            print("OPC UAサーバーから切断しました")

if __name__ == "__main__":
    try:
        display_image_from_opcua()
    except KeyboardInterrupt:
        print("\nプログラムを終了します")
    except Exception as e:
        import sys
        sys.exit(1)

実行手順

  1. OPC UA サーバーを起動
    自前の SimulationServer など、通信できる OPC UA サーバーが起動していることを確認してください。

  2. カメラ映像送信スクリプト起動

python test_opcua_frame.py

成功するとカメラ映像を取得し、OPC UA サーバーのノードに随時送信します。

  1. 映像受信&表示スクリプト起動
    別のターミナルなどで以下を実行:
python test_opcua_get_frame.py

OPC UA サーバーからカメラフレームを受信して、OpenCV ウィンドウに映像を表示します。

※ 表示ウィンドウで q を押すと表示を終了します。

参考リンク

  • python-opcua GitHub Discussions
    OPC UA の実装や使用例に関する議論が行われている GitHub Discussions のページです。
  • OPC UA Reference (OPC Foundation)
    OPC UA の公式リファレンスです。データ型やサービスの仕様を確認できます。
  • python-opcua サンプル
    python-opcua の公式リポジトリ内にあるサンプルコードです。クライアントの実装例が載っています。
  • python-opcua GitHub
    本記事で使用した python-opcua ライブラリの公式リポジトリです。インストール方法や詳細なドキュメントも掲載されています。
  • Prosys OPC UA 製品
    Prosys は OPC UA に対応した商用ソフトウェアを提供しています。シミュレーションサーバーの検証に役立つツールもあります。

おわりに

本記事では、OPC UA を介して USB カメラ映像をリアルタイムに送受信し、可視化する方法を紹介しました。
実際の産業機器や組み込みシステムではカメラ映像だけでなく、センサー値や制御データなど様々な情報を OPC UA でやり取りするケースが想定されます。Python で OPC UA を使う際には、ぜひ参考にしてみてください。

GitHubで編集を提案

Discussion