📖

Pythonで排他ロック可能なプロセス間通信(共有メモリ)ライブラリを作った

2022/09/17に公開約7,100字

概要

複数の独立したPythonプロセス間の通信ができる高水準のモジュール IPMMAP を作成しました。

https://github.com/quag-cactus/ipmmap
  • IPMMAPでは、mmap(標準モジュール)を用いて共有メモリ領域を使ったプロセス間通信を提供します。
  • クラスベースのデータ構造を採用することで、可読性を高めています。
  • サードパーティーモジュールであるfastainersを使用してプロセス間排他ロックを実装することで、安全な共有メモリへのアクセスを実現しています。

背景と要件

とあるプロジェクトで、複数のpythonプロセスを連携して動作させる必要が出てきました。前段のpythonプロセスから後段のpythonプロセスにパラメータを渡す処理が必要になり、何らかのプロセス間通信(IPC)機能の実装が必要になりました。

標準モジュールの利用を含め、IPCの実装には様々なアプローチがありますが(場合によっては、単純なファイルIOでも良いはず)、今回は以下の要件に合致する必要があります。

  • それぞれのプロセスに親子関係はなく、完全に独立したプロセス群同士が「通信」する。
  • パラメータの「送信側」と「受信側」のプロセスは固定されている。(通信は事実上単方向)
  • 通信の頻度は最大で数回/秒程度。

通信頻度とサイズから考え、各プロセスが共有メモリ領域への書き込み・読み込みを行う方式をとることにしました。

この方法では、複数プロセスが同じメモリ領域の参照を行う都合上、排他制御が必要になりそうです。よって、要件をもう一つ追加します。

  • 共有メモリ領域の参照中は書き込み・読み込み処理に合わせて適切な排他制御を行う。

pythonの標準ライブラリにおけるプロセス間通信

とりあえず標準モジュールで実装可能かどうか調べていきます。
pythonでプロセス間通信に関する検索を行うと、multiprocessingモジュールによる並列処理間でのパラメータ共有の解説がほとんどです。今回の要件には使えません。

今回は独立したプロセス同士の通信を行いたいのですが、標準モジュールでは、独立したプロセス間で安全にメモリを共有する高水準なモジュールは用意されていないことが分かりました。

強いて言えば、今回の要件に掠っているのはmultiprocessing.shared_memoryです。

SharedMemoryクラスは以下のように記述することで、共有メモリブロックを作成します。name属性に入っている名前をコンストラクタ引数で指定することで、他プロセスから同じメモリブロックを参照できます。

# https://docs.python.org/3/library/multiprocessing.shared_memory.html
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> shm_a.name
-> '共有メモリブロック名'

共有メモリ方式を採用したのは、SharedMemoryクラスの存在を以前から把握しているからでもありました。このクラスをベースとして、必要な機能を追加してけばいいと思っていたのです。

ところが、shared_memoryモジュールにはいくつか問題があることが分かりました。

  1. 排他ロック機能が存在しない
  2. linuxだとうまく動作しない

1に関しては、python自体に排他ロックを行うための(高水準な)モジュールが存在しないことが分かりました。
2に関しては致命的です。ポツポツとこの件に関する記事や質問もありますが、少なくともPython3.8ではWindowsでは動作するものの、linuxでは使用できません。せっかくpythonで書くなら、可能な限りクロスプラットフォームを目指します。

実装方針

標準モジュールでは要件すべて達成できないので、多少のスクラッチを視野に作成していきます。
共有メモリや排他制御は低水準の知識や、慎重な実装が必要になるため、一から作るのは避けたいところです。

(1)主要部分の実装方針

  1. 共有メモリの実装
    プロセス間通信の手段として共有メモリを用いる方針は維持します。shared_mamory.pyのソースを確認したところ、内部的にはmmapクラスを呼び出しています。mmapクラスは比較的低水準のクラスであり(OSの差異は吸収してくれますが)、やや使いにくいです。新たに今回の要件に合わせたラッパクラスで活用していくことにします。

  2. 排他制御の実装
    サードパーティーのパッケージでfastainersというパッケージを見つけました。これにリード・ライトロックが可能なプロセス間ロック機構が用意されていたので、使ってみることにしました。OS毎に異なるファイルロックのシステムコールをうまく抽象化してくれているようです。
    日本語ではこのページを参考にしました: https://blog.amedama.jp/entry/2016/12/08/202549

  3. データ構造
    mmapが示す共有メモリ領域はbytesarrayと同じように取り扱えます。
    ただし、mmapにアクセスするたびに直接バイトいじりをしていると大変効率が悪いので、バッファサイズと内部形状は、ctypes.Structureを継承したクラス(構造体)を用いて表現します。上記ラッパクラスはこの構造体を使用する設計とします。

そろそろ名前が必要なので、今回のプロセス間通信に特化したmmapラッパーライブラリをIPMMAPと呼称します。

(2)クラス設計

可読性と安全性を考え、アクセス権限によってラッパクラスを分けます。

  • IPMMAPマネジャークラス(mmapファイル新規作成権限をもつ)
  • IPMMAPエディタクラス(mmapが示すバッファ内部の編集権限を持つ)
  • IPMMAPリーダークラス(mmapが示すバッファ内部の編集権限を持つ)

(3)処理フロー

1. 共有メモリを編集する場合(送信側)

IPMMAPマネジャークラスのmmapの実体ファイルを作成
↓
IPMMAPエディタクラスをインスタンス化
↓
書き込みロックを取得
↓
mmapが示すメモリ領域に書き込み
↓
最終更新時刻を更新
↓
書き込みロックをリリース

2. 共有メモリを編集する場合(受信側)

IPMMAPリーダークラスをインスタンス化
↓
読み込みロックを取得
↓
mmapが示すメモリ領域を読み込み
↓
書き込みロックをリリース
  • 排他ロックとそのリリースを確実を行うことが肝要です。コンテキストマネージャを使って、IPMMAPクラスの使用者が意識せずともリソース管理ができるように実装します。

  • プロセス間通信という視点に立ち返れば、「共有メモリを編集する=>送信」、「共有メモリを読み込む=>受信」、と言い換えることができます。上述したデータ構造に最終更新時刻を含め、書き込み毎に更新するようにすれば、読み込み側はmmapへのポーリングを繰り返すことで最新情報を「受信」することができるようになります。

実装・使い方

以上の方針をもとに実装を進め、とりあえず作りました。簡単な使い方も併記します。

https://github.com/quag-cactus/ipmmap

このライブラリでは、プロセス間で共有したいデータ構造を定義する構造体を用意し、メモリマッピングはその構造体のサイズに基づいて行われます。マッピングされたアドレス(共有メモリ領域)へのアクセスは、権限別に用意されたipmmapアクセスクラスを用いて行います。

IPMMAP向け構造体を作成する

  • IPMMAPでは、共有メモリに展開するデータ構造をC言語の型をつけ、固定長で定義していきます。
  • IPMMAP向け構造体として認識させるために、base_struct.BaseMmapStructureを継承させる必要があります。
  • 構造体のネストも可能です。
  • 下記デモスクリプトでは、IPMMAP構造体DemoMmapStructureを定義しています

https://github.com/quag-cactus/ipmmap/blob/main/demo_struct.py

DataStructMmapManagerクラスを使ってメモリマッピング元のファイルを作成

  • まず、共有マップ構造を定義したモジュールパスを、setUserStructs関数を用いて登録します。登録されたモジュールパスは、ユーザー定義IPMMAP構造体の捜索パスとして追加されます。
  • DataStructMmapManager(以下Managerクラス)はAbstractMmapMangerを親クラスとするクラスのなかで、唯一mmapのマッピング元ファイル(mmapファイル)を作成する権限を持っています。mmapファイルはpythonのカレントディレクトリ直下に生成される.mmap/に格納されます。主なコンストラクタ引数は以下です:
    • structName: マッピングしたい(共有したい)IPMMAP構造体クラス名です。
    • tag(オプション): 共有メモリ領域をタグ名で一意に定めることができます。同一の構造体定義で複数のメモリ領域を管理したい場合便利です
  • プロセス間通信を行うプロセスの中のいずれかが、Managerクラスを呼び出し、マッピング元ファイルを確実に作成する必要があります。
  • openMemory()を呼び出すことで、mmapファイルの内容を共有メモリとしてマッピングできます。この関数によって、Managerクラスのインスタンスが有効なスコープにある限り、メモリマッピングを維持できます。openMemory()closeMemoryはManagerクラスのみが呼び出せます。
>>> import ipmmap
>>> # ユーザー定義構造体のモジュールパス登録
>>> ipmmap.DataStructMmapManager.setUserStructs(["demo_struct"])
>>> # manager権限のインスタンスを作成(.mmapファイルが作成されます)
>>> # DataStructMmapManager(structName: str, tag: str = "", mmapDir: Path = None, fastenerDir: Path = None, create: bool = False, force: bool = False)
>>> manager = ipmmap.DataStructMmapManager("DemoMmapStructure", create=True, force=True)
>>> # mmapファイルをメモリにマッピングします
>>> manager.openMemory()

DataStructMmapEditorを使って共有メモリ領域を編集する

  • DataStructMmapEditorクラスをwith文を用いて呼び出すことで、リソース管理を適切に行いつつ、排他的に共有メモリ領域にアクセスできます。
  • Editorクラスは編集権限を持つクラスで、Managerクラスの次の強い権限を持っています。このクラスを用いて共有メモリ領域にアクセスした場合、書き込み権限で排他ロックがかかります。
    • あるプロセスがEditorクラスのwith文の中を実行中の間は、他のプロセスのEditorクラス・Readerクラス(後述)は共有メモリにアクセスできません(with文の行でブロックされます)。
    • updateLastUpdate関数を呼び出すことで、最終更新時刻を現在のシステム時刻に更新できます。
>>> with ipmmap.DataStructMmapEditor("DemoMmapStructure") as editor:
...      # writeData関数でIPMMAP構造体メンバ名をキーとして、値を挿入できます
...      editor.writeData("data_int", 10)
...      # 文字列はbytes型に変換する必要があります
...      editor.writeData("data_string", bytes("hello!", 'utf-8'))
...      # 構造体が入れ子になっていてもOKです
...      editor.writeData("data_xy.x", 123.45)
...      editor.writeData("data_xy.x", 543.21)
...      # updateLastUpdate関数で最終更新時刻を更新できます
...      editor.updateLastUpdate()
...

DataStructMmapReaderを使って共有メモリ領域を読み込む

  • DataStructMmapReaderクラスは読み込み権限のみを行うクラスで、用意された関数では、共有メモリ領域の読み込みのみ可能です。
  • このクラスもwith文をつかって共有メモリ領域にアクセスできます。このクラスを用いて共有メモリ領域にアクセスした場合、読み込み権限で排他ロックがかかります。
    • 読み込みロック中はその領域にたいして書き込みはできませんが、他プロセスが同時に読み込むことは可能です。
>>> import ipmmap
>>> ipmmap.DataStructMmapReader.setUserStructs(["demo_struct"])
>>> # 最終更新時刻のエポック秒が取得できます
>>> with ipmmap.DataStructMmapReader("DemoMmapStructure") as reader:
...     reader.getLastUpdate()
...
1663318271.859895
>>> # IPMMAP構造体メンバ名(文字列)をキーとして値を取得できます
>>> with ipmmap.DataStructMmapReader("DemoMmapStructure") as reader:
...     reader.readData("data_int")
...     reader.readData("data_string")
...     reader.readData("data_xy.x")
...     reader.readData("data_xy.y")
...
10
b'hello!'
123.45
543.21
>>>

gitリポジトリのREADME.mdとデモスクリプト(demo_editor.pyとdemo_reader.py)も参考にしてください。

Todo

  • ノンブロッキングモード(ロックが取れない場合、即座に復帰する)の追加
  • PyPIに登録
  • リポジトリをpoetryに対応 -> 対応しました!

参考

Discussion

ログインするとコメントできます