📷
OPC UAを介してUSBカメラ映像を可視化してみた
はじめに
OPC UA (OPC Unified Architecture) は、産業用の機器やシステム間でデータ交換を行うためのプロトコルです。本記事では、Python の python-opcua ライブラリを用いて USB カメラから取得した映像を OPC UA 経由で送受信し、リアルタイムに可視化してみた例を紹介します。
全体の流れ
-
カメラから映像を取得
cv2.VideoCapture()
を使い、カメラデバイスからフレームを取得します。 -
JPEGエンコード→Base64エンコード
OPC UA はバイナリデータを直接やり取りできますが、今回は簡便のため JPEG にエンコードしたフレームをさらに Base64 に変換して OPC UA のノードに書き込みます。 -
OPC UA サーバーに送信
client.get_node("<node_id>")
でノードを取得し、set_value()
により Base64エンコード後の JPEG データを送信します。 -
別のスクリプトで OPC UA サーバーから取得
同じノードからget_value()
でデータを取得し、Base64デコード→ JPEGデコード→ 画像表示 (cv2.imshow()
) を行います。
必要な環境
- Python 3.7 以降 (例では Python 3.9 以上を想定)
- OpenCV (
pip install opencv-python
) - python-opcua (
pip install opcua
またはpip install python-opcua
) - USB カメラ (例: PC 内蔵カメラでも可)
スクリプト1: カメラ映像をOPC UAに送信する
以下のコードは、USBカメラの映像をフレームごとに取得し、OPC UA ノードに対して Base64エンコードした JPEG データを送信するものです。
test_opcua_frame.py
from datetime import datetime
from opcua import Client, Node, ua
import cv2
import sys
import base64
import time
import numpy as np
def capture_and_send_frames():
try:
# OPC UAクライアントの設定と接続
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer")
client.connect()
print("OPC UAサーバーに接続しました")
# 既存のノードを取得
node = client.get_node("ns=3;s=plc/image")
print(f"ノードに接続: {node.nodeid}")
# カメラのセットアップ
cap = cv2.VideoCapture(1)
if not cap.isOpened():
raise Exception("カメラデバイスにアクセスできません")
print(f"カメラの解像度: {cap.get(cv2.CAP_PROP_FRAME_WIDTH)}x{cap.get(cv2.CAP_PROP_FRAME_HEIGHT)}")
while True:
ret, frame = cap.read()
if not ret:
raise Exception("フレームの取得に失敗しました")
# フレームをJPEGにエンコード
_, buffer = cv2.imencode('.jpg', frame)
jpeg_data = base64.b64encode(buffer) # バイナリデータとして保持
# OPC UAサーバーのノードに値を書き込み
node.set_value(jpeg_data)
# 処理状況を表示
print(f"フレームを更新しました: {datetime.now().isoformat()}")
# 100ms待機
time.sleep(0.1)
except Exception as e:
print(f"エラーが発生しました: {str(e)}", file=sys.stderr)
raise
finally:
if 'cap' in locals():
cap.release()
print("カメラを解放しました")
if 'client' in locals():
client.disconnect()
print("OPC UAサーバーから切断しました")
if __name__ == "__main__":
try:
capture_and_send_frames()
except KeyboardInterrupt:
print("\nプログラムを終了します")
except Exception as e:
sys.exit(1)
スクリプト2: OPC UA サーバーから取得した映像を表示する
次に、送信されているカメラ映像を OPC UA サーバーから取得し、表示するスクリプトです。
test_opcua_get_frame.py
from opcua import Client
import cv2
import numpy as np
import base64
def display_image_from_opcua():
try:
# OPC UAクライアントの設定と接続
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer")
client.connect()
print("OPC UAサーバーに接続しました")
# 画像ノードを取得
node = client.get_node("ns=3;s=plc/image")
print(f"ノードに接続: {node.nodeid}")
while True:
# ノードから画像データを取得
jpeg_data = node.get_value()
# Base64デコード
img_data = base64.b64decode(jpeg_data)
# バイナリデータからnumpy配列に変換
img_array = np.frombuffer(img_data, np.uint8)
# JPEGデータをデコード
frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
# 画像を表示
cv2.imshow('OPC UA Camera Feed', frame)
# qキーで終了
if cv2.waitKey(1) & 0xFF == ord('q'):
break
except Exception as e:
print(f"エラーが発生しました: {str(e)}")
raise
finally:
cv2.destroyAllWindows()
if 'client' in locals():
client.disconnect()
print("OPC UAサーバーから切断しました")
if __name__ == "__main__":
try:
display_image_from_opcua()
except KeyboardInterrupt:
print("\nプログラムを終了します")
except Exception as e:
import sys
sys.exit(1)
実行手順
-
OPC UA サーバーを起動
自前の SimulationServer など、通信できる OPC UA サーバーが起動していることを確認してください。 -
カメラ映像送信スクリプト起動
python test_opcua_frame.py
成功するとカメラ映像を取得し、OPC UA サーバーのノードに随時送信します。
- 映像受信&表示スクリプト起動
別のターミナルなどで以下を実行:
python test_opcua_get_frame.py
OPC UA サーバーからカメラフレームを受信して、OpenCV ウィンドウに映像を表示します。
※ 表示ウィンドウで q を押すと表示を終了します。
参考リンク
-
python-opcua GitHub Discussions
OPC UA の実装や使用例に関する議論が行われている GitHub Discussions のページです。 -
OPC UA Reference (OPC Foundation)
OPC UA の公式リファレンスです。データ型やサービスの仕様を確認できます。 -
python-opcua サンプル
python-opcua の公式リポジトリ内にあるサンプルコードです。クライアントの実装例が載っています。 -
python-opcua GitHub
本記事で使用したpython-opcua
ライブラリの公式リポジトリです。インストール方法や詳細なドキュメントも掲載されています。 -
Prosys OPC UA 製品
Prosys は OPC UA に対応した商用ソフトウェアを提供しています。シミュレーションサーバーの検証に役立つツールもあります。
おわりに
本記事では、OPC UA を介して USB カメラ映像をリアルタイムに送受信し、可視化する方法を紹介しました。
実際の産業機器や組み込みシステムではカメラ映像だけでなく、センサー値や制御データなど様々な情報を OPC UA でやり取りするケースが想定されます。Python で OPC UA を使う際には、ぜひ参考にしてみてください。
Discussion