🫗

SysmacでHashMapの仮実装

2025/01/22に公開

はじめに

本記事は、PLC(Programmable Logic Controller)向けのソフトウェア開発に従事、または関心のある方で、Structured Text(ST)による開発に興味がある方向けです。OMRON社のSysmac Studioとコントローラ(NX1またはNX5)を使用します。

今回は、HashMapを仮実装します。仮実装で課題の確認をします。多くのアプリケーション向け言語では当たり前のように連想配列が使用できます。言語仕様に無い場合でも、それを可能にするライブラリや手法があります。残念ながらStructured Textにはありません。Sysmacにもそれを実現するライブラリはありません。

結果は、芳しくありませんでした。機能的な実装は問題なく行えます。問題はパフォーマンスです。NX1ではパフォーマンスが致命的で気軽に使用できる構造になる見込みがありません。パフォーマンスを改善できそうな箇所はあるのですが、少なくとも読み書きを目標とした20usで実行できるようになる見込みはありません。ハッシュ値の算出は比較的簡単なものでもNX1からするとかなりの計算量になります。また、当然のように行うFUNへの分割も負荷になります。これを改善するとなると、手動インライン展開しかありませんが現実的ではありません。

しかし、全く使いものにならないわけでもありません。当然ですが、要素数が多くなっても各操作の実行時間が大きく増えることはありません。また、キーが任意長で値の型も任意であることは、Sysmacで使用できる基本型には無い機能性です。用途に合わせてメモリの使用量やハッシュアルゴリズムを簡単に変更できるように設計しているので、意外と使う場面があるかもしれません。NX5であればパフォーマンスが大きく改善する見込みがあるので、メモリ使用量を押さえた小規模なHashMapとすることで気軽に使える構造になる可能性があります。

例えば、HTTPヘッダーの保持と操作を考えます。FAでHTTPヘッダーなんてどこの星の話だと思うかもしれませんが、多品種の生産ラインを単一のコントローラで制御し、制御のためにあれやこれやの情報を保持しておく必要があるとすれば、HTTPヘッダーのそれと状況は似たようなものです。今のところ、HTTPヘッダーを保持するために適当な固定長のキー文字列と値文字列をメンバにもつ構造体を定義し、その配列を使用して一連のヘッダーを扱っているのですが、それがHashMapになれば扱いが楽になります。気軽に使えないままであれば、次のようなレジストリとしての使用が考えられます。

IF P_First_Run THEN
  HashMap_PutString(iRegistry, 'service.mqttpub.name', 'mqtt_pub_1');
  HashMap_PutString(iRegistry, 'service.mqttpub.broker_adr', 'example.com');
  HashMap_PutUint(iRegistry, 'service.mqttpub.broker_port', 1883);
  HashMap_PutUint(iRegistry, 'service.mqttpub.keep_alive', 600);
  HashMap_PutUint(iRegistry, 'service.mqttpub.timeout', 5);
  HashMap_PutString(iRegistry, 'service.mqttpub.username', '');
  HashMap_PutString(iRegistry, 'service.mqttpub.password', '');
  HashMap_PutBool(iRegistry, 'service.mqttpub.tls_use', FALSE);
  HashMap_PutString(iRegistry, 'service.mqttpub.tls_ession_name', '');
  HashMap_PutString(iRegistry, 'service.mqttpub.client_id', 'nx');
  HashMap_PutBool(iRegistry, 'service.mqttpub.auto_ping', FALSE);
  HashMap_PutTime(iRegistry, 'service.mqttpub.auto_ping_interval', TIME#10s);
  HashMap_PutUint(iRegistry, 'service.mqttpub.auto_ping_timeout', 3000);
  HashMap_PutBool(iRegistry, 'service.mqttpub.will_flag', TRUE);
  HashMap_PutString(iRegistry, 'service.mqttpub.will_topic',
                    'notify/offline/mqtt_pub_1');
  HashMap_PutString(iRegistry, 'service.mqttpub.will_msg',
                    CONCAT('[info]$L',
                           'type = "machine/service"$L',
                           'entity = "mqtt_pub_1"$L',
                           'origin = "broker"$L',
                           'message = "offline"$L'));
  HashMap_PutByte(iRegistry, 'service.mqttpub.will_qos', BYTE#1);
  HashMap_PutBool(iRegistry, 'service.mqttpub.will_retain', FALSE);
  HashMap_PutBool(iRegistry, 'service.mqttpub.clean_session', TRUE);
  HashMap_PutUint(iRegistry, 'service.mqttpub.discard_msg_time', 1000);
END_IF;

CASE iState OF
  0:
    Inc(iState);
  1:
    HashMap_GetString(iRegistry, 'service.mqttpub.name', iServiceName);
    HashMap_GetString(iRegistry, 'service.mqttpub.broker_adr', iBrokerAdr);
    HashMap_GetUint(iRegistry, 'service.mqttpub.broker_port', iBrokerPort);
    HashMap_GetUint(iRegistry, 'service.mqttpub.keep_alive', iKeepAlive);
    HashMap_GetUint(iRegistry, 'service.mqttpub.timeout', iTimeout);
    HashMap_GetString(iRegistry, 'service.mqttpub.username', iUsername);
    HashMap_GetString(iRegistry, 'service.mqttpub.password', iPassword);
    HashMap_GetBool(iRegistry, 'service.mqttpub.tls_use', iTLSUse);
    HashMap_GetString(iRegistry, 'service.mqttpub.tls_ession_name', iTLSSessionName);
    HashMap_GetString(iRegistry, 'service.mqttpub.client_id', iClientID);
    HashMap_GetBool(iRegistry, 'service.mqttpub.auto_ping', iAutoPing);
    HashMap_GetTime(iRegistry, 'service.mqttpub.auto_ping_interval', iAutoPingInterval);
    HashMap_GetUint(iRegistry, 'service.mqttpub.auto_ping_timeout', iAutoPingTimeout);
    HashMap_GetBool(iRegistry, 'service.mqttpub.will_flag', iWillFlag);
    HashMap_GetString(iRegistry, 'service.mqttpub.will_topic', iWillTopic);
    HashMap_GetString(iRegistry, 'service.mqttpub.will_msg', iWillMsg);
    HashMap_GetByte(iRegistry, 'service.mqttpub.will_qos', iWillQoS);
    HashMap_GetBool(iRegistry, 'service.mqttpub.will_retain', iWillRetain);
    HashMap_GetBool(iRegistry, 'service.mqttpub.clean_session', iCleanSession);
    HashMap_GetUint(iRegistry, 'service.mqttpub.discard_msg_time', iDiscardMsgTime);
    
    Inc(iState);
END_CASE;

Sysmacプロジェクト

次にSysmacプロジェクトがあります。

https://github.com/kmu2030/ExperimentalHashMapLib

HashMapの要件

HashMapへの要件を確認します。キーは任意長、値は任意の型とその配列を保持でき、32 byteのキー長と100 byte未満の値について20usで読み書きを行うことができることを要件とします。20usは、NX1で現在時刻を取得する命令の実行時間と同程度です。メモリについての要件は、そもそも実行時間についての要件を満たせる可能性を確認してからです。今回は仮実装なので、ハッシュテーブルの実装に集中し、その他は適当です。

キーと値を保持する領域のメモリ管理は放棄します。削除した要素のデータ保持領域は再使用しません。メモリ管理は固定ブロックによる単方向リストが候補です。値の内部データ表現はMessagePackとし、ビット列型と日時型はext formatとします。キーは文字列型に限定しますが、これも内部ではMessagePack表現としているため、少し手を加えると任意型とすることができます。但し、読み書きFUNのシグネチャが大変なことになるか、便宜上Object型のように振る舞う構造体を使用することになります。

HashMapの実装

HashMapは、キーと値の保持領域とハッシュテーブルをもちます。ハッシュテーブルの値は、キー、値それぞれの保持領域の先頭ポインタと値のサイズです。キーと値の読み書きはメモリ管理とMessagePack形式へのエンコード/デコードなので省略し、ハッシュテーブル操作を確認します。ハッシュテーブル操作は、次になります。

  • HashTableKey_New
    ハッシュ値を含むHashTableKey構造体を生成します。
  • HashTableKey_Eq
    引数のHashTableKeyが一致するかを返します。
  • HashTable_KeyToIndex
    引数のHashTableKeyから値を格納する配列のインデックスを算出します。
  • HashTable_PutWithKey
    引数のHashTableKeyとそれに対応した値(HashTableValue)をHashTableに格納します。
  • HashTable_GetWithKey
    引数のHashTableKeyに対応した値を取得します。
  • HashTable_DelWithKey
    引数のHashTableKeyに対応した値をHashTableから削除します。

HashTableKey_New

ハッシュ関数は、MurmurHash3の32bitを使用します。また、キーのデータ長も比較することにしたので、HashTableKeyはそれも保持します。HashTableでは、ハッシュ値そのものを扱うのではなく、ハッシュ値を保持する構造体を扱うようにすることで変更に強くなります。

HashTableKey_New := TRUE;
IF NOT EN THEN
  Clear(Key);
  
  RETURN;
END_IF;

Key.KeyLength := StringToAry(In:=Value, AryOut:=iData[0]);
MurmurHash3A(Data:=iData,
             Head:=0,
             Size:=Key.KeyLength,
             Seed:=0,
             Hash=>Key.Hash);

HashTableKey_Eq

HashTableKeyの一致と衝突は、ハッシュ値とキーのデータ長で判断します。

Collision := In1.Hash = In2.Hash AND In1.KeyLength <> In2.KeyLength;
HashTableKey_Eq := In1.Hash = In2.Hash AND In1.KeyLength = In2.KeyLength;

HashTable_KeyToIndex

ハッシュ値をそのまま値を保持する配列のインデックスとして使用することはできません。ハッシュ値を配列長に丸めます。配列長は最大としたので65534までです。よってかなりの確率でインデックスは重複します。また、ハッシュ値の各ビットの出現の偏りが大きいと重複の偏りも大きくなります。ハッシュ値の衝突を前提としているので、HashTableでは値を単方向リストとして管理します。

IF NOT EN THEN
  HashTable_KeyToIndex := 0;
  
  RETURN;
END_IF;

// xor
HashTable_KeyToIndex
  := MIN(TO_UINT(SHR(Key.Hash, 16) XOR Key.Hash), UINT#16#FFFE);

HashTable_PutWithKey

HashTableに値を追加します。値を格納する配列は、複数の単方向リストとして操作します。リストの先頭は、ListTableメンバが保持する値(Head)と配列の先頭要素です。先頭要素が指す次(Nextメンバ)のインデックスは、値を保持していない要素を指すようにします。値を保持していない要素のリストをたどって、Nextがゼロである要素に当たった場合、その要素のインデックスから先は一度も使用していない未使用領域です。また、値を保持する要素でNextが自身のインデックスである要素は、そのリストの末端とします。値は、HashTableValueとしてHashTableに対して詳細を隠し、変更に強くします。

HashTable_PutWithKey := FALSE;
Collision := FALSE;
Exist := FALSE;
IF NOT EN
  OR Table.Num >= SizeOfAry(Table.Elements)
THEN  
  RETURN;
END_IF;

iListIndex := HashTable_KeyToIndex(Key:=Key, Table:=Table);

CASE Table.ListTable[iListIndex].Head OF
  0:
    iNewElementIndex := Table.Elements[0].Next;
    IF iNewElementIndex = 0 THEN
      Table.Elements[0].Next := 2;
      iNewElementIndex := 1;
    ELSIF Table.Elements[iNewElementIndex].Next = 0 THEN
      Table.Elements[0].Next := iNewElementIndex + 1;
    ELSE
      Table.Elements[0].Next := Table.Elements[iNewElementIndex].Next;
    END_IF;
    Table.Elements[iNewElementIndex].Key := Key;
    Table.Elements[iNewElementIndex].Value := Value;
    Table.Elements[iNewElementIndex].Next := iNewElementIndex;
    Table.ListTable[iListIndex].Head := iNewElementIndex;
ELSE  
  iElmNum := 1;
  iElementIndex := Table.ListTable[iListIndex].Head;
  i := iElementIndex;
  REPEAT
    IF HashTableKey_Eq(
         In1:=Table.Elements[iElementIndex].Key,
         In2:=Key,
         Collision=>iCollision)
    THEN
      Table.Elements[iElementIndex].Value := Value;
      Exist := TRUE;      
      EXIT;
    ELSIF iCollision THEN
      Collision := Collision OR iCollision;
    END_IF;
    i := iElementIndex;
    iElementIndex := Table.Elements[iElementIndex].Next;
    Inc(iElmNum);
  UNTIL Table.Elements[iElementIndex].Next = i
  END_REPEAT;

  IF NOT Exist THEN
    iNewElementIndex := Table.Elements[0].Next;
    IF Table.Elements[iNewElementIndex].Next = 0 THEN
      Table.Elements[0].Next := iNewElementIndex + 1;
    ELSE
      Table.Elements[0].Next := Table.Elements[iNewElementIndex].Next;
    END_IF;

    Table.Elements[iNewElementIndex].Key := Key;
    Table.Elements[iNewElementIndex].Next := iNewElementIndex;
    Table.Elements[iNewElementIndex].Value := Value;
    Table.Elements[iElementIndex].Next := iNewElementIndex;
    Inc(iElmNum);
    Table.MaxListLength := MAX(Table.MaxListLength, iElmNum);
  END_IF;

  IF Collision THEN
    Table.ListTable[iListIndex].Stat
      := Table.ListTable[iListIndex].Stat OR BYTE#16#01;
    Inc(Table.CollisionNum);
  END_IF;
END_CASE;
Inc(Table.Num);

HashTable_PutWithKey := TRUE;

HashTable_GetWithKey

HashTableから値を取得します。値のリストをたどってHashTableKeyが一致した値を返します。

HashTable_GetWithKey := FALSE;
Collision := FALSE;
IF NOT EN
  OR Table.Num < 1
THEN
  RETURN;
END_IF;

iListIndex := HashTable_KeyToIndex(Key:=Key, Table:=Table);

IF Table.ListTable[iListIndex].Head < 1 THEN
  RETURN;
END_IF;

i := Table.ListTable[iListIndex].Head;
j := i;
REPEAT
  IF HashTableKey_Eq(
       In1:=Table.Elements[i].Key,
       In2:=Key,
       Collision=>Collision)
  THEN
    Value := Table.Elements[i].Value;
    HashTable_GetWithKey := TRUE;
    
    EXIT;
  ELSIF Collision THEN
    Collision := TRUE;
    Value := Table.Elements[i].Value;
    HashTable_GetWithKey := TRUE;
    
    EXIT;
  END_IF;
  j := i;
  i := Table.Elements[i].Next;
UNTIL Table.Elements[i].Next = j
END_REPEAT;

HashTable_DelWithKey

HashTableから値とキーを削除します。値のリストをたどってHashTableKeyが一致した値を削除します。

HashTable_DelWithKey := FALSE;
Collision := FALSE;
IF NOT EN
  OR Table.Num < 1
THEN
  RETURN;
END_IF;

iListIndex := HashTable_KeyToIndex(Key:=Key, Table:=Table);

IF Table.ListTable[iListIndex].Head < 1 THEN
  RETURN;
END_IF;

i := Table.ListTable[iListIndex].Head;
j := i;
REPEAT
  IF HashTableKey_Eq(
       In1:=Table.Elements[i].Key,
       In2:=Key,
       Collision=>Collision)
  THEN      
    Value := Table.Elements[i].Value;
    IF i = Table.Elements[i].Next THEN
      IF i = j THEN
        Clear(Table.ListTable[iListIndex]);
      ELSE
        Table.Elements[j].Next := j;
      END_IF;
    ELSE
      IF i = j THEN
        Table.ListTable[iListIndex].Head := Table.Elements[i].Next;
      ELSE
        Table.Elements[j].Next := Table.Elements[i].Next;
      END_IF;
    END_IF;
    
    Table.Elements[i].Next := Table.Elements[0].Next;
    Table.Elements[0].Next := i;
    
    Dec(Table.Num);
    
    HashTable_DelWithKey := TRUE;
    
    EXIT;
  ELSIF Collision THEN
    Collision := TRUE;
    Value := Table.Elements[i].Value;
    
    EXIT;
  END_IF;
  j := i;
  i := Table.Elements[i].Next;
UNTIL Table.Elements[i].Next = j
END_REPEAT;

パフォーマンス

パフォーマンスを確認するため、実機でプログラムを動作させて各操作の実行時間をトレースしてみました。実行時間計測のためのよい手段が無く、命令実行前後の現在時刻の差から、現在時刻取得1回分の実行時間を引いた値を実行時間としています。ランタイム側の影響も受けるようなのであまり信頼できない値です。実行時間計測については、どこかで探求する必要がありますが、メーカーさんのお仕事ではという気もします。信頼できない値をもとに確認をしても意味が無いと言われると弱いのですが、今のところはこれしかありません。いずれも試行回数が少ないので平均は参考とし、最大と最小を確認します。長期間のデータ収集とその視覚化を行いたければ、InfluxDBにでも投げましょう。

実行環境

パフォーマンスの確認は、次の環境で行いました。タスクへの割り付けとプログラムの詳細は、Sysmacプロジェクトを確認してください。

  • コントローラ
    NX102-9000 Ver 1.64を使用しました。
  • Sysmac Studio
    Ver 1.61を使用しました。

HashMap操作の実行時間

HashMap操作の実行時間は次のようになりました。要素数は64、キーのデータ長は33 byteです。


HashMap操作の実行時間グラフ (要素数=64, Key=33 byte)

操作 最小(us) 最大(us) 平均(us)
Get 30 41 34
Put 66 86 72
Update 82 106 91
Delete 84 105 91

いずれも最小値が20usを上回っているので、残念ながら目標はクリアしていません。しかし、最大値が絶望的な値を示すことが無さそうなのは救いです。要求を満たせるなら、使えないこともないということです。横着する可能性が高まりました。ハッシュ値が衝突した場合、キーの長さによりますが、実行時間が数倍になります。

異なる要素数による値取得実行時間の比較

要素数を変えて、値取得の実行時間を比較すると次のようになりました。要素数そのものではなく、HashMapの容量に対する使用率として見ます。キーのデータ長は33 byteです。


異なる要素数の値取得実行時間グラフ (Key=33 byte)

要素数 最小(us) 最大(us) 平均(us)
64 39 57 42
256 63 83 70
9000 82 107 88

要素数に応じて実行時間が増えているので、そこは順当です。増え方も妥当です。しかし、リストの検索は思っていたよりも時間がかかっているようです。キーの比較を行うFUNの呼び出しが原因かもしれません。

キー生成の実行時間

HashTableKey生成の実行時間については、次のようになりました。キーのデータ長は33 byteです。


HashTableKey生成の実行時間グラフ (Key=33 byte)

最小(us) 最大(us) 平均(us)
11 71 28

少なくとも11usはかかるようです。キーのデータ長が一定なのに、実行時間がこれほど振れる理由については、今のところ検証していません。頻繁に生じる演算オバーフローが原因かもしれませんし、ライブラリのFUNを呼び出しているからかもしれません。あるいは、全く別の理由かもしれません。しかし、先のMashMap操作の実行時間を考えると腑に落ちません。やはり計測方法そのものに問題がある気がします。実行時間が短すぎるのかもしれません。そうであれば、複数回実行するのに要する時間を計測したほうがよいかもしれません。

まとめ

今回のHashMapは、課題を確認するための仮実装です。多くの課題を見いだせたという意味で、よい結果になりました。使い物にならないという訳でも無さそうだということも、よい結果です。何よりも一通り作り切ったので、とりあえずSysmacで動くHashMapがあるということが最大の収穫です。やはり、始めたら作り切って、何であっても動くものに仕上げることです。"できる"と"ある"には雲泥の差があります。

連想配列については、赤黒木の方がよいと思いますが、木構造を使用したとしてもキー値の比較はハッシュ値で行うことを考えています。バイト列や文字列の比較は高コストだからです。小規模なキーバリューであれば、そのコストが何らかのデータ構造を使用するコストを下回る可能性が高いので、固定長キーバリューの構造体と組み込み検索命令の線形探索で十分です。

Discussion