📖

Pythonで排他ロック可能なプロセス間通信(共有メモリ)ライブラリを作った

2022/09/17に公開
3

概要

複数の独立したPythonプロセス間の通信ができる高水準のモジュール IPMMAP を作成しました。

https://github.com/quag-cactus/ipmmap

  • IPMMAPでは、mmap(標準モジュール)を用いて共有メモリ領域を使ったプロセス間通信を提供します。
  • クラスベースのデータ構造を採用することで、可読性を高めています。
  • サードパーティーモジュールであるfastainersを使用してプロセス間排他ロックを実装することで、安全な共有メモリへのアクセスを実現しています。

背景と要件

とあるプロジェクトで、複数のpythonプロセスを連携して動作させる必要が出てきました。前段のpythonプロセスから後段のpythonプロセスにパラメータを渡す処理が必要になり、何らかのプロセス間通信(IPC)機能の実装が必要になりました。

標準モジュールの利用を含め、IPCの実装には様々なアプローチがありますが[1]、今回は以下の要件に合致する必要があります。

  • それぞれのプロセスに親子関係はなく、完全に独立したプロセス群同士が「通信」する。
  • パラメータの「送信側」と「受信側」のプロセスは固定されている。(通信は事実上単方向)
  • 通信の頻度は最大で数回/秒程度。

通信頻度とサイズから考え、各プロセスが共有メモリ領域への書き込み・読み込みを行う方式をとることにしました。

この方法では、複数プロセスが同じメモリ領域の参照を行う都合上、排他制御が必要になりそうです。よって、要件をもう一つ追加します。

  • 共有メモリ領域の参照中は書き込み・読み込み処理に合わせて適切な排他制御を行う。

pythonの標準ライブラリにおけるプロセス間通信

とりあえず標準モジュールで実装可能かどうか調べていきます。
pythonでプロセス間通信に関する検索を行うと、multiprocessingモジュールによる並列処理間でのパラメータ共有の解説がほとんどです。今回の要件には使えません。

今回は独立したプロセス同士の通信を行いたいのですが、標準モジュールでは、独立したプロセス間で安全にメモリを共有する高水準なモジュールは用意されていないことが分かりました。

multiprocessing.shared_memoryの調査

今回の要件に近いのはモジュールも一応ありまして、multiprocessing.shared_memoryです。

モジュール内部のSharedMemoryクラスは以下のように記述することで、共有メモリブロックを作成します。name属性に入っている名前をコンストラクタ引数で指定することで、他プロセスから同じメモリブロックを参照できます。

# https://docs.python.org/3/library/multiprocessing.shared_memory.html
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> shm_a.name
-> '共有メモリブロック名'

共有メモリ方式を採用したのは、SharedMemoryクラスを以前から把握しているからでもありました。このクラスをベースとして、必要な機能(排他ロック関連)を追加していく算段でした。

しかしながら、shared_memoryモジュールはいくつか問題があります。

  1. 排他ロック機能が存在しない
  2. linuxだとうまく動作しない

1: ドキュメントを見た感じ、ロック関連の記述がありません。複数プロセスからの同時書き込みを試してみたところ、書き込めてしまいました。また、python自体に排他ロックを行うための(高水準な)モジュールも存在しないようです。
2 : これは非常に困ります。issueは上がっていますが、まだマージされていません[2]

まとめると、shared_memoryは排他ロック無しということに目をつぶれば、Windowsでのみ動作(Python3.8で確認)します。ただし、linuxでは使用できません。
まだ安定していないモジュールという考え方もできます。ちょっと使うのが気が引ける感じになってきました。

実装方針

標準モジュールでは要件すべて達成できないので、多少のスクラッチを視野に作成していきます。
共有メモリや排他制御は低水準の知識や、慎重な実装が必要になるため、一から作るのは避けたいところです。

(1)主要部分の実装方針

  1. 共有メモリの実装
    プロセス間通信の手段として共有メモリを用いる方針は維持します。shared_mamory.pyのソースを確認したところ、内部的にはmmapクラスを呼び出しています。mmapクラスは比較的低水準のクラスであり(OSの差異は吸収してくれますが)、やや使いにくいです。新たに今回の要件に合わせたラッパクラスで活用していくことにします。

  2. 排他制御の実装
    サードパーティーのパッケージでfastainersというパッケージを見つけました。これにリード・ライトロックが可能なプロセス間ロック機構が用意されていたので、使ってみることにしました。OS毎に異なるファイルロックのシステムコールをうまく抽象化してくれているようです。
    日本語ではこのページを参考にしました: https://blog.amedama.jp/entry/2016/12/08/202549

  3. データ構造
    mmapが示す共有メモリ領域はbytesarrayと同じように取り扱えます。
    ただし、mmapにアクセスするたびに直接バイトいじりをしていると大変効率が悪いので、バッファサイズと内部形状は、ctypes.Structureを継承したクラス(構造体)を用いて表現します。上記ラッパクラスはこの構造体を使用する設計とします。

そろそろ名前が必要なので、今回のプロセス間通信に特化したmmapラッパーライブラリをIPMMAPと呼称します。

(2)クラス設計

可読性と安全性を考え、アクセス権限によってラッパクラスを分けます。

  • IPMMAPマネジャークラス(mmapファイル新規作成権限をもつ)
  • IPMMAPエディタクラス(mmapが示すバッファ内部の編集権限を持つ)
  • IPMMAPリーダークラス(mmapが示すバッファ内部の編集権限を持つ)

(3)処理フロー

1. 共有メモリを編集する場合(送信側)

IPMMAPマネジャークラスのmmapの実体ファイルを作成
↓
IPMMAPエディタクラスをインスタンス化
↓
書き込みロックを取得
↓
mmapが示すメモリ領域に書き込み
↓
最終更新時刻を更新
↓
書き込みロックをリリース

2. 共有メモリを編集する場合(受信側)

IPMMAPリーダークラスをインスタンス化
↓
読み込みロックを取得
↓
mmapが示すメモリ領域を読み込み
↓
書き込みロックをリリース
  • 排他ロックとそのリリースを確実を行うことが肝要です。コンテキストマネージャを使って、IPMMAPクラスの使用者が意識せずともリソース管理ができるように実装します。

  • プロセス間通信という視点に立ち返れば、「共有メモリを編集する=>送信」、「共有メモリを読み込む=>受信」、と言い換えることができます。上述したデータ構造に最終更新時刻を含め、書き込み毎に更新するようにすれば、読み込み側はmmapへのポーリングを繰り返すことで最新情報を「受信」することができるようになります。

実装・使い方

以上の方針をもとに実装を進め、とりあえず作りました。簡単な使い方も併記します。

https://github.com/quag-cactus/ipmmap

このライブラリでは、プロセス間で共有したいデータ構造を定義する構造体を用意し、メモリマッピングはその構造体のサイズに基づいて行われます。マッピングされたアドレス(共有メモリ領域)へのアクセスは、権限別に用意されたipmmapアクセスクラスを用いて行います。

IPMMAP向け構造体を作成する

  • IPMMAPでは、共有メモリに展開するデータ構造をC言語の型をつけ、固定長で定義していきます。
  • IPMMAP向け構造体として認識させるために、base_struct.BaseMmapStructureを継承させる必要があります。
  • 構造体のネストも可能です。
  • 下記デモスクリプトでは、IPMMAP構造体DemoMmapStructureを定義しています

https://github.com/quag-cactus/ipmmap/blob/main/demo_struct.py

DataStructMmapManagerクラスを使ってメモリマッピング元のファイルを作成

  • まず、共有マップ構造を定義したモジュールパスを、setUserStructs関数を用いて登録します。登録されたモジュールパスは、ユーザー定義IPMMAP構造体の捜索パスとして追加されます。
  • DataStructMmapManager(以下Managerクラス)はAbstractMmapMangerを親クラスとするクラスのなかで、唯一mmapのマッピング元ファイル(mmapファイル)を作成する権限を持っています。mmapファイルはpythonのカレントディレクトリ直下に生成される.mmap/に格納されます。主なコンストラクタ引数は以下です:
    • structName: マッピングしたい(共有したい)IPMMAP構造体クラス名です。
    • tag(オプション): 共有メモリ領域をタグ名で一意に定めることができます。同一の構造体定義で複数のメモリ領域を管理したい場合便利です
  • プロセス間通信を行うプロセスの中のいずれかが、Managerクラスを呼び出し、マッピング元ファイルを確実に作成する必要があります。
  • openMemory()を呼び出すことで、mmapファイルの内容を共有メモリとしてマッピングできます。この関数によって、Managerクラスのインスタンスが有効なスコープにある限り、メモリマッピングを維持できます。openMemory()closeMemoryはManagerクラスのみが呼び出せます。
>>> import ipmmap
>>> # ユーザー定義構造体のモジュールパス登録
>>> ipmmap.DataStructMmapManager.setUserStructs(["demo_struct"])
>>> # manager権限のインスタンスを作成(.mmapファイルが作成されます)
>>> # DataStructMmapManager(structName: str, tag: str = "", mmapDir: Path = None, fastenerDir: Path = None, create: bool = False, force: bool = False)
>>> manager = ipmmap.DataStructMmapManager("DemoMmapStructure", create=True, force=True)
>>> # mmapファイルをメモリにマッピングします
>>> manager.openMemory()

DataStructMmapEditorを使って共有メモリ領域を編集する

  • DataStructMmapEditorクラスをwith文を用いて呼び出すことで、リソース管理を適切に行いつつ、排他的に共有メモリ領域にアクセスできます。
  • Editorクラスは編集権限を持つクラスで、Managerクラスの次の強い権限を持っています。このクラスを用いて共有メモリ領域にアクセスした場合、書き込み権限で排他ロックがかかります。
    • あるプロセスがEditorクラスのwith文の中を実行中の間は、他のプロセスのEditorクラス・Readerクラス(後述)は共有メモリにアクセスできません(with文の行でブロックされます)。
    • updateLastUpdate関数を呼び出すことで、最終更新時刻を現在のシステム時刻に更新できます。
    • writeData関数によって、構造体のフィールド名をキーとして、値を挿入できます。
    • ctypes.Structureは配列の形をとったフィールドにも対応していますが、writeData関数は、配列へのアクセスに対応していません。referMappedBuffer関数で共有メモリ領域を直接参照することで、通常の変数と同様に添字を使ってアクセスできます。
      • referMappedBuffer関数で取得した参照は必ずwith文内部で破棄してください。共有メモリへの余計な参照が存在するとエラーになります。
>>> with ipmmap.DataStructMmapEditor("DemoMmapStructure") as editor:
...      # writeData関数でIPMMAP構造体メンバ名をキーとして、値を挿入できます
...      editor.writeData("data_int", 10)
...      # 文字列はbytes型に変換する必要があります
...      editor.writeData("data_string", bytes("hello!", 'utf-8'))
...      # 構造体が入れ子になっていてもOKです
...      editor.writeData("data_xy.x", 123.45)
...      editor.writeData("data_xy.x", 543.21)
...      # 共有メモリの内容を直接参照・アクセス
...	 buf = editor.referMappedBuffer()
...	 buf.multi_pts.multi_points[0].x = 987.65
...	 buf.multi_pts.multi_points[0].y = 567.89
...      # 参照を破棄(今回はNoneを代入)
...      buf = None
...      # updateLastUpdate関数で最終更新時刻を更新できます
...      editor.updateLastUpdate()
...

DataStructMmapReaderを使って共有メモリ領域を読み込む

  • DataStructMmapReaderクラスは読み込み権限のみを行うクラスで、用意された関数では、共有メモリ領域の読み込みのみ可能です。
  • このクラスもwith文をつかって共有メモリ領域にアクセスできます。このクラスを用いて共有メモリ領域にアクセスした場合、読み込み権限で排他ロックがかかります。
    • 読み込みロック中はその領域にたいして書き込みはできませんが、他プロセスが同時に読み込むことは可能です。
>>> import ipmmap
>>> ipmmap.DataStructMmapReader.setUserStructs(["demo_struct"])
>>> # 最終更新時刻のエポック秒が取得できます
>>> with ipmmap.DataStructMmapReader("DemoMmapStructure") as reader:
...     reader.getLastUpdate()
...
1663318271.859895
>>> # IPMMAP構造体メンバ名(文字列)をキーとして値を取得できます
>>> with ipmmap.DataStructMmapReader("DemoMmapStructure") as reader:
...     reader.readData("data_int")
...     reader.readData("data_string")
...     reader.readData("data_xy.x")
...     reader.readData("data_xy.y")
...     # 共有メモリ領域を構造体のままコピーしてアクセス
...     buf = reader.readMappedBuffer()
...     buf.multi_pts.multi_points[0].x
...     buf.multi_pts.multi_points[0].y
...
10
b'hello!'
123.45
543.21
987.65
567.89
>>>

gitリポジトリのREADME.mdとデモスクリプト(demo_editor.pyとdemo_reader.py)も参考にしてください。

Todo

  • ノンブロッキングモード(ロックが取れない場合、即座に復帰する)の追加
  • PyPIに登録
  • リポジトリをpoetryに対応 -> 対応しました!

参考

脚注
  1. 更新頻度とサイズによっては、単純なファイルIOでも良いはずです ↩︎

  2. 議論は活発なので、そのうち治るかもしれません。shared_memory全体に問題が多いのも認識されているようです。 ↩︎

Discussion

thebluetheblue

大変参考になる記事をありがとうございます。早速使用を試みたところ一点、不明な点がありました。

class MultiPointsData(ctypes.Structure):
    _fields_ = (
        ('multi_data_int', ctypes.c_int32), 
        ('multi_points', Point2D * 24),
    )

のPoint2D * 24でPoint2D構造体をArrayにしていますが、そこへの書き込み、読み出し手順がdemoにはなくご教示頂けないでしょうか?

QuagCactusQuagCactus

コメントありがとうございます!
確かにctypes.structureに配列構造がある場合の読み書きについて漏れていました。

現在、writeData/readData関数で配列を扱うことができません。
代わりに、共有メモリ領域を直接参照し、構造体の形で扱えるようにする関数として、

  • DataStructMmapEditor.referMappedBuffer() -> 編集権限
  • DataStructMmapReader.readMappedBuffer() -> 読み込み権限

を用意してあります。複雑な形状の構造体を扱う場合は、こちらを使ってください(本文にも追記しておきます)。

以下使用例です。注意点として、referMappedBufferを呼び出した場合は、with文から出る前(内部的にmmapへの参照を解く前に)、bufferへの参照を明示的に解く必要があります。

# 編集側
with DataStructMmapEditor("DemoMmapStructure") as editor:

    # 共有メモリ領域を直接参照
    buf = editor.referMappedBuffer()

    # 変数bufは構造体のインスタンスなので、ネストされたAttributeにもアクセス可能
    buf.multi_pts.multi_points[0].x = 8888
    buf.multi_pts.multi_points[0].y = 9999
    # with文を閉じる前に明示的に参照を解く
    buf = None
# 読み込み側
with DataStructMmapReader("DemoMmapStructure") as reader:

    if updateTime > lastestUpdateTime:
        # 共有メモリ領域のバイナリをコピーし、構造体のインスタンスとして返却
        buf = reader.readMappedBuffer()
   
    # 変数bufは構造体のインスタンスなので、ネストされたAttributeにもアクセス可能
        multi_0_x = buf.multi_pts.multi_points[0].x
        multi_0_y = buf.multi_pts.multi_points[0].y

        print(multi_0_x, multi_0_y)

thebluetheblue

ありがとうございます。
・DataStructMmapEditor.referMappedBuffer()
・DataStructMmapReader.readMappedBuffer()
にあったんですね。readData,writeDataの方を見ていました。
確認不足でした。

ログインするとコメントできます