🌽

リングバッファの機能を彫り出す

に公開

はじめに

本記事は、主にPLC(Programmable Logic Controller)向けのソフトウェア開発に従事、または関心のある方で、Structured Text(ST)による開発に興味がある方向けです。OMRON社のSysmac Studioとコントローラ(NX1またはNX5)を使用します。

今回は、リングバッファです。以前リングバッファについて述べた内容を発展させ、まともに使える実装にまとめました。配列上を巡回する2つの位置指示子に明示的な要素数を導入して
リングバッファとしましたが、その要素数を分解します。これにより、リングバッファとしての機能性が増し、使い勝手が良くなります。

今回実装したリングバッファ(RingBuffer)は、リングバッファが保持する要素数をリングバッファ初期化時からの累積書き込み数と累積読み出し数の差とします。これにより、バッファの読み手と書き手がそれぞれ1つである限り、ロック(ミューテックス)を使用しなくてもタスクセーフに使用できるようになります。読み手と書き手双方からの書き込みを伴う共有値である要素数が無くなるためです。

リングバッファの用途は、ソケット命令やSDカード命令(ファイル入出力)のバッファのようなデバイスとしての入出力に限りません。POU間メッセージチャネルのような内部的、設計上のツールにもなります。また、操作POUを適切に定義して使用することで、楽観的に大容量のバイト列を流すことも、1バイトの損失も生じることなくバイト列の流れを切り替えることもできます。さらに、制約はありますが、リングバッファの基本的な情報だけでリングバッファへの読み書きを追跡してその内容を取得することもできます。

RingBufferの基本的な読み書きは以下のように行います。RingBufferは、バッファ情報とバイト型配列の組み(以下では、iBufferContextとiBuffer)であり、バッファ操作はPOUによって隠蔽しています。

POU/プログラム/BasicReadWrite
// RingBufferは初期化する必要がある。
RingBuffer_init(Context:=iBufferContext,
                Buffer:=iBuffer);

iWriteDataStr := 'data';
iWriteDataSize := StringToAry(In:=iWriteDataStr,
                              AryOut:=iWriteData[0]);

// RingBufferにSizeで指定した値より大きな書き込み可能な空きがない時、FALSEを返す。
RingBuffer_write(Context:=iBufferContext,
                 Buffer:=iBuffer,
                 In:=iWriteData,
                 Head:=0,
                 Size:=iWriteDataSize,
                 AllowOverwrite:=FALSE);

// Sizeに指定した値を読み出すバイト数の最大値としてリングバッファから読み出す。
RingBuffer_read(Context:=iBufferContext,
                Buffer:=iBuffer,
                Out:=iReadData,
                Head:=0,
                Size:=SizeOfAry(iReadData),
                ReadSize=>iReadDataSize,
                Overflow=>iOverflow);
                
iReadDataStr := AryToString(In:=iReadData[0],
                            Size:=iReadDataSize);

iOk := iReadDataStr = 'data';

リングバッファとしての特性

RingBufferは、リングバッファとして以下の特性があります。

  • 有限の使用期間 
    単調に増分する書き込み数と読み出し数を内部で使用しているためです。  1msに65535バイトの書き込みを行うとして、約8900年で使用上限に達します。

  • バッファの最大容量は65kB
    SysmacStudioで定義可能な配列の最大の要素数が65535であるためです。

  • バッファへの書き込みはオーバーフロー(未読み出し領域への書き込み)を許容する  
    ユーザーが指定しない限りオーバーフローしませんが、バッファ操作POUはオーバーフローを許容するオプションを指定できます。オーバーフローを許容するのは、書き込み専用、優先バッファを扱えるようにするためです。

  • バッファの読み手と書き手がそれぞれ1つである限り、ロックを使用しなくてもタスクセーフに使用可能  
    単調に増分する書き込み数と読み出し数を使用することで、書き込み処理と読み出し処理が共通して更新する値が無いためです。ロックを使用せずにタスク間で使用できることで、実用的なチャネルを構築することができるようになります。

Sysmacプロジェクト

Sysmacプロジェクトは以下にあります。

https://github.com/kmu2030/RingBufferLib

また、以下にRingBufferを使用したライブラリがあります。これらは、バッファとしてRingBufferを使用しているため、組み合わせやバッファの付け替えが可能です。そのため、ロギングやモニタリングについてのフォールトトレラントな仕組みを簡潔に構築できます。RingBuffer用の変数に保持属性を設定することで電源断にも対応できます。

  • UdpClientLib
    UDPクライアントです。実機からのデバッグ情報取得や、大容量のデータを楽観的に取得するために使用します。

  • TcpClientLib
    TCPクライアントです。リモートの存在を確認してのデータ授受に使用します。

  • FileStreamWriterLib
    ファイルライターです。ログのローカル出力や、スタンドアロン動作時の大容量データの永続化に使用します。

読み書きの追跡(差分取得)について

RingBufferの読み書きの追跡は、バッファ情報を比較しての継続的な差分取得のことです。差分は、バッファへの書き込みである書き込み差分とバッファからの読み出しである読み出し差分を対象としています。2つのバッファ情報の差異の指すバイト列がバッファに残っているとき差分を取得できます。バッファは循環的に使用されるため、差異の指すバイト列がバッファに残っているかは状況によります。

バッファ情報は、バッファの初期化から使用によって常に増分する書き込み数と読み出し数を保持しています。そのため、2つのバッファ情報を比較することで常に差異を取得することができます。バッファ情報は、リングバッファの実装に必要とする情報しか保持していません。そのため、バッファ容量や差分取得のタイミングによる制約がありますが、一般的なリングバッファの使用状態において意図した動作を期待できます。また、それら制約は設計上のトレードオフ対象として取り扱うことができるため、要件に合わせて取捨選択することができます。

差分は、バッファへの書き込みによって差分を保持する領域を上書きすることで消費します。そのため、バッファが使用される限り、読み出し差分の取得可能期間は、書き込み差分と比較して常に短くなります。書き込み差分の消費は、バッファ容量の書き込みが生じたときであるのに対し、読み出し差分の消費は、読み出した時点の書き込み可能な容量の書き込みが生じたときであるためです。

差分取得は、バッファに対する並列的な処理です。そのため、処理中にバッファへの読み書きが生じて処理の開始時点と終了時点で状態が変化する可能性があります。差分取得操作POUは戻り値として、バッファ状態に変化が無いかを確認し、変化があるとFALSEを返します。差分取得中にバッファ状態が変化しても、多くの場合で、再度取得することでその時点での差分を得ることができます。継続的な処理であれば次の処理に持ち越しても問題ないかもしれません。

しかし、バッファへの書き込みが、バッファ状態の変化に先行して実行されることには注意が必要です。バッファ情報の更新は、処理が正常に終了した後に行われるためです。非常に高頻度に循環するバッファからの差分取得で操作POUがFALSEを返したとき、取得した差分は新しく書き込まれた値を含んでいる可能性があります。そのようなバッファからの差分取得が必要である場合、そのバッファへの読み書き操作と同期的にスケジューリングするか、バッファ容量を増やして対応します。

差分取得の制約

リングバッファは、その用途の多くの場合でオーバーフローすることも非常に高頻度に循環することも意図しません。そのため、制約は多くの場合で制約とならない可能性が高いです。また、いずれもバッファ容量の調整と設計上の工夫で対処可能です。

書き込み差分取得の制約

書き込み差分取得には、以下の制約があります。

  • 比較期間でバッファ容量を超える書き込みが行われていない  
    バッファ容量を超える書き込みは、未取得である差分を保持する領域への書き込みです。

読み出し差分取得の制約

読み出し差分取得には以下の制約があります。

  • 比較期間の開始時点で書き込み可能であった容量を超える書き込みが行われていない

  • オーバーフローしていない

  • 比較期間にオーバーフローが発生して解消し、正常な読み出しが行われていてもその読み出しは取得できない  
    バッファ操作と比較期間中のオーバーフローの解消は認識できません。

書き込み差分の再取得の制約

書き込み差分の再取得には以下の制約があります。

  • FALSEを返した差分取得操作POUの実行開始時点からバッファ容量を超える書き込みが行われていない

読み出し差分の再取得の制約

読み出し差分の再取得には以下の制約があります。

  • FALSEを返した差分取得操作POUの実行開始時点で書き込み可能であったバイト数を超える書き込みが行われていない

差分取得の例

以下のコードは、書き込みの追跡の例です。

RingBuffer_init(Context:=iBufferContext,
                Buffer:=iBuffer);

RingBuffer_init(Context:=iTrackWriteBufferContext,
                Buffer:=iTrackWriteBuffer);

// トラッカーを作成する。
// トラッカーを作成した時点の状態が比較の起点になる。
RingBuffer_createWriteTracker(Target:=iBufferContext,
                              Tracker=>iWriteTracker);

iWriteDataStr := 'data';
iWriteDataSize := StringToAry(In:=iWriteDataStr,
                              AryOut:=iWriteData[0]);

RingBuffer_write(Context:=iBufferContext,
                 Buffer:=iBuffer,
                 In:=iWriteData,
                 Head:=0,
                 Size:=iWriteDataSize);

// トラッカーとの差分をリングバッファに取得する。
// 差分の取得に成功するとトラッカーを更新し、比較した状態を比較の起点にする。
// 正常に差分を取得できるとTRUEを返す。
RingBuffer_pullWrite(Context:=iTrackWriteBufferContext,
                     Buffer:=iTrackWriteBuffer,
                     Tracker:=iWriteTracker,
                     Tracked:=iBufferContext,
                     TrackedBuffer:=iBuffer,
                     AllowOverwrite:=FALSE,
                     Missing=>iMissing,
                     PullSize=>iPullSize);

RingBuffer_read(Context:=iTrackWriteBufferContext,
                Buffer:=iTrackWriteBuffer,
                Out:=iTrackData,
                Head:=0,
                Size:=SizeOfAry(iTrackData),
                ReadSize=>iTrackDataSize);
iTrackDataStr := AryToString(In:=iTrackData[0],
                            Size:=iTrackDataSize);

iOk := iTrackDataStr = 'data';

実装

RingBufferは、一般的なリングバッファです。よって、処理内容も一般的なリングバッファと同じで、提供するPOUの意図に沿ったインデックス処理とバイト列のコピー処理を行うだけです。リングバッファの振る舞いを観察すれば、他にも定義できるPOUを見いだせます。

構造体

定義した構造体は1つです。

  • RingBufferContext
    差分取得のトラッカーもこの構造体です。
名称 データ型 内容
ReadSize ULINT 累積読み出しバイト数
WriteSize ULINT 累積書き込みバイト数
ReadHead UINT 読み出し開始位置
WriteHead UINT 書き込み開始位置
Capacity UINT バッファ容量

POU

ゲッターを除き以下のPOUを提供しています。

  • RingBuffer_init  
    バッファを初期化します。

  • RingBuffer_write  
    バッファにバイト列を書き込みます。

  • RingBuffer_writeFully  
    バッファに可能な限りバイト列を書き込みます。

  • RingBuffer_read  
    バッファからバイト列を読み出します。

  • RingBuffer_readFully
    バッファから可能な限りバイト列を読み出します。

  • RingBuffer_peek  
    バッファにバイト列を残したままバイト列を読み出します。

  • RingBuffer_peekFully  
    バッファにバイト列を残したまま可能な限りバイト列を読み出します。

  • RingBuffer_transfer  
    バッファ間でバイト列を転送します。

  • RingBuffer_transferFully  
    バッファ間で可能な限りバイト列を転送します。

  • RingBuffer_consume  
    バッファから指定したバイト数のバイト列を消費します。

  • RingBuffer_createWriteTracker  
    バッファの書き込みを追跡するためのトラッカーを作成します。

  • RingBuffer_createReadTracker  
    バッファの読み出しを追跡するためのトラッカーを作成します。

  • RingBuffer_createTracker  
    バッファの書き込みと読み出しを追跡するためのトラッカーを作成します。

  • RingBuffer_pullWrite  
    RingBuffer_createWriteTrackerで作成したトラッカーを使用して書き込み差分をRingBufferに取得します。

  • RingBuffer_pullRead  
    RingBuffer_createReadTrackerで作成したトラッカーを使用して読み出し差分をRingBufferに取得します。

  • RingBuffer_pull  
    RingBuffer_createTrackerで作成したトラッカーを使用して書き込み差分と読み出し差分をそれぞれRingBufferに取得します。

  • RingBuffer_diffWrite  
    バッファ情報を比較して書き込み差分をBYTE型配列に取得します。

  • RingBuffer_diffRead  
    バッファ情報を比較して読み出し差分をBYTE型配列に取得します。

  • RingBuffer_diff  
    バッファ情報を比較して書き込み差分と読み出し差分をそれぞれBYTE型配列に取得します。

ライブラリを組み合わせる

RingBufferを使用するライブラリを組み合わせる例として、ログをコントローラのSDカードに出力しつつリモートのUDPエンドポイントにリアルタイムで送信するプログラムを確認します。このプログラムは、起動直後のログを幾つか拾えません。プログラム起動順序の工夫、ロギング開始のタイミングやロガーのバッファの初期状態を公開するなどで対処できます。

以下のプログラムを動作させます。

  • POU/プログラム/PseudoMain
    プライマリタスクでログを生成し続けます。
  • POU/プログラム/LogFileWriter
    コントローラのSDカードにログを出力し続けます。
  • POU/プログラム/LogUdpSender
    リモートのUDPエンドポイントにログを送信し続けます。

これらを以下のタスク構成で動作させます。

タスク設定
タスク設定

プログラムの割付設定
プログラムの割付設定

POU/プログラム/PseudoMainは、以下です。指定した間隔でほぼ同サイズのログを生成し続けます。ロガーのバッファは、グローバル変数として定義しています。LOGGING_INTERVALが"1"なので各サイクル、2msで1つのログを生成します。

POU/プログラム/PseudoMain
// Configuration
IF P_First_Run THEN
    LOGGING_INTERVAL := 1;
END_IF;

IF P_First_Run THEN
    // Setup Logger.
    Logger_init(Context:=gLoggerBufferContext,
                Buffer:=gLoggerBuffer);
    
    iWaitTick := LOGGING_INTERVAL;
END_IF;

// Logging at regular intervals.
Dec(iWaitTick);
IF iWaitTick < 1 THEN
    iMsg := CONCAT('{"counter":', ULINT_TO_STRING(Get1usCnt()),
                   ',"timestamp":"', DtToString(GetTime()),
                   '"}$L');
    Logger_write(Context:=gLoggerBufferContext,
                 Buffer:=gLoggerBuffer,
                 Message:=iMsg);
    
    iWaitTick := LOGGING_INTERVAL;
END_IF;

POU/プログラム/LogFileWriterは、以下です。ロガーのバッファへの書き込みを追跡してファイルに出力します。他のPOUもログデータを使用できるよう、ロガーのバッファから消費しないようにします。

POU/プログラム/LogFileWriter
// Configuration
IF P_First_Run THEN
    LOG_PATH := '/log.txt';
END_IF;

IF P_First_Run THEN
    // Setup Log writer.
    RingBuffer_init(Context:=iLogFileWriterBufferContext,
                    Buffer:=iLogFileWriterBuffer);    
    FileStreamWriter_configure(Context:=iLogFileWriterContext,
                               Activate:=TRUE,
                               Path:=LOG_PATH,
                               FileOpenMode:=_eFOPEN_MODE#_WRITE_CREATE,
                               FlushThreshold:=(SizeOfAry(iLogFileWriterBuffer) / 2),
                               FlushOnDeactivate:=TRUE);
    iLogFileWriter.Enable := TRUE;
    
    // Tracks Logger writes.
    RingBuffer_createWriteTracker(Target:=gLoggerBufferContext,
                                  Tracker=>iLoggerWrittenTracker);
END_IF;

// Gets the difference of the logger buffer into the log writer's write buffer.
RingBuffer_pullWrite(Context:=iLogFileWriterBufferContext,
                     Buffer:=iLogFileWriterBuffer,
                     Tracker:=iLoggerWrittenTracker,
                     Tracked:=gLoggerBufferContext,
                     TrackedBuffer:=gLoggerBuffer);
iLogFileWriter(Context:=iLogFileWriterContext,
               BufferContext:=iLogFileWriterBufferContext,
               Buffer:=iLogFileWriterBuffer);

POU/プログラム/LogUdpSenderは、以下です。ロガーのバッファへの書き込みを追跡してリモートのUDPエンドポイントに送信します。POU/プログラム/LogFileWriter同様にログデータをロガーのバッファから消費しないようにします。

POU/プログラム/LogUdpSender
// Configuration.
IF P_First_Run THEN
    REMOTE_ADDR := 'YOUR_REMOTE_DEVICE';
    REMOTE_PORT := 12001;
    SENDER_PORT := 12001;
END_IF;

IF P_First_Run THEN    
    // Setup Log sender.
    // The log sender monitors the logger buffer and sends the written logs.
    RingBuffer_init(Context:=iLogSenderSendBufferContext,
                    Buffer:=iLogSenderSendBuffer);
    UdpClient_configure(Context:=iLogSenderContext,
                        Activate:=TRUE,
                        Destination:=REMOTE_ADDR,
                        DestinationPort:=REMOTE_PORT,
                        Port:=SENDER_PORT,
                        UseSend:=TRUE);
    iLogSenderUdpClient.Enable := TRUE;
    // Tracks logger writes.
    RingBuffer_createWriteTracker(Target:=gLoggerBufferContext,
                                  Tracker=>iLoggerWrittenTracker);

    // Sends the log sender reset payload first.
    RingBuffer_writeString(Context:=iLogSenderSendBufferContext,
                           Buffer:=iLogSenderSendBuffer,
                           Value:=LOG_SENDER_RESET_PAYLOAD);
END_IF;

// Gets the difference of the logger buffer into the log sender's sending buffer.
RingBuffer_pullWrite(Context:=iLogSenderSendBufferContext,
                     Buffer:=iLogSenderSendBuffer,
                     Tracker:=iLoggerWrittenTracker,
                     Tracked:=gLoggerBufferContext,
                     TrackedBuffer:=gLoggerBuffer);
iLogSenderUdpClient(Context:=iLogSenderContext,
                    SendBufferContext:=iLogSenderSendBufferContext,
                    SendBuffer:=iLogSenderSendBuffer);

これらをコントローラで動作させ、適当なUDPエンドポイントを立ち上げておくと、以下のようにログを受信します。今回は、PowerShellから.NETを使用してUDPエンドポイントを立ち上げました。

ログの受信
ログの受信

コントローラのSDカードには同内容のログファイルが作成されています。SDカードへのフラッシュ頻度は、UDPクライアントの送信頻度に比べると小さいので、電源断時に大きなロスが生じます。UDPクライアントによる送信も、電源断時に送信中断によるロスが生じるはずです。それらのロスを無くしたい場合、ロガーのRingBufferを保持属性とし、適切な初期化処理を組み込みます。

まとめ

今回は、リングバッファを扱いました。以前、少し変則的な見方でリングバッファを扱いましたが、RingBufferはあの見方の延長で実装しています。リングバッファの機能の効率ではなく、機能の拡張を目指すためです。これまでRingBufferの基礎となる循環メモリコピー関数を広範に使用していたのですが、構築物の再利用性を求め、改めてBYTE型のリングバッファ実装であるRingBufferとしてまとめることにしました。その際、環境的に効率性にはあまり伸びが無さそうに思えたので、リングバッファとしての機能と単純さを損なわない範囲での機能的な拡張を目指すことにしました。

読み書き間のタスクセーフは第一の要件、かつ、実装できることは分かっていたので、それ以外に何があるかということで、バッファの読み書きの追跡を機能として取り入れることにしました。読み書きの追跡は、実用的な機能の実装用途も目的にしていますが、一番はテストとデバッグです。

RingBufferを使用するPOUは、概ね引数とするRingBufferから読み取るデータが入力であり、RingBufferへ書き込むデータが出力になるはずです。これに対して、RingBufferの追跡機能を使用すると、そのPOUの内部コードや使用コンテクストのコードに触れること無く入出力を追跡してモニタすることができます。実機で動作させ、リモートのエンドポイントで受けて解析することも難しくありません。

基板の端子にプローブをあてて信号を確認するようにPOUのRingBufferを追跡して送信する小さなコード片を導入することで、入出力を確認できるようになります。バイト列は、文字列型のような可変長のデータ型にしか使えないものではありません。適切なフレームを定めれば、任意の構造体を流すこともできます。

Discussion