SysmacとMQTT 通信ライブラリでつくるNX向けの実践的なPUBLISHサービスPOU
はじめに
本記事は、PLC(Programmable Logic Controller)向けのソフトウェア開発に従事、または関心のある方で、Structured Text(ST)による開発に興味がある方向けです。OMRON社のSysmac Studioとコントローラ(NX1またはNX5)を使用します。
今回は、OMRON社が公開しているMQTT通信ライブラリ(SYSMAC-XR020)を使用してプログラム内でMQTT Publishを使いやすくする実践的なPUBLISHサービスPOUを構築します。MQTT通信ライブラリの使い方については言及しません。プロジェクトを実行する場合、必ずMQTT通信ライブラリのマニュアルを手元に置き、参照できるようにしてください。
PLCでのMQTTの使用となると、マニュアルに記載の使用例に近い範囲で、少量の定量メッセージを長周期(secオーダー)、一定間隔な使い方が多いと思います。しかし、PUBLISHサービスPOUとして仕様を決めて構築することで十分な送出容量かつ長期連続使用可能で再利用可能なモジュールにすることができます。
MQTT通信ライブラリのMQTTクライアントは、1コネクションあたり1 Mbps程度の送出量であれば、ネットワークに多少の速度変動があっても、簡単なバッファ管理でデータ喪失を回避できます。1 Mbpsは、4 ms周期タスクで1サイクル当たり500 byteの情報量になります。また、ブローカーが許容するのであれば3コネクション程度まで同時にコネクションを張っても支障ありません。重度のデータ損失回避が必要であれば、SDカードを使用する手段もあります。
PLCにおけるMQTTプロトコルは、情報システム向けの出力に限らず、装置開発・評価のための運転データ収集にも使用できます。運転データ収集はノートPCのDocker上にMosquitto + Telegraf + InfluxDBを構築するだけでも十分な環境になります。動作解析のためではありませんでしたが、製造業向けDXシステムとしている製品の動作テストにておいて、データ収集が仕様通りに行われているかを確認するため、先の構成でInfluxDB上にダッシュボードを作成し、関係者がリアルタイムに両者を確認できるようにしたことがあります。
データ収集となるとOPC UAが候補にあがります。OPC UAにはPub/Sub仕様もありますが、NXに搭載されているOPC UAサーバには機能が実装されていません。また、Pub/Subが実装されたとしてもユーザーにどの程度の自由度が与えられるかは分かりません。Pub/Subの無いOPC UAサーバに対しては、ポーリングでデータ収集を行いますが、短時間に変化する信号が収集対象になると、PLC側で何らかの対応をすることになります。概ね一定期間の履歴を公開し続けるという手段をとりますが、何らかのズレが生じた場合、一部を捨てることになります。捨てたことが分かればよいのですが、それが分からなければ無かったことになります。
MQTTによるデータ収集は万能ではありません。今のところMQTT通信ライブラリが対応しているのは、MQTT Version 3.1.1です。また、ユーザープログラムとして稼働させるのでユーザーが使用可能なメモリとCPU時間を消費します。そうであっても、ユーザーに十分な制御の余地があることは、シビアなデータ収集で有利に働きます。
扱わないこと
次の事項は、実際の使用では十分な確認と検討が必要ですが、MQTT関連は、PLC固有ではなく情報も充実しているため扱いません。メッセージのエンコードは大きななトピックなので機会を改めます。
- MQTTプロトコル
- MQTTブローカー
- トピックの設計
- メッセージのエンコード
Sysmacプロジェクト
MQTTPubServiceのSysmacプロジェクトは以下より入手してください。
使用環境
プロジェクトの実行には、次の環境が必要です。
- コントローラ
NX1かNX5を使用します。 - MQTTブローカー
MQTT Version 3.1.1に準じたブローカーであれば選択は自由です。よほど制約のあるハードウェアで稼働するブローカーでない限り処理能力が問題になるほどのトラフィックには至りません。パブリックのブローカーの使用は避けます。 - MQTTクライアント
好みのMQTTクライアントで支障ありません。 - ネットワーク
ブローカーとコントローラ間のネットワークは大きな遅延のあるものは避けます。Ping値で30 ms程度が目安です。
構築した環境
プロジェクトの構築は、次の環境で行いました。
- コントローラ
NX102-9000 Ver 1.64を使用しました。 - Sysmac Studio
Ver.1.60を使用しました。 - MQTTブローカー
ローカルPCのDocker上のMosquitto(eclipse-mosquitto:2.0.20)を使用しました。 - MQTTクライアント
MQTTXを使用しました。 - ネットワーク
ブローカーとコントローラ間は、Ping値で平均して10 msの遅延がありました。
使用手順
参考としてプロジェクトの使用手順を示します。
-
プロジェクトのファイルハッシュを確認
プロジェクトファイルが破損しているとどうにもなりません。 -
プロジェクトのコントローラ形式を使用するコントローラに合わせる
古いNX1はMQTT通信ライブラリを使用できません。使用可能なバージョンは、MQTT通信ライブラリのマニュアルを確認してください。 -
POU/プログラム/NotifyStatsのPUBLISHサービス設定の変更
ブローカーのアドレス、ポート番号、クライアントIDを使用する環境に合わせます。 -
構成・設定/コントローラ設定/内臓EtherNet/IPポート設定の確認
ブローカーへ到達できる設定に変更します。 -
MQTTブローカーのセットアップと起動
ブローカーを起動し、適当なMQTTクライアントで接続できるか確認します。また、ブローカーのシステムステータスを購読できるか確認します。 -
コントローラ時計を合わせる
接続するSysmac Studioが動作している端末の時計が適切であれば、それを反映します。NTPを使用する場合、同期したか確認します。 -
プロジェクトをコントローラにダウンロード
プログラムを運転モードで動作させ、イベントログにエラーがないか確認します。 -
MQTTクライアントで"machine/stats"を購読
メッセージを購読でき、内容が妥当であることを確認します。
PUBLISHサービスの構成と機能
今回は、MQTTコネクション、Publishワーカー共に1つ、自動Pingチェックと軽微なモニタリングだけのシンプルな機構のPUBLISHサービスを作成します。必要であれば、POU間メッセージング機構と組み合わせて汎POUなサービスに仕上げることや、フォールバック機構を構築してデータ損失回避の程度を高めることもできます。安定してメッセージ発行ができることとは別に機能として以下を要求することにします。
-
各MQTT命令のエラーをトラップできること
障害時に原因を確認するために必要です。また、多くの例外は自己復旧不能ですが、タイムアウト関係のエラーは、待つことで解消する可能性があり、ユーザーがその選択を行えるようにするためにも必要です。 -
ユーザーがバッファ管理を行えること
バッファは、メッセージ送出に対する要求によって異なるためユーザーに実装を含め選択の自由が必要です。 -
例外時にサービスを終了すること
例外時にどうするかはユーザーの判断事項ですが、FB内のMQTT命令は、適切に終了処理を行い、システムに影響を与えないようにします。また、再実行可能な状態にします。 -
送出状態をモニタリングできること
送出スループットには限界があるため、モニタリングが必要です。通信遅延が小さくてもMQTTPub命令は3 cycleを必要とし、コネクションあたりの送出スループットに1000 byte/cycleという限界があることを確認しています。また、MQTTPub命令は、処理にかかるサイクル数について、トピックを含めたペイロードサイズでゼロから500 byte刻みで変化します。送出スループットの低下が生じて何らかのバッファ機構を圧迫するような状況でそれを解消しようとする場合、メッセージサイズで対応するにはその変化点を超えるように減じなければ効果がありません。
MQTTPubServiceの実装
実装は次のようになります。MQTT通信ライブラリのマニュアルや一般に見られるコードに比べると複雑に見えるかもしれませんが、使っているものも、使い方も同じです。異なるのは、そられを使用して必要とする機能を実現しつつFBに実装を隠蔽してもユーザーが制御できるようにするためのコードを含んでいることです。MQTTコネクションの確立と監視を行うMQTTクライアントサービスとメッセージ発行を行うPublishワーカーに分かれています。
Clear(Error);
Clear(ErrorID);
Clear(ErrorIDEx);
Done := FALSE;
IF Enable AND NOT Busy THEN
Busy := TRUE;
iState := STATE_INIT;
END_IF;
// Mqtt client service task.
CASE iState OF
// Init the service.
0: // STATE_INIT
Active := FALSE;
iMqttClient.Enable := FALSE;
iMqttPing.Execute := FALSE;
iMqttPubAryByte.Execute := FALSE;
Clear(iPubWorkerContext);
iAutoPingTimer.In := FALSE;
Clear(iClientReference);
// Setup
iSettings := Settings;
// Broker
iConnectionSettings.IpAdr := iSettings.BrokerAdr;
iConnectionSettings.PortNo := iSettings.BrokerPort;
iConnectionSettings.CleanSession := iSettings.CleanSession;
// Auth, Encrypt
iConnectionSettings.UserName := iSettings.UserName;
iConnectionSettings.Password := iSettings.Password;
iConnectionSettings.TLSUse := iSettings.TLSUse;
iConnectionSettings.TLSSessionName := iSettings.TLSSessionName;
// Will
iConnectionSettings.WillCfg.WillFlag := iSettings.WillFlag;
iConnectionSettings.WillCfg.WillQoS := 0;
iConnectionSettings.WillCfg.WillTopic := iSettings.WillTopic;
iConnectionSettings.WillCfg.WillMsg := iSettings.WillMsg;
iConnectionSettings.WillCfg.WillRetain := iSettings.WillRetain;
// Mqtt client
iMqttClient.ConnectionSettings := iConnectionSettings;
iMqttClient.ClientID := iSettings.ClientID;
iMqttClient.KeepAlive := iSettings.KeepAlive;
iMqttClient.DiscardMsgTime := iSettings.DiscardMsgTime;
iMqttClient.Timeout := iSettings.Timeout;
// Ping
iMqttPing.Timeout := iSettings.AutoPingTimeout;
iAutoPingTimer.PT := iSettings.AutoPingInterval;
Clear(iError);
Clear(iErrorID);
Clear(iErrorIDEx);
Clear(iConnected);
Clear(iPublishing);
Inc(iState);
1:
iMqttClient.Enable := TRUE;
Inc(iState);
2:
IF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
iState := STATE_ERROR;
ELSIF NOT Enable THEN
iState := STATE_SHUTDOWN;
ELSIF iMqttClient.Connected THEN
iState := STATE_PING;
END_IF;
// The service is active.
10: // STATE_ACTIVE
Active := TRUE;
Inc(iState);
11:
IF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
iState := STATE_ERROR;
ELSIF NOT iMqttClient.Connected THEN
iState := STATE_CLIENT_NO_CONNECTION;
ELSIF NOT Enable THEN
iState := STATE_SHUTDOWN;
ELSIF iAutoPingTimer.Q THEN
iAutoPingTimer.In := FALSE;
iState := STATE_PING;
END_IF;
// Mqtt ping test.
20: // STATE_PING
IF NOT iPublishing THEN
iMqttPing.Execute := TRUE;
Inc(iState);
END_IF;
21:
IF iMqttPing.Done OR iMqttPing.Error THEN
iMqttPing.Execute := FALSE;
IF iMqttPing.Error THEN
IF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
ELSE
iError := TRUE;
iErrorID := iMqttPing.ErrorID;
iErrorIDEx := iMqttPing.ErrorID;
END_IF;
iState := STATE_ERROR;
ELSE
iAutoPingTimer.In := iSettings.AutoPing;
Latency := iMqttPing.ElapseTime;
iState := STATE_ACTIVE;
END_IF;
END_IF;
30: // STATE_NO_CONNECTION
IF iMqttClient.Connected THEN
iState := STATE_ACTIVE;
ELSIF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
iState := STATE_ERROR;
ELSIF NOT Enable THEN
iState := STATE_SHUTDOWN;
END_IF;
40: // STATE_ERROR
Error := TRUE;
ErrorID := iErrorID;
ErrorIDEx := iErrorIDEx;
iState := STATE_SHUTDOWN;
// Shutdown the service.
90: // STATE_SHUTDOWN
iMqttClient.Enable := FALSE;
Active := FALSE;
Inc(iState);
91:
IF NOT iMqttClient.Busy THEN
Inc(iState);
END_IF;
92:
IF iPubWorkerContext.State = STATE_PUB_WORKER_INIT
AND NOT iMqttPubAryByte.Busy
THEN
iState := STATE_DONE;
END_IF;
// The service is done.
1000: // STATE_DONE
Busy := FALSE;
Inc(iState);
END_CASE;
// Publish worker task.
// Input to pub worker.
IF Active THEN
CASE iPubWorkerContext.State OF
10: // STATE_PUB_WORKER_WAIT
IF Publish THEN
iPubWorkerContext.Request := Request;
END_IF;
END_CASE;
END_IF;
// Pub worker.
iPubWorkerState := iPubWorkerContext.State;
CASE iPubWorkerContext.State OF
// Init the pub FB.
0: // STATE_PUB_WORKER_INIT
iMqttPubAryByte.Execute := FALSE;
IF iConnected THEN
iPubWorkerState := STATE_PUB_WORKER_WAIT;
END_IF;
// Wait a pub request.
10: // STATE_PUB_WORKER_WAIT
IF NOT iConnected THEN
iPubWorkerState := STATE_PUB_WORKER_INIT;
ELSIF iPubWorkerContext.Request.MessageSize > 0 THEN
iMqttPubAryByte.Topic
:= iPubWorkerContext.Request.Topic;
iMqttPubAryByte.MsgSize
:= iPubWorkerContext.Request.MessageSize;
iFlags.PubQoS
:= USINT_TO_BYTE(iPubWorkerContext.Request.QoS);
iFlags.RetainFlag
:= iPubWorkerContext.Request.RetainFlag;
iMqttPubAryByte.PubSettings := iFlags;
iMqttPubAryByte.Timeout
:= iPubWorkerContext.Request.Timeout;
iMqttPubAryByte.Execute := TRUE;
iPubWorkerContext.PubTick := 0;
iPubWorkerState := STATE_PUB_WORKER_PUBLISHING;
END_IF;
11: // STATE_PUB_WORKER_PUBLISHING
Inc(iPubWorkerContext.PubTick);
IF iMqttPubAryByte.Done OR iMqttPubAryByte.Error THEN
iMqttPubAryByte.Execute := FALSE;
IF iMqttPubAryByte.Error THEN
iPubWorkerContext.Error := TRUE;
iPubWorkerContext.ErrorID
:= iMqttPubAryByte.ErrorID;
iPubWorkerContext.ErrorIDEx
:= iMqttPubAryByte.ErrorIDEx;
iPubWorkerState := STATE_PUB_WORKER_ERROR;
ELSE
iPubWorkerState := STATE_PUB_WORKER_PUBLISHED;
END_IF;
END_IF;
12: // STATE_PUB_WORKER_PUBLISHED
Clear(iPubWorkerContext.Request);
iPubWorkerState := STATE_PUB_WORKER_WAIT;
13: // STATE_PUB_WORKER_ERROR
Clear(iPubWorkerContext.Request);
Clear(iPubWorkerContext.Error);
Clear(iPubWorkerContext.ErrorID);
Clear(iPubWorkerContext.ErrorIDEx);
iPubWorkerState := STATE_PUB_WORKER_INIT;
END_CASE;
iPubWorkerContext.State := iPubWorkerState;
// Output from pub worker.
IF Active THEN
CASE iPubWorkerContext.State OF
11: iPublishing := TRUE;
12: // STATE_PUB_WORKER_PUBLISHED
iPublishing := FALSE;
Done := TRUE;
13: // STATE_PUB_WORKER_ERROR
IF NOT iError THEN
Error := iPubWorkerContext.Error;
ErrorID := iPubWorkerContext.ErrorID;
ErrorIDEx := iPubWorkerContext.ErrorIDEx;
END_IF;
iPublishing := FALSE;
Done := TRUE;
END_CASE;
ELSE
iPublishing := FALSE;
END_IF;
// Execute FBs.
iAutoPingTimer();
iMqttPing(ClientReference:=iClientReference);
iMqttPubAryByte(ClientReference:=iClientReference,
PacketID:=iPubWorkerContext.PacketID,
MsgType:=iPubWorkerContext.MessageType,
PubMsg:=iPubWorkerContext.Request.Message);
iMqttClient(ClientReference:=iClientReference);
iConnected := iMqttClient.Connected;
// Output Throughput.
CASE iPrevState OF
0:
// The minimum aggregation time is 1 second,
// and the maximum is the buffer capacity.
iThroughputWindowSize
:= MAX(TO_UINT(MIN(ThroughputWindowSize, SizeOfAry(iPubAmounts))),
TO_UINT(LINT#1_000_000_000 / TimeToNanoSec(GetMyTaskInterval())));
ELSE
IF Busy THEN
iSize
:= iPubWorkerContext.Request.MessageSize
+ GetByteLen(iPubWorkerContext.Request.Topic);
Throughput
:= MovingAverage(
In:=TO_UINT(SEL(iPubWorkerContext.Stat = STATE_PUB_WORKER_PUBLISHED,
UINT#0,
iSize)),
CurIndex:=iThroughputIndex,
Buf:=iPubAmounts[0],
BufSize:=iThroughputWindowSize,
Q:=iThroughputValid);
ELSE
Throughput := 0;
END_IF;
END_CASE;
// Output State.
CASE iPrevState OF
30: // STATE_NO_CONNECTION
// 1cycle
State := MQTT_PUB_SRV_STATE_NO_CONNECTION;
90: // STATE_SHUTDOWN
// 1cycle
State := MQTT_PUB_SRV_STATE_SHUTDOWN;
ELSE
IF Error THEN
State := MQTT_PUB_SRV_STATE_ERROR;
ELSIF Active THEN
CASE iPubWorkerContext.State OF
11: // STATE_PUB_WORKER_PUBLISHING
// 1cycle
State := MQTT_PUB_SRV_STATE_PUBLISHING;
12: // STATE_PUB_WORKER_PUBLISHED
// 1cycle
State := MQTT_PUB_SRV_STATE_PUBLISHED;
ELSE
State := MQTT_PUB_SRV_STATE_ACTIVE;
END_CASE;
ELSIF Busy THEN
State := MQTT_PUB_SRV_STATE_INACTIVE;
ELSE
State := MQTT_PUB_SRV_STATE_STOP;
END_IF;
END_CASE;
iPrevState := iState;
MQTTクライアントサービス部
MQTTクライアントサービスは、コネクションの確立と監視を行います。今回はありませんが、再接続処理を組み込むと使い勝手がよくなります。Publish/Subscribeワーカーが単一であれば、組み込んでもよいですが、大きくなるようであればPOUを分離します。
初期化節 (STATE_INIT)
初期化節では、変数の初期化とMQTT命令への設定を行った後、MQTTClient命令を実行してコネクションが確立するかを確認します。ブローカーに接続出来た場合、一度Pingを実行して応答遅延を確認します。MQTTClient命令は、接続要求に対するタイムアウト設定があるのですが、そもそもクライアントが存在しない場合はそのタイムアウト設定ではなく、何らかの内部設定時間経過後に例外を発生させるようです。あらかじめブローカーが立ち上がっていることを前提とした作りのようです。
// Init the service.
0: // STATE_INIT
Active := FALSE;
iMqttClient.Enable := FALSE;
iMqttPing.Execute := FALSE;
iMqttPubAryByte.Execute := FALSE;
Clear(iPubWorkerContext);
iAutoPingTimer.In := FALSE;
Clear(iClientReference);
// Setup
iSettings := Settings;
// Broker
iConnectionSettings.IpAdr := iSettings.BrokerAdr;
iConnectionSettings.PortNo := iSettings.BrokerPort;
iConnectionSettings.CleanSession := iSettings.CleanSession;
// Auth, Encrypt
iConnectionSettings.UserName := iSettings.UserName;
iConnectionSettings.Password := iSettings.Password;
iConnectionSettings.TLSUse := iSettings.TLSUse;
iConnectionSettings.TLSSessionName := iSettings.TLSSessionName;
// Will
iConnectionSettings.WillCfg.WillFlag := iSettings.WillFlag;
iConnectionSettings.WillCfg.WillQoS := 0;
iConnectionSettings.WillCfg.WillTopic := iSettings.WillTopic;
iConnectionSettings.WillCfg.WillMsg := iSettings.WillMsg;
iConnectionSettings.WillCfg.WillRetain := iSettings.WillRetain;
// Mqtt client
iMqttClient.ConnectionSettings := iConnectionSettings;
iMqttClient.ClientID := iSettings.ClientID;
iMqttClient.KeepAlive := iSettings.KeepAlive;
iMqttClient.DiscardMsgTime := iSettings.DiscardMsgTime;
iMqttClient.Timeout := iSettings.Timeout;
// Ping
iMqttPing.Timeout := iSettings.AutoPingTimeout;
iAutoPingTimer.PT := iSettings.AutoPingInterval;
Clear(iError);
Clear(iErrorID);
Clear(iErrorIDEx);
Clear(iConnected);
Clear(iPublishing);
Inc(iState);
1:
iMqttClient.Enable := TRUE;
Inc(iState);
2:
IF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
iState := STATE_ERROR;
ELSIF NOT Enable THEN
iState := STATE_SHUTDOWN;
ELSIF iMqttClient.Connected THEN
iState := STATE_PING;
END_IF;
サービス有効節 (STATE_ACTIVE)
サービス有効節は、MQTTClient命令の実行状態を監視します。また、自動Pingが有効であれば定期的にPingを実行します。
// The service is active.
10: // STATE_ACTIVE
Active := TRUE;
Inc(iState);
11:
IF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
iState := STATE_ERROR;
ELSIF NOT iMqttClient.Connected THEN
iState := STATE_CLIENT_NO_CONNECTION;
ELSIF NOT Enable THEN
iState := STATE_SHUTDOWN;
ELSIF iAutoPingTimer.Q THEN
iAutoPingTimer.In := FALSE;
iState := STATE_PING;
END_IF;
Ping節(STATE_PING)
Ping節は、MQTTPing命令を使用して応答遅延を確認します。例外が発生した場合、MQTTClient命令も確認し、例外が発生していればそちらを優先します。
// Mqtt ping test.
20: // STATE_PING
IF NOT iPublishing THEN
iMqttPing.Execute := TRUE;
Inc(iState);
END_IF;
21:
IF iMqttPing.Done OR iMqttPing.Error THEN
iMqttPing.Execute := FALSE;
IF iMqttPing.Error THEN
IF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
ELSE
iError := TRUE;
iErrorID := iMqttPing.ErrorID;
iErrorIDEx := iMqttPing.ErrorID;
END_IF;
iState := STATE_ERROR;
ELSE
iAutoPingTimer.In := iSettings.AutoPing;
Latency := iMqttPing.ElapseTime;
iState := STATE_ACTIVE;
END_IF;
END_IF;
コネクション喪失節 (STATE_NO_CONNECTION)
コネクション喪失節は、MQTTClient命令を監視してコネクションが復旧するか、例外が発生するのを待ちます。
30: // STATE_NO_CONNECTION
IF iMqttClient.Connected THEN
iState := STATE_ACTIVE;
ELSIF iMqttClient.Error THEN
iError := TRUE;
iErrorID := iMqttClient.ErrorID;
iErrorIDEx := iMqttClient.ErrorIDEx;
iState := STATE_ERROR;
ELSIF NOT Enable THEN
iState := STATE_SHUTDOWN;
END_IF;
例外節 (STATE_ERROR)
例外節は、例外情報を出力するに留まります。各節で例外処理を行うのではなく、共通処理となる節を設けることで体系的な例外処理を行いやすくなります。
40: // STATE_ERROR
Error := TRUE;
ErrorID := iErrorID;
ErrorIDEx := iErrorIDEx;
iState := STATE_SHUTDOWN;
シャットダウン節 (STATE_SHUTDOWN)
シャットダウン節は、各MQTT命令の終了とその完了を確認します。
// Shutdown the service.
90: // STATE_SHUTDOWN
iMqttClient.Enable := FALSE;
Active := FALSE;
Inc(iState);
91:
IF NOT iMqttClient.Busy THEN
Inc(iState);
END_IF;
92:
IF iPubWorkerContext.State = STATE_PUB_WORKER_INIT
AND NOT iMqttPubAryByte.Busy
THEN
iState := STATE_DONE;
END_IF;
完了節 (STATE_DONE)
完了節は、Busyを落としユーザーにサービス停止を通知します。
// The service is done.
1000: // STATE_DONE
Busy := FALSE;
Inc(iState);
Publishワーカー部
Publishワーカーは、MQTTPub命令でメッセージ送出を行います。MQTTクライアントサービスとは同一POU内ですが、できる限り独立したものとして構築します。そのため、MQTTクライアントサービスとの間に入出力を設けます。独立させることで、ワーカーの多重化やPOUへの切り出しが行いやすくなります。スループットの最大化を目指す場合、ワーカーの多重化は必須です。コネクションに対して複数のMQTTPub命令実行を許容しているため、常に一つのMQTTPub命令が実行されるよう、飽和状態を作ることになるためです。
// Publish worker task.
// Input to pub worker.
IF Active THEN
CASE iPubWorkerContext.State OF
10: // STATE_PUB_WORKER_WAIT
IF Publish THEN
iPubWorkerContext.Request := Request;
END_IF;
END_CASE;
END_IF;
// Pub worker.
iPubWorkerState := iPubWorkerContext.State;
CASE iPubWorkerContext.State OF
// Init the pub FB.
0: // STATE_PUB_WORKER_INIT
iMqttPubAryByte.Execute := FALSE;
IF iConnected THEN
iPubWorkerState := STATE_PUB_WORKER_WAIT;
END_IF;
// Wait a pub request.
10: // STATE_PUB_WORKER_WAIT
IF NOT iConnected THEN
iPubWorkerState := STATE_PUB_WORKER_INIT;
ELSIF iPubWorkerContext.Request.MessageSize > 0 THEN
iMqttPubAryByte.Topic
:= iPubWorkerContext.Request.Topic;
iMqttPubAryByte.MsgSize
:= iPubWorkerContext.Request.MessageSize;
iFlags.PubQoS
:= USINT_TO_BYTE(iPubWorkerContext.Request.QoS);
iFlags.RetainFlag
:= iPubWorkerContext.Request.RetainFlag;
iMqttPubAryByte.PubSettings := iFlags;
iMqttPubAryByte.Timeout
:= iPubWorkerContext.Request.Timeout;
iMqttPubAryByte.Execute := TRUE;
iPubWorkerContext.PubTick := 0;
iPubWorkerState := STATE_PUB_WORKER_PUBLISHING;
END_IF;
11: // STATE_PUB_WORKER_PUBLISHING
Inc(iPubWorkerContext.PubTick);
IF iMqttPubAryByte.Done OR iMqttPubAryByte.Error THEN
iMqttPubAryByte.Execute := FALSE;
IF iMqttPubAryByte.Error THEN
iPubWorkerContext.Error := TRUE;
iPubWorkerContext.ErrorID
:= iMqttPubAryByte.ErrorID;
iPubWorkerContext.ErrorIDEx
:= iMqttPubAryByte.ErrorIDEx;
iPubWorkerState := STATE_PUB_WORKER_ERROR;
ELSE
iPubWorkerState := STATE_PUB_WORKER_PUBLISHED;
END_IF;
END_IF;
12: // STATE_PUB_WORKER_PUBLISHED
Clear(iPubWorkerContext.Request);
iPubWorkerState := STATE_PUB_WORKER_WAIT;
13: // STATE_PUB_WORKER_ERROR
Clear(iPubWorkerContext.Request);
Clear(iPubWorkerContext.Error);
Clear(iPubWorkerContext.ErrorID);
Clear(iPubWorkerContext.ErrorIDEx);
iPubWorkerState := STATE_PUB_WORKER_INIT;
END_CASE;
iPubWorkerContext.State := iPubWorkerState;
Publishワーカー入力部
Publishワーカー入力部は、PublishワーカーへPublishRequestの投入を行います。Publishワーカー入力部自体は、MQTTクライアントサービスの一部です。
// Input to pub worker.
IF Active THEN
CASE iPubWorkerContext.State OF
10: // STATE_PUB_WORKER_WAIT
IF Publish THEN
iPubWorkerContext.Request := Request;
END_IF;
END_CASE;
END_IF;
Publishワーカー
Publishワーカーは、PublishRequestがあればMQTTPub命令でブローカーに送出します。"iConnected"はMQTTクライアントの接続状態です。
// Pub worker.
iPubWorkerState := iPubWorkerContext.State;
CASE iPubWorkerContext.State OF
// Init the pub FB.
0: // STATE_PUB_WORKER_INIT
iMqttPubAryByte.Execute := FALSE;
IF iConnected THEN
iPubWorkerState := STATE_PUB_WORKER_WAIT;
END_IF;
// Wait a pub request.
10: // STATE_PUB_WORKER_WAIT
IF NOT iConnected THEN
iPubWorkerState := STATE_PUB_WORKER_INIT;
ELSIF iPubWorkerContext.Request.MessageSize > 0 THEN
iMqttPubAryByte.Topic
:= iPubWorkerContext.Request.Topic;
iMqttPubAryByte.MsgSize
:= iPubWorkerContext.Request.MessageSize;
iFlags.PubQoS
:= USINT_TO_BYTE(iPubWorkerContext.Request.QoS);
iFlags.RetainFlag
:= iPubWorkerContext.Request.RetainFlag;
iMqttPubAryByte.PubSettings := iFlags;
iMqttPubAryByte.Timeout
:= iPubWorkerContext.Request.Timeout;
iMqttPubAryByte.Execute := TRUE;
iPubWorkerContext.PubTick := 0;
iPubWorkerState := STATE_PUB_WORKER_PUBLISHING;
END_IF;
11: // STATE_PUB_WORKER_PUBLISHING
Inc(iPubWorkerContext.PubTick);
IF iMqttPubAryByte.Done OR iMqttPubAryByte.Error THEN
iMqttPubAryByte.Execute := FALSE;
IF iMqttPubAryByte.Error THEN
iPubWorkerContext.Error := TRUE;
iPubWorkerContext.ErrorID
:= iMqttPubAryByte.ErrorID;
iPubWorkerContext.ErrorIDEx
:= iMqttPubAryByte.ErrorIDEx;
iPubWorkerState := STATE_PUB_WORKER_ERROR;
ELSE
iPubWorkerState := STATE_PUB_WORKER_PUBLISHED;
END_IF;
END_IF;
12: // STATE_PUB_WORKER_PUBLISHED
Clear(iPubWorkerContext.Request);
iPubWorkerState := STATE_PUB_WORKER_WAIT;
13: // STATE_PUB_WORKER_ERROR
Clear(iPubWorkerContext.Request);
Clear(iPubWorkerContext.Error);
Clear(iPubWorkerContext.ErrorID);
Clear(iPubWorkerContext.ErrorIDEx);
iPubWorkerState := STATE_PUB_WORKER_INIT;
END_CASE;
iPubWorkerContext.State := iPubWorkerState;
Publishワーカー出力部
Publishワーカー出力部は、Publishワーカーの処理状態をMQTTクライアントサービスに出力します。Publishワーカー出力部自体は、MQTTクライアントサービスの一部です。
// Output from pub worker.
IF Active THEN
CASE iPubWorkerContext.State OF
11: iPublishing := TRUE;
12: // STATE_PUB_WORKER_PUBLISHED
iPublishing := FALSE;
Done := TRUE;
13: // STATE_PUB_WORKER_ERROR
IF NOT iError THEN
Error := iPubWorkerContext.Error;
ErrorID := iPubWorkerContext.ErrorID;
ErrorIDEx := iPubWorkerContext.ErrorIDEx;
END_IF;
iPublishing := FALSE;
Done := TRUE;
END_CASE;
ELSE
iPublishing := FALSE;
END_IF;
FB実行部
FB実行部は、FBを1cycleに1回だけ実行することを保証します。FBは同一周期中の複数回実行を保証したものでない限り、1cycleに1回だけ実行します。特にネットワーク命令は、同一周期中の複数回実行や実行のスキップによって障害を起こしやすいです。
// Execute FBs.
iAutoPingTimer();
iMqttPing(ClientReference:=iClientReference);
iMqttPubAryByte(ClientReference:=iClientReference,
PacketID:=iPubWorkerContext.PacketID,
MsgType:=iPubWorkerContext.MessageType,
PubMsg:=iPubWorkerContext.Request.Message);
iMqttClient(ClientReference:=iClientReference);
iConnected := iMqttClient.Connected;
スループット算出部
スループット算出部は、送出スループットを算出します。Publishワーカー同様、MQTTクライアントサービスとは分けて構築します。独立させるのは、モニタリング機能は要求によっては肥大する可能性があり、POUを分離して切り替えられるようにするためです。
// Output Throughput.
CASE iPrevState OF
0:
// The minimum aggregation time is 1 second,
// and the maximum is the buffer capacity.
iThroughputWindowSize
:= MAX(TO_UINT(MIN(ThroughputWindowSize, SizeOfAry(iPubAmounts))),
TO_UINT(LINT#1_000_000_000 / TimeToNanoSec(GetMyTaskInterval())));
ELSE
IF Busy THEN
iSize
:= iPubWorkerContext.Request.MessageSize
+ GetByteLen(iPubWorkerContext.Request.Topic);
Throughput
:= MovingAverage(
In:=TO_UINT(SEL(iPubWorkerContext.Stat = STATE_PUB_WORKER_PUBLISHED,
UINT#0,
iSize)),
CurIndex:=iThroughputIndex,
Buf:=iPubAmounts[0],
BufSize:=iThroughputWindowSize,
Q:=iThroughputValid);
ELSE
Throughput := 0;
END_IF;
END_CASE;
状態出力部
状態出力部は、ユーザーがMQTTPubServiceの制御をステートマシンとして構築できるように状態値を出力します。付加的な機能であるため、スループット算出部同様、分けて構築します。
// Output State.
CASE iPrevState OF
30: // STATE_NO_CONNECTION
// 1cycle
State := MQTT_PUB_SRV_STATE_NO_CONNECTION;
90: // STATE_SHUTDOWN
// 1cycle
State := MQTT_PUB_SRV_STATE_SHUTDOWN;
ELSE
IF Error THEN
State := MQTT_PUB_SRV_STATE_ERROR;
ELSIF Active THEN
CASE iPubWorkerContext.State OF
11: // STATE_PUB_WORKER_PUBLISHING
// 1cycle
State := MQTT_PUB_SRV_STATE_PUBLISHING;
12: // STATE_PUB_WORKER_PUBLISHED
// 1cycle
State := MQTT_PUB_SRV_STATE_PUBLISHED;
ELSE
State := MQTT_PUB_SRV_STATE_ACTIVE;
END_CASE;
ELSIF Busy THEN
State := MQTT_PUB_SRV_STATE_INACTIVE;
ELSE
State := MQTT_PUB_SRV_STATE_STOP;
END_IF;
END_CASE;
iPrevState := iState;
状態は、次のように遷移します。
ERROR状態への移行が少し複雑です。ERROR状態は、いずれかのMQTT命令で例外が発生した場合に遷移するので、エラーコード(ErrorID)で原因を確認します。エラーコードと拡張エラーコード(ErrorIDEx)は、MQTT通信ライブラリのマニュアルを参照してください。各状態は次になります。
状態 | 説明 |
---|---|
STOP | サービスは停止しています。 |
INACTIVE | サービスは稼働していますが、使用できません。 |
ACTIVE | メッセージを発行可能です。 |
NO_CONNECTION | ブローカーとのコネクションを喪失しています。 |
PUBLISHING | メッセージを発行します。 |
PUBLISHED | メッセージの発行が完了しました。 |
SHTDOWN | サービスをシャットダウンします。 |
ERROR | 例外が発生しました。ErrorID、ErrorIDExを確認してください。 |
MQTTPubServiceの使用
作成したPUBLISHサービスPOUの動作確認用のコードを実装します。MQTT通信ライブラリを使用したコードはシミュレータでは動作しないため実機で動作確認を行います。
ユーザーコードの実装
今回のMQTTPubServiceを使用するユーザーコードは、初回サイクルでMQTTPubServiceの設定を行い、以後、MQTTPubServiceで例外が発生しない限りメッセージを送出し続けます。メッセージは、全サイクルのタスク情報とメッセージ送出直前のMQTTPubServiceの情報です。メッセージサイズは、送出処理が4 cycleで完了すれば約1.3 KBです。PublishRequestのバッファは設けていませんが、PublishRequestのメッセージバッファが送出遅延に対するバッファとして機能します。
IF P_First_Run THEN
// Configure mqtt pub service.
iServiceName := 'mqtt_pub_1';
// Connection
iMqttSettings.BrokerAdr := '192.168.40.71';
iMqttSettings.BrokerAdr := '192.168.40.248';
iMqttSettings.BrokerPort := 1883;
iMqttSettings.ClientID := 'nx_0001';
iMqttSettings.CleanSession := TRUE;
iMqttSettings.KeepAlive := 60; //sec
iMqttSettings.Timeout := 5; //sec
iMqttSettings.DiscardMsgTime := 1000; //ms
// Auth, Encrypt
iMqttSettings.UserName := '';
iMqttSettings.Password := '';
iMqttSettings.TLSUse := FALSE;
iMqttSettings.TLSSessionName := '';
// Will
iMqttSettings.WillFlag := TRUE;
iMqttSettings.WillTopic
:= CONCAT('notify/offline/', iServiceName);
iMqttSettings.WillMsg
:= CONCAT('[info]$L',
'type = "machine/service"$L',
CONCAT('entity = "',iServiceName, '"$L'),
'origin = "broker"$L',
'message = "offline"$L');
iMqttSettings.WillQos := 1;
iMqttSettings.WillRetain := FALSE;
// Auto ping
iMqttSettings.AutoPing := TRUE;
iMqttSettings.AutoPingInterval := TIME#10s;
iMqttSettings.AutoPingTimeout := 1000; //ms
iMqttPubService.Settings := iMqttSettings;
// Configure data collection.
MqttPubRequest_Init(iPubRequest);
iTimestampFrom := GetTime();
iUUIDv7.UtcOffset := TIME#+9h;
iUUIDv7.UseExtClockPrecision := FALSE;
iTrushMessageBufferUsageRate := REAL#0.80;
iMaxThroughput := REAL#950.0;
END_IF;
// Add task stats to the request.
GetMyTaskStatus(
LastExecTime=>iLastExecTime,
MaxExecTime=>iMaxExecTime,
ExecCount=>iExecCount,
Exceeded=>iExceeded,
ExceedCount=>iExceedCount);
iUUIDv7(Execute:=TRUE,
Timestamp:=GetTime(),
Out=>iID);
MqttPubRequest_AddString(
iPubRequest,
'[[stats]]$L');
MqttPubRequest_AddString(
iPubRequest,
CONCAT('id = "', iID, '"$L',
'type = "machine/task"$L',
'entity = "CommTask"$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('exceeded = ',
SEL(iExceeded, 'false', 'true'), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('exceeded_count = ',
UDINT_TO_STRING(iExceedCount), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('exec_count = ',
UDINT_TO_STRING(iExecCount), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('last_exec_time = ',
LINT_TO_STRING(TimeToNanoSec(iLastExecTime)), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('max_exec_time = ',
LINT_TO_STRING(TimeToNanoSec(iMaxExecTime)), '$L'));
MqttPubRequest_AddString(iPubRequest, '$L');
// Add mqtt pub service stats to the request.
CASE iMqttPubService.State OF
MQTT_PUB_SRV_STATE_ACTIVE:
iUUIDv7(Execute:=TRUE,
Timestamp:=GetTime(),
Out=>iID);
MqttPubRequest_AddString(
iPubRequest,
'[[stats]]$L');
MqttPubRequest_AddString(
iPubRequest,
CONCAT(
'id = "', iID, '"$L',
'type = "machine/service"$L',
CONCAT('entity = "', iServiceName, '"$L')));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('latency = ',
UINT_TO_STRING(iMqttPubService.Latency), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('throughput = ',
UINT_TO_STRING(iMqttPubService.Throughput), '$L'));
iThroughputUsage
:= TO_REAL(iMqttPubService.Throughput) / iMaxThroughput;
MqttPubRequest_AddString(
iPubRequest,
CONCAT('throughput_usage = ',
RealToFormatString(In:=iThroughputUsage,
Exponent:=FALSE,
Sign:=FALSE,
MinLen:=3,
DecPlace:=2),
'$L'));
iBufferUsage
:= TO_REAL(MqttPubRequest_MessageSize(iPubRequest) + 22)
/ TO_REAL(MqttPubRequest_MessageCapacity(iPubRequest));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('buffer_usage = ',
RealToFormatString(In:=iBufferUsage,
Exponent:=FALSE,
Sign:=FALSE,
MinLen:=3,
DecPlace:=2),
'$L'));
MqttPubRequest_AddString(iPubRequest, '$L');
END_CASE;
// Control mqtt pub service.
CASE iMqttPubService.State OF
MQTT_PUB_SRV_STATE_STOP:
// If the service is stopped, it will start immediately.
iMqttPubService.Enable := TRUE;
MQTT_PUB_SRV_STATE_ERROR:
// If an error occurs, the service will be stopped
// regardless of the content.
iMqttPubService.Enable := FALSE;
iMqttPubService.Publish := FALSE;
MQTT_PUB_SRV_STATE_INACTIVE,
MQTT_PUB_SRV_STATE_NO_CONNECTION:
// When the buffer capacity exceeds the threshold value
// while data cannot be sent, the data is discarded.
iBufferUsage
:= TO_REAL(MqttPubRequest_MessageSize(iPubRequest))
/ TO_REAL(MqttPubRequest_MessageCapacity(iPubRequest));
IF iBufferUsage > iTrushMessageBufferUsageRate THEN
MqttPubRequest_Init(iPubRequest);
END_IF;
MQTT_PUB_SRV_STATE_ACTIVE:
IF NOT MqttPubRequest_IsEmpty(iPubRequest) THEN
// Add timestamp info to the request.
iTimestampTo := GetTime();
MqttPubRequest_AddString(
iPubRequest,
'[timestamp]$L');
MqttPubRequest_AddString(
iPubRequest,
CONCAT('from = ',
LEFT(REPLACE(DtToString(iTimestampFrom), 'T', 1, 11), 26),
'+09:00$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('to = ',
LEFT(REPLACE(DtToString(iTimestampTo), 'T', 1, 11), 26),
'+09:00$L'));
MqttPubRequest_AddString(iPubRequest, '$L');
MqttPubRequest_Build(
Request:=iPubRequest,
Topic:='machine/stats',
QoS:=0,
RetainFlag:=FALSE,
Timeout:=0);
iMqttPubService.Request := iPubRequest;
iMqttPubService.Publish := TRUE;
// Clear the request and start the next collection.
MqttPubRequest_Init(iPubRequest);
iTimestampFrom := iTimestampTo;
END_IF;
MQTT_PUB_SRV_STATE_PUBLISHED:
// After sending the message, drop the flag.
iMqttPubService.Publish := FALSE;
END_CASE;
iMqttPubService();
初期化部
PUBLISHサービスの設定とデータ収集に関連する変数の初期化を行います。MQTT通信ライブラリのMQTTClientは、名前解決も可能です。名前解決を使用する場合、コントローラにDNSサーバまたは、"ホスト-IPアドレス"を設定します。TLSの使用は通信ライブラリのマニュアルを参照してください。TLS接続はコントローラに対して設定を行います。設定内容は、セキュアな情報以外、コード内にコメントとして残しておくと確認の負担が減ります。
IF P_First_Run THEN
// Configure mqtt pub service.
iServiceName := 'mqtt_pub_1';
// Connection
iMqttSettings.BrokerAdr := 'YOUR_BROKER_ADR';
iMqttSettings.BrokerPort := 1883;
iMqttSettings.ClientID := 'YOUR_CLIENT_ID';
iMqttSettings.CleanSession := TRUE;
iMqttSettings.KeepAlive := 60; //sec
iMqttSettings.Timeout := 5; //sec
iMqttSettings.DiscardMsgTime := 1000; //ms
// Auth, Encrypt
iMqttSettings.UserName := '';
iMqttSettings.Password := '';
iMqttSettings.TLSUse := FALSE;
iMqttSettings.TLSSessionName := '';
// Will
iMqttSettings.WillFlag := TRUE;
iMqttSettings.WillTopic
:= CONCAT('notify/offline/', iServiceName);
iMqttSettings.WillMsg
:= CONCAT('[info]$L',
'type = "machine/service"$L',
CONCAT('entity = "',iServiceName, '"$L'),
'origin = "broker"$L',
'message = "offline"$L');
iMqttSettings.WillQos := 1;
iMqttSettings.WillRetain := FALSE;
// Auto ping
iMqttSettings.AutoPing := TRUE;
iMqttSettings.AutoPingInterval := TIME#10s;
iMqttSettings.AutoPingTimeout := 1000; //ms
iMqttPubService.Settings := iMqttSettings;
// Configure data collection.
MqttPubRequest_Init(iPubRequest);
iTimestampFrom := GetTime();
iUUIDv7.UtcOffset := TIME#+9h;
iUUIDv7.UseExtClockPrecision := FALSE;
iTrushMessageBufferUsageRate := REAL#0.80;
iMaxThroughput := REAL#750.0;
END_IF;
データ収集部
データ収集部は、メッセージのコンテンツを作成します。MqttPubRequest構造体用のFUNを使用していますが、メッセージに値を追加し続けるだけです。メッセージの形式は、べた書きしやすく読みやすいTOML形式です。実際の運用ではJSON形式か効率的なメッセージフォーマットにエンコードします。
// Add task stats to the request.
GetMyTaskStatus(
LastExecTime=>iLastExecTime,
MaxExecTime=>iMaxExecTime,
ExecCount=>iExecCount,
Exceeded=>iExceeded,
ExceedCount=>iExceedCount);
iUUIDv7(Execute:=TRUE,
Timestamp:=GetTime(),
Out=>iID);
MqttPubRequest_AddString(
iPubRequest,
'[[stats]]$L');
MqttPubRequest_AddString(
iPubRequest,
CONCAT('id = "', iID, '"$L',
'type = "machine/task"$L',
'entity = "CommTask"$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('exceeded = ',
SEL(iExceeded, 'false', 'true'), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('exceeded_count = ',
UDINT_TO_STRING(iExceedCount), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('exec_count = ',
UDINT_TO_STRING(iExecCount), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('last_exec_time = ',
LINT_TO_STRING(TimeToNanoSec(iLastExecTime)), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('max_exec_time = ',
LINT_TO_STRING(TimeToNanoSec(iMaxExecTime)), '$L'));
MqttPubRequest_AddString(iPubRequest, '$L');
// Add mqtt pub service stats to the request.
CASE iMqttPubService.State OF
MQTT_PUB_SRV_STATE_ACTIVE:
iUUIDv7(Execute:=TRUE,
Timestamp:=GetTime(),
Out=>iID);
MqttPubRequest_AddString(
iPubRequest,
'[[stats]]$L');
MqttPubRequest_AddString(
iPubRequest,
CONCAT(
'id = "', iID, '"$L',
'type = "machine/service"$L',
CONCAT('entity = "', iServiceName, '"$L')));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('latency = ',
UINT_TO_STRING(iMqttPubService.Latency), '$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('throughput = ',
UINT_TO_STRING(iMqttPubService.Throughput), '$L'));
iThroughputUsage
:= TO_REAL(iMqttPubService.Throughput) / iMaxThroughput;
MqttPubRequest_AddString(
iPubRequest,
CONCAT('throughput_usage = ',
RealToFormatString(In:=iThroughputUsage,
Exponent:=FALSE,
Sign:=FALSE,
MinLen:=3,
DecPlace:=2),
'$L'));
iBufferUsage
:= TO_REAL(MqttPubRequest_MessageSize(iPubRequest) + 22)
/ TO_REAL(MqttPubRequest_MessageCapacity(iPubRequest));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('buffer_usage = ',
RealToFormatString(In:=iBufferUsage,
Exponent:=FALSE,
Sign:=FALSE,
MinLen:=3,
DecPlace:=2),
'$L'));
MqttPubRequest_AddString(iPubRequest, '$L');
END_CASE;
PUBLISHサービス制御部
PUBLISHサービス制御部では、PUBLISHサービスの制御を行います。今回は、PUBLISHサービスがメッセージ発行可能になったらすぐにメッセージを発行するようにします。例外発生時にはサービスを停止させ、即座に再起動します。そのため、ブローカーと接続できない状態では、STOP->INACTIVE->ERROR->SHUTDOWN->INACTIVE->STOPを繰り返します。サービスが無効、あるいはコネクションを喪失している場合はPublishRequestのメッセージバッファ残量を確認して閾値を超えたらメッセージを破棄します。
// Control mqtt pub service.
CASE iMqttPubService.State OF
MQTT_PUB_SRV_STATE_STOP:
// If the service is stopped, it will start immediately.
iMqttPubService.Enable := TRUE;
MQTT_PUB_SRV_STATE_ERROR:
// If an error occurs, the service will be stopped
// regardless of the content.
iMqttPubService.Enable := FALSE;
iMqttPubService.Publish := FALSE;
MQTT_PUB_SRV_STATE_INACTIVE,
MQTT_PUB_SRV_STATE_NO_CONNECTION:
// When the buffer capacity exceeds the threshold value
// while data cannot be sent, the data is discarded.
iBufferUsage
:= TO_REAL(MqttPubRequest_MessageSize(iPubRequest))
/ TO_REAL(MqttPubRequest_MessageCapacity(iPubRequest));
IF iBufferUsage > iTrushMessageBufferUsageRate THEN
MqttPubRequest_Init(iPubRequest);
END_IF;
MQTT_PUB_SRV_STATE_ACTIVE:
IF NOT MqttPubRequest_IsEmpty(iPubRequest) THEN
// Add timestamp info to the request.
iTimestampTo := GetTime();
MqttPubRequest_AddString(
iPubRequest,
'[timestamp]$L');
MqttPubRequest_AddString(
iPubRequest,
CONCAT('from = ',
LEFT(REPLACE(DtToString(iTimestampFrom), 'T', 1, 11), 26),
'+09:00$L'));
MqttPubRequest_AddString(
iPubRequest,
CONCAT('to = ',
LEFT(REPLACE(DtToString(iTimestampTo), 'T', 1, 11), 26),
'+09:00$L'));
MqttPubRequest_AddString(iPubRequest, '$L');
MqttPubRequest_Build(
Request:=iPubRequest,
Topic:='machine/stats',
QoS:=0,
RetainFlag:=FALSE,
Timeout:=0);
iMqttPubService.Request := iPubRequest;
iMqttPubService.Publish := TRUE;
// Clear the request and start the next collection.
MqttPubRequest_Init(iPubRequest);
iTimestampFrom := iTimestampTo;
END_IF;
MQTT_PUB_SRV_STATE_PUBLISHED:
// After sending the message, drop the flag.
iMqttPubService.Publish := FALSE;
END_CASE;
iMqttPubService();
ユーザーコードの実行
ユーザーコードを次のようなタスクで実行します。
タスク設定
プログラムの割り付け設定
プロジェクトを構築した環境で24時間稼働させると、次のようになりました。
タスク実行時間モニタ
タスク実行時間の最大値を記録しているのは、送出開始のタイミングです。MQTTPubServiceは、PublishRequestを入力変数としているので入出力変数にして改善を確認できれば、できる限り変数コピーを行わない実装に変更するのがよいかもしれません。改善が期待できなさそうであれば、MQTT通信ライブラリが消費していることになるので受け入れるしかありません。
MQTTXでブローカーに接続してメッセージを確認するとメッセージが途切れることなく送出されていることが確認できました。メッセージ破棄のカウントを忘れてしまったのですが、そもそも今回のユーザーコードはメッセージの欠損には寛容です。また、欠損はメッセージのtimestamp値で検出できます。ブローカーが発行するステータスは次のようになりました。
ブローカーのステータスメッセージ
稼働時間内のメッセージ発行回数は、毎秒49.7回なので送出できていないことがあるか、どこかのタイミングで送出できていない時間があったのかもしれません。しかし、Sysmac Studioでメッセージサイズをトレースしてみると、4 cycleで送出処理が完了しないことがそれなりにあることが確認できました。
メッセージサイズのトレース
自動PingとしてMQTTPingが定期的にコネクションを使用するので当然と言えば当然です。トレースを確認する限り、定間隔で5 cycle以上の送出処理が生じているわけではないので、MQTTPingだけが原因ではなさそうです。通信環境の変化、PLCランタイム処理の少しの変動要因で次サイクルへの完了持越しなど、ユーザーにはどうにもできない要因もあります。安定した環境でもバッファは必須です。ブローカーのステータスとして、"$SYS/broker/load/publish/received/+"を購読してメッセージ発行回数の移動平均を確認するとほぼ毎秒3000回でした。メッセージは次のものが送られてきました。
[[stats]]
id = "01940e1f-ed4c-7c88-a0f5-1ee3b6f3569f"
type = "machine/task"
entity = "CommTask"
exceeded = false
exceeded_count = 0
exec_count = 21833049
last_exec_time = 1185825
max_exec_time = 1440355
[[stats]]
id = "01940e1f-ed50-7c0d-8bab-59c0d18e7575"
type = "machine/task"
entity = "CommTask"
exceeded = false
exceeded_count = 0
exec_count = 21833050
last_exec_time = 508740
max_exec_time = 1440355
[[stats]]
id = "01940e1f-ed54-7c3f-89a9-e2524db43a82"
type = "machine/task"
entity = "CommTask"
exceeded = false
exceeded_count = 0
exec_count = 21833051
last_exec_time = 431215
max_exec_time = 1440355
[[stats]]
id = "01940e1f-ed58-7c0d-baf7-55dec5fd239c"
type = "machine/task"
entity = "CommTask"
exceeded = false
exceeded_count = 0
exec_count = 21833052
last_exec_time = 473090
max_exec_time = 1440355
[[stats]]
id = "01940e1f-ed5c-7c33-9357-fb0a2a99753d"
type = "machine/task"
entity = "CommTask"
exceeded = false
exceeded_count = 0
exec_count = 21833053
last_exec_time = 549755
max_exec_time = 1440355
[[stats]]
id = "01940e1f-ed5c-7f37-9069-abe4b6df8ea7"
type = "machine/service"
entity = "mqtt_pub_1"
latency = 39
throughput = 259
throughput_usage = 0.27
buffer_usage = 0.02
[timestamp]
from = 2024-12-29T01:36:00.969148+09:00
to = 2024-12-29T01:36:00.989147+09:00
スループットが259 byte/cycle(518 Kbps)なのでまだ余力があります。また、効率的なメッセージフォーマットの使用で情報量はさらに向上できます。レイテンシーについては、PLCラインタイム部分がブラックボックスなので過度の期待はできません。スループットとレイテンシーは相反関係にあります。ブローカーの設定もスループットをとるかレイテンシーをとるかで変わります。
装置に搭載するには
簡単なデータ収集であれば、メッセージのエンコード処理に気をつけることでそのまま使用できます。PUBLISHサービスが動作するタスクとは別のタスクでデータ収集を行う場合、適当なキュー構造を作成して、データの受け渡しを行います。また、ブローカーに対してメッセージ発行が出来ない場合にどうするかを検討します。何らかの対処を行う場合、必ずその対処が機能するかを実機でテストします。TLSやUser/Passwordを使用する場合、クレデンシャル情報の扱いに気をつけます。
高負荷なデータ収集は、対象となる装置の制御要件とデータ収集要件次第です。今回の実装をそのまま使用することもできるかもしれませんが、恐らくバッファ機構の内包と処理の効率化が必要になります。そうなると、実装に手を加える必要があります。どの程度の要件であれば実現できるかを簡単に述べることはできません。対象となる装置の制御コードがタスク設計も行われていないようなものであれば、まずはシステムとしての設計が先になります。それも難しいとなれば、コントローラがNX5でなければNX5で余裕が出来るか確認します。しかしながら、そのような状況になるのであれば、他の分野についても危うい状況にあると推測されるので、各分野について適切な協力者と共にプロジェクトに当たるほうが有益です。
まとめ
MQTTプロトコルは使い勝手のよいツールです。MQTTプロトコルに限ったことではありませんが、PLC側で少し仕事をするだけで情報システム側の選択肢を大きく広げることができます。そうすることで、できることの幅も広がります。重要なのは、激しい競争を生き抜いているツールやそこで見出される知見を直接に活用でき、それらに長けた人の協力も得やすくなることです。
MQTTのPublishは、スループットとメッセージのエンコードについてもう少し触れておく必要があります。スループットについては、MQTT通信ライブラリの性能的な限界の確認、スループットとレイテンシーの関係、メッセージのエンコードはJSON形式や他のバイナリフォーマットへの変換についてです。
Discussion