🐍

[Python] ModbusTCP通信環境を構築する

2023/06/13に公開

はじめに

Python プロジェクトで ModbusTCP 通信環境が必要であり、
実現した経験があります。

その時に pyModbusTCP ライブラリーに関する情報が少なく、大変でした。
今回実装方法を紹介して、今後の誰かの開発の役に立てば良いなと思います。

対象読者

  • Python の基本文法を理解している方
  • pyModbusTCP で ModbusTCP 通信環境を構築したい方
  • サーバー/クライアント等の単語を理解している方

この記事でわかること

  • pyModbusTCP の基本的な使い方

前提条件

  • Python 3.9.10
  • pyModbusTCP 0.2.0

手順解説

ライブラリーの準備

1. pyModbusTCP ライブラリーをインストールする

下記コマンドでインストールする

pip install pyModbusTCP

2. pyModbusTCP ライブラリーを確認する

pyModbusTCP 0.2.0がインストールされている事が確認できます。

$ pip show pyModbusTCP
Name: pyModbusTCP
Version: 0.2.0
Summary: A simple Modbus/TCP library for Python
Home-page: https://github.com/sourceperl/pyModbusTCP
Author: Loic Lefebvre
Author-email: loic.celine@free.fr
License: MIT
Location: /home/furuta/.local/lib/python3.9/site-packages
Requires:
Required-by:

サーバー側の実装

1. ライブラリー等をインポートする

ModbusServerをインポートしてサーバー側に必要な関数等を使えるようにします。
datetimeは、ログ用にインポートします。

from pyModbusTCP.server import ModbusServer
from datetime import datetime

2. サーバー用のクラスを作成する

インポートしたModbusServerに、下記を引数として渡せるようにします。

  • IP アドレス
  • ポート
  • ノンブロッキング動作の設定値

インスタンス化したModbusServerIPアドレスポートは、
クラス内部で使用したり、呼び出しために self で情報を保持します。

class ModbusTCPServer():

    def __init__(self, ipaddress="127.0.0.1", port=12345):
        self.__ipaddress = ipaddress
        self.__port = port
        self.__server = ModbusServer(ipaddress, port, no_block=True)
        pass

補足情報

  • Q: ノンブロッキング動作(no_block)とは?

  • A: 入出力処理の完了を待たずに次の処理を実行することができる非同期処理のことを指します。
    このように、ノンブロッキング動作を行うことで、Modbus サーバーは他の処理を同時に実行することができ、パフォーマンスを向上させることができます。

  • Q: no_block=False を指定すると?

  • A: Modbus サーバーはブロッキング動作を行うようになります。
    ブロッキング動作を行う場合、入出力処理が完了するまで次の処理を実行することができず、パフォーマンスが低下することがあります。

3. 状態を取得できる関数を作成する

@propertyを使って、下記をオブジェクトに対して値を取得できるように関数を作成します。

  • タイムスタンプ
  • サーバーの IP アドレス
  • サーバーのポート
    @property
    def get_nowtime(self):
        """ ログ用のタイムスタンプ """
        return datetime.now()

    @property
    def get_ipaddress(self):
        """ サーバーのIPアドレスを取得する """
        return self.__ipaddress

    @property
    def get_port(self):
        """ サーバーのポートを取得する """
        return self.__port

4. サーバーを起動する関数の作成する

ModbusServer.start()を使って任意のタイミングでサーバーを起動が出来るようにします。
サーバーが起動したら、Start ModbusTCP Serverを出力します。

    def start_server(self):
        """ サーバーを起動する """
        print(f"{self.get_nowtime} Start ModbusTCP Server")
        return self.__server.start()

5. サーバーを停止する関数の作成する

ModbusServer.stop()を使って任意のタイミングでサーバーを停止が出来るようにします。
サーバーが起動したら、Stop ModbusTCP Serveを出力します。

    def stop_server(self):
        """ サーバーを停止する """
        print(f"{self.get_nowtime} Stop ModbusTCP Server")
        return self.__server.stop()

6. 保持レジスターの値に関する関数を作成する

下記関数を作成します。

  • 保持レジスターの値を設定する
  • 保持レジスターの値を取得する

set_holding_registersは、
設定するアドレス設定する値を渡して保持レジスターに設定します。
その時の設定するアドレスと設定する値を出力させます。

get_holding_registersは、
設定するアドレス取得する値の数の値を渡して保持レジスターの値を取得する。
その時の設定するアドレスと取得した値を出力させます。

    def get_holding_registers(self, address, num=1):
        """ 保持レジスターの値を取得する """
        value = self.__server.data_bank.get_holding_registers(address, number=num)
        print(f"{self.get_nowtime} [server] get_holding_registers[address:{address}]: {value}")
        return value

    def set_holding_registers(self, address, value):
        """ 保持レジスターの値を設定する """
        print(f"{self.get_nowtime} [server] set_holding_registers[address:{address}]: {[value]}")
        return self.__server.data_bank.set_holding_registers(address, [value])

補足情報

  • Q: 保持レジスターとは?

  • A: Modbus スレーブデバイスが持つ一連の 16 ビットのレジスターのことを指します。
    スレーブデバイスに対して書き込み要求を送信することで書き込みができ、
    また読み出し要求を送信することでその値を読み出すことができます。

  • Q: 保持レジスターのアドレス範囲は?

  • A: 0x0000 から 0xFFFF までの 65,536 個のアドレスがあり、
    各レジスターは 0 から 65535 の値を取ることができます。

サーバー側のサンプルコード

先ほどの手順解説をまとめたコードになります。

server.py

from pyModbusTCP.server import ModbusServer
from datetime import datetime

class ModbusTCPServer():

    def __init__(self, ipaddress="127.0.0.1", port=12345):
        self.__ipaddress = ipaddress
        self.__port = port
        self.__server = ModbusServer(ipaddress, port, no_block=True)
        pass

    @property
    def get_nowtime(self):
        """ ログ用のタイムスタンプ """
        return datetime.now()

    @property
    def get_ipaddress(self):
        """ サーバーのIPアドレスを取得する """
        return self.__ipaddress

    @property
    def get_port(self):
        """ サーバーのポートを取得する """
        return self.__port

    def start_server(self):
        """ サーバーを起動する """
        print(f"{self.get_nowtime} Start ModbusTCP Server")
        return self.__server.start()

    def stop_server(self):
        """ サーバーを停止する """
        print(f"{self.get_nowtime} Stop ModbusTCP Server")
        return self.__server.stop()

    def get_holding_registers(self, address, num=1):
        """ 保持レジスターの値を取得する """
        value = self.__server.data_bank.get_holding_registers(address, number=num)
        print(f"{self.get_nowtime} [server] get_holding_registers[address:{address}]: {value}")
        return value

    def set_holding_registers(self, address, value):
        """ 保持レジスターの値を設定する """
        print(f"{self.get_nowtime} [server] set_holding_registers[address:{address}]: {[value]}")
        return self.__server.data_bank.set_holding_registers(address, [value])

クライアント側の実装

1. ライブラリー等をインポートする

ModbusClientをインポートしてクライアント側に必要な関数等を使えるようにします。
datetimeは、ログ用にインポートします。
timeは、スリープ用にインポートします。

from pyModbusTCP.client import ModbusClient
from datetime import datetime
import time

2. クライアント用のクラスを作成する

インポートしたModbusClientに IP アドレス/ポートを引数として渡せるようにします。
インスタンス化したModbusClientは、クラス内部で使用したり、呼び出しために self で情報を保持します。

class ModbusTCPClient():

    def __init__(self, ipaddress="127.0.0.1", port=12345):
        self.__client = ModbusClient(host=ipaddress, port=port)
        pass

3. 状態を取得できる関数を作成する

@propertyを使って、下記をオブジェクトに対して値を取得できるように関数を作成します。

  • タイムスタンプ
  • サーバーとの接続状態
    @property
    def get_nowtime(self):
        """ ログ用のタイムスタンプ """
        return datetime.now()

    @property
    def get_connect_state(self):
        """ サーバーとの接続状態を確認する """
        return self.__client.is_open

4. サーバーへ接続する関数を作成する

デフォルトで 10 秒間 接続を監視する
接続が出来たら、Connect: Success !! と表示するようにする

    def connect_to_server(self, sec=10):
        """ サーバーとの接続を試みる """
        i = 0
        while i < sec:
            result = self.__client.open()
            print(f"{self.get_nowtime} Connect: Success !!")
            if result:
                break
            else:
                i += 1
                time.sleep(1)

5. サーバーへ切断する関数を作成する

デフォルトで 10 秒間 切断を監視する
接続が出来たら、Disconnect: Success !! と表示するようにする

    def disconnect_to_server(self, sec=10):
        """ サーバーとの切断を試みる """
        i = 0
        while i < sec:
            result = self.__client.close()
            if not result:
                print(f"{self.get_nowtime} Disconnect: Success !!")
                break
            else:
                i += 1
                time.sleep(1)

6. 保持レジスターの値を取得する関数を作成する

取得したい保持レジスターのアドレスを指定して保持レジスターの値を取得する

    def read_holding_registers(self, address):
        """ 保持レジスターの値を取得する """
        value = self.__client.read_holding_registers(address)
        print(f"{self.get_nowtime} [client] read_holding_registers[address:{address}]: {value}")

クライアント側のサンプルコード

先ほどの手順解説をまとめたコードになります。

client.py
from pyModbusTCP.client import ModbusClient
from datetime import datetime
import time


class ModbusTCPClient():

    def __init__(self, ipaddress="127.0.0.1", port=12345):
        self.__client = ModbusClient(host=ipaddress, port=port)
        pass

    @property
    def get_nowtime(self):
        """ ログ用のタイムスタンプ """
        return datetime.now()

    @property
    def get_connect_state(self):
        """ サーバーとの接続状態を確認する """
        return self.__client.is_open

    def connect_to_server(self, sec=10):
        """ サーバーとの接続を試みる """
        i = 0
        while i < sec:
            result = self.__client.open()
            print(f"{self.get_nowtime} Connect: Success !!")
            if result:
                break
            else:
                i += 1
                time.sleep(1)

    def disconnect_to_server(self, sec=10):
        """ サーバーとの切断を試みる """
        i = 0
        while i < sec:
            result = self.__client.close()
            if not result:
                print(f"{self.get_nowtime} Disconnect: Success !!")
                break
            else:
                i += 1
                time.sleep(1)

    def read_holding_registers(self, address):
        """ 保持レジスターの値を取得する """
        value = self.__client.read_holding_registers(address)
        print(f"{self.get_nowtime} [client] read_holding_registers[address:{address}]: {value}")

    def write_single_register(self, address, value):
        """ シングルレジスターに値を設定する """
        print(f"{self.get_nowtime} [client] write_single_register[address:{address}]: {value}")
        return self.__client.write_single_register(address, value)

    def write_multiple_registers(self, address, value: list):
        """ マルチレジスターに値を設定する """
        print(f"{self.get_nowtime} [client] write_multiple_registers[address:{address}]: {value}")
        return self.__client.write_multiple_registers(address, value)

ModbusTCP 通信環境の実装

1. 自作のクライアントとサーバーをインポートする

timeは、サーバーとクライアントを十分に接続できるようにスリープさせるためにインポートします

from server import ModbusTCPServer
from client import ModbusTCPClient
import time

2. 通信環境の準備する

インポートした自作クラスをインスタンス化し、通信環境を準備します。

ipaddress = "127.0.0.1"
port = 12345

# インスタンス化
modbus_server = ModbusTCPServer(ipaddress=ipaddress, port=port)
modbus_client = ModbusTCPClient(ipaddress=ipaddress, port=port)

3. サーバーを起動後、接続する

modbus_server.start_server()
modbus_client.connect_to_server()

4. 保持レジスターのアドレスの 0 番目に値を操作する

  1. クライアント側で address=0 の値が [0] の初期値である事を確認する
  2. サーバー側で address=0 の値に23を設定する。
  3. クライアント側にデータが十分に反映されるまで待つ
  4. クライアント側で address=0 の値が [23] に更新されている事を確認する
modbus_client.read_holding_registers(address=0)
modbus_server.set_holding_registers(address=0, value=23)
time.sleep(2)
modbus_client.read_holding_registers(address=0)

5. 保持レジスターのアドレスの 1 番目に値を操作する

address=1の保持レジスターでも同様な操作が可能か確認します

modbus_client.read_holding_registers(address=1)
modbus_server.set_holding_registers(address=1, value=23)
time.sleep(2)
modbus_client.read_holding_registers(address=1)

6. 保持レジスターのアドレスの 1 番目に値を更新する

  1. サーバー側で address=1 の値に25を上書きする。
  2. クライアント側にデータが十分に反映されるまで待つ
  3. クライアント側で address=1 の値が [25] に更新されている事を確認する
modbus_server.set_holding_registers(address=1, value=25)
time.sleep(2)
modbus_client.read_holding_registers(address=1)

7. サーバーとの切断後、終了する

modbus_client.get_connect_stateで接続状態を確認しながらサーバーを終了させます。

print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
modbus_client.disconnect_to_server()
print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
modbus_server.stop_server()

サーバー側->クライアント側へサンプルコード

先ほどの手順解説をまとめたコードになります。

sample_server_to_client.py
from server import ModbusTCPServer
from client import ModbusTCPClient
import time

ipaddress = "127.0.0.1"
port = 12345

# インスタンス化
modbus_server = ModbusTCPServer(ipaddress=ipaddress, port=port)
modbus_client = ModbusTCPClient(ipaddress=ipaddress, port=port)

print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
# modbusTCPサーバーを立ち上げる
modbus_server.start_server()
# サーバーへの接続を試みる
modbus_client.connect_to_server()
print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")

# クライアント側でaddress=0の値が[0]の初期値である事を確認する
modbus_client.read_holding_registers(address=0)
# サーバー側でaddress=0の値に23を設定する。
modbus_server.set_holding_registers(address=0, value=23)
# クライアント側にデータが十分に反映されるまで待つ
time.sleep(2)
# クライアント側でaddress=0の値が[23]に更新されている事を確認する
modbus_client.read_holding_registers(address=0)

# クライアント側でaddress=1の値が[0]の初期値である事を確認する
modbus_client.read_holding_registers(address=1)
# サーバー側でaddress=1の値に25を設定する。
modbus_server.set_holding_registers(address=1, value=25)
# クライアント側にデータが十分に反映されるまで待つ
time.sleep(2)
# クライアント側でaddress=1の値が[25]に更新されている事を確認する
modbus_client.read_holding_registers(address=1)

# サーバー側でaddress=1の値に25を上書きする。
modbus_server.set_holding_registers(address=1, value=20)
# クライアント側にデータが十分に反映されるまで待つ
time.sleep(2)
# クライアント側でaddress=1の値が[20]に更新されている事を確認する
modbus_client.read_holding_registers(address=1)

print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
# サーバーとの切断を試みる
modbus_client.disconnect_to_server()
print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
# modbusTCPサーバーを終了させる
modbus_server.stop_server()

出力結果を確認します。

$ python sample_server_to_client.py
2023-02-21 14:32:42.986585 from client to server connected state:False

2023-02-21 14:32:42.986683 Start ModbusTCP Server
2023-02-21 14:32:42.992917 Connect: Success !!

2023-02-21 14:32:42.993132 from client to server connected state:True

2023-02-21 14:32:42.996868 [client] read_holding_registers[address:0]: [0]
2023-02-21 14:32:44.999293 [server] set_holding_registers[address:0]: [23]
2023-02-21 14:32:47.002003 [client] read_holding_registers[address:0]: [23]

2023-02-21 14:32:47.002562 [client] read_holding_registers[address:1]: [0]
2023-02-21 14:32:47.002656 [server] set_holding_registers[address:1]: [25]
2023-02-21 14:32:49.005167 [client] read_holding_registers[address:1]: [25]

2023-02-21 14:32:49.005256 [server] set_holding_registers[address:1]: [20]
2023-02-21 14:32:51.007852 [client] read_holding_registers[address:1]: [20]

2023-02-21 14:32:51.008022 from client to server connected state:True
2023-02-21 14:32:51.008166 Disconnect: Success !!

2023-02-21 14:32:51.008322 from client to server connected state:False
2023-02-21 14:32:51.008598 Stop ModbusTCP Server

クライアント側 → サーバー側へサンプルコード

似たような解説文が増える為、こちらの解説は行いません。

from server import ModbusTCPServer
from client import ModbusTCPClient
import time

ipaddress = "127.0.0.1"
port = 12345

# インスタンス化
modbus_server = ModbusTCPServer(ipaddress=ipaddress, port=port)
modbus_client = ModbusTCPClient(ipaddress=ipaddress, port=port)

print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
# modbusTCPサーバーを立ち上げる
modbus_server.start_server()
# サーバーへの接続を試みる
modbus_client.connect_to_server()
print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")

# サーバー側でaddress=0の値が[0]の初期値である事を確認する
modbus_server.get_holding_registers(address=0)
# クライアント側でaddress=0の値を122に設定する
modbus_client.write_single_register(address=0, value=122)
# サーバー側にデータが十分に反映されるまで待つ
time.sleep(2)
# サーバー側でaddress=0の値が[122]である事を確認する
modbus_server.get_holding_registers(address=0)

# サーバー側でaddress=1の値が[0]の初期値である事を確認する
modbus_server.get_holding_registers(address=1)
# クライアント側でaddress=1の値を[1, 2, 3]に設定する
modbus_client.write_multiple_registers(address=1, value=[1, 2, 3])
# サーバー側にデータが十分に反映されるまで待つ
time.sleep(2)
# サーバー側でaddress=1の値が[1, 2, 3]である事を確認する
modbus_server.get_holding_registers(address=1, num=3)

print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
# サーバーとの切断を試みる
modbus_client.disconnect_to_server()
print(f"{modbus_client.get_nowtime} from client to server connected state:{modbus_client.get_connect_state}")
# modbusTCPサーバーを終了させる
modbus_server.stop_server()

出力結果を確認します。

$ python sample_client_to_server.py
2023-02-21 14:20:21.656062 from client to server connected state:False

2023-02-21 14:20:21.656305 Start ModbusTCP Server
2023-02-21 14:20:21.664870 Connect: Success !!

2023-02-21 14:20:21.665610 from client to server connected state:True

2023-02-21 14:20:21.665801 [server] get_holding_registers[address:0]: [0]
2023-02-21 14:20:21.665876 [client] write_single_register[address:0]: 122
2023-02-21 14:20:23.668568 [server] get_holding_registers[address:0]: [122]

2023-02-21 14:20:23.668771 [server] get_holding_registers[address:1]: [0]
2023-02-21 14:20:23.668844 [client] write_multiple_registers[address:1]: [1, 2, 3]
2023-02-21 14:20:25.671624 [server] get_holding_registers[address:1]: [1, 2, 3]

2023-02-21 14:20:25.671815 from client to server connected state:True
2023-02-21 14:20:25.672126 Disconnect: Success !!

2023-02-21 14:20:25.672307 from client to server connected state:False
2023-02-21 14:20:25.672378 Stop ModbusTCP Server

おわりに

pyModbusTCP ライブラリーで ModbusTCP 通信環境を構築しました。
pyModbusTCP ライブラリーについて情報が少なく苦戦しましたが、自分の記事が参考になったら幸いです。

GitHubで編集を提案

Discussion