PydanticでQueueメッセージを型定義する
みてね事業部 Data Engineeringグループに所属しているkittchyです。
MLエンジニアとして、ML解析器の整備、精度改善などの業務に携わっています。
家族アルバムみてね
「家族アルバム みてね」(以下「みてね」)は、スマホで撮った子どもの写真や動画をかんたん共有、整理でき、家族で楽しく会話したり成長をふりかえることができるサービスです。
みてねData Engineeringグループでは顔検出や画像の分類情報など、Machine Learning (ML) の技術を利用してアップロードされた画像を解析し、みてねのさまざまな機能に活用しています。
ML解析器について
現在、ML 基盤として、数種類のML解析器をAmazon SQSを介して、パイプライン化して運用しています。それぞれのML解析器は共通の内部構造を持っており、主に以下の3つから構成されています。
- QueuePoller : SQSからJSONのジョブメッセージをポーリング
- QueueWorker: ML解析器にジョブを一つずつ送って、結果を受け取る
- QueueSender: 解析結果をJSON形式のメッセージに変換し、SQSに返す
これら三つのモジュールをマルチプロセスで動作させ、無駄な待機時間をなくす工夫をしています。また、モジュール間のメッセージはPythonのQueueを用いて、Dict型でやりとりしています。
図に直すと以下の通りになります。
問題点
この設計における従来の解析器の実装には、以下の問題がありました。
1. 各モジュール間のジョブメッセージに、何の値が含まれているかわからない問題
例えば、QueueWorker で、以下のようにjob_qからジョブを受け取り、処理を行っています。
def queue_worker(job_q: Queue)
# JobQueueからdequeueしてジョブを受け取り、一件ずつ処理
job: dict = self.job_q.get()
# jobの中身がわからない!!
ここで、jobの中身に何が入っているのか知りたくなった場合、 job_q.put()
が実行され、ジョブがenqueueされたところを探さないといけません。
そこで、QueuePollerからjob_qにenqueueしているところを見てみると...
class QueuePoller:
def __init__(self, job_q: Queue):
self.job_q
# ...省略...
async def run_async(self) -> None:
job: dict = await self.sqs.receive_message(...) # SQSからjobを取得
# job の中身は おそらく {"image_id": 1234}?
# JobQueueにenqueue
self.job_q.put(job)
jobはSQSから受け取ったメッセージをそのまま使っていそうですが、この場合、他のリポジトリのSQSにジョブを送っているところを確認する必要があります。
さらに、エディタでのコード補完もできないため、何度も確認する必要もあります。
このように、コードの可読性が下がってしまい、開発・運用コストが上がってしまっていました。
2. Dict型の値はValidationしづらい問題
ML解析器のQueuePollerはSQSからJSONのジョブメッセージをポーリングして、下流の解析タスクに渡すようにしています。
みてねの場合、ML解析器に投げるジョブの情報が複雑で、膨大であるため、パースしてValidationする際、膨大なコードを作成する必要があり、可読性が下がってしまう問題がありました。
解決方法
これらの問題点を解決するために、メッセージをDict型から構造体で型定義を行いました。
メッセージの型定義を行う方法として、FastAPIなどでも採用されているPydantic を用いています。
PydanticはPythonのdataclassesのように構造体を表現できるため、ジョブメッセージを構造体として各モジュール間で受け渡しできるようになります。さらに、型のValidationもデフォルトで行ってくれるため、間違った値が入ってきてもPydanticがエラーを出してくれます。その他、dictから簡単に構造体への変換、構造体からdictやJSONへの変換も簡単に行うことができるなど、高機能なツールです。
その他の候補
-
dataclasses: Pythonの標準で用意されている構造体の仕組み
- メリット:Pythonを用いて構造体を表現できる
- デメリット:構造体としての基本的な機能のみ
→ Pydanticであればその他の機能を付加して提供してくれているため、使いやすい。
-
Protocol Buffers : gRPCなどで主に用いられている構造体データをserialize/deserialize するための仕組み
- メリット:サーバとML解析器でprotocolの共有ができる(共通認識を揃えられる)
- デメリット:proto fileを共有しないといけないことや、pythonのコード補完もイマイチ効かない
→ PydanticはPythonの型定義をそのまま使うことができることや、IDEのコード補完が効くため、使いやすい。
from datetime import datetime
from pydantic import BaseModel, PositiveInt
class User(BaseModel):
id: int
name: str = 'John Doe'
signup_ts: datetime | None
external_data = {
'id': 123,
'signup_ts': '2019-06-01 12:22',
}
user = User(**external_data)
external_data = {
'id': "hoge",
'name': 123
'signup_ts': '2019-06-01 12:22',
}
user = User(**external_data)
# Parse Error!!
Pydantic導入フロー
対応方法として、以下のフローを取りました。
1. SQSからポーリングするメッセージと、解析結果のメッセージの確認
datamodel-code-generatorを用いてJSONファイルからPydanticの構造体を自動生成できます。
そこで、SQSからどのようなメッセージが入ってきているかをログを出力し、ファイルにまとめて以下のコマンドを実行します。
datamodel-codegen \\
--target-python-version 3.10 \\
--use-standard-collections \\
--input example.json \\
--input-file-type json \\
--snake-case-field \\
--output example.py \\
--class-name ExampleClass
例として、example.json
とdatamodel-code-generatorを用いて出力されたPydantic modelのexample.py
を載せておきます。
{ "faceUuid": "abcdefg", "faceNum": 1, "score": 0.5 }
from pydantic import BaseModel, Field
class ExampleClass(BaseModel):
face_uuid: str = Field(..., alias="faceUuid")
face_num: int = Field(..., alias="faceNum")
score: float
2. Pydantic型の調整
自動生成されたメッセージを、使いやすいように調整します。
例えばMLパイプラインのメタデータなど、ML解析器内で直接参照しなくて良いが、そのまま結果としてSQSに送りたい情報などです。これらの場合、詳しいvalidationはかけず、AnyやDictにして、簡略化してしまいました。
class SQSJob(BaseModel):
name: str
pipeline_metadata: Any
3. SQSからのメッセージをパース
あとは構造体を使ってJSONをパースしたり、JSONにエクスポートしたりします。パースの例を以下に挙げます。
try:
# ジョブメッセージのパース
with open(job_file) as f:
job_json = json.load(f)
job_body = SQSJob(**job_json)
except Exception as e:
# Error Message としてSQSに返す
return await process_error(
error=f"Parse message failed! {e.message}"
sqs=self.sqs,
queue=self.res_sqs,
)
JSONにExport する部分は以下の通りです。このようにたった1行だけでJSONに変換してくれます。
result_body_json = result_body.model_dump_json()
await self.sqs.send_message(QueueUrl=self.res_sqs, MessageBody=result_body_json)
4. JobQueueやResultQueueとのやり取り
Pydanticによる型定義のおかげで、QueueからのDequeueの値に型定義をつけることができます。例えば、以下のようにEnqueue側とDequeue側で型を簡単に揃えることができ、IDEなどで簡単に参照できるようになります。
# JobQueueにenqueue
self.job_q.put(sqs_job)
# JobQueueからジョブを受け取り、一件ずつ処理
hoge_job: SQSJob = job_q.get()
まとめ
Pydanticを用いた型定義を使って、ML解析器の保守性を高めました。
さらに、IDEなどでコード補完もできるようになり、開発効率も向上しています。
ここまで読んでいただきありがとうございました。みてねのData Engineeringグループは、これからもMLパイプラインの改善についての発信を続けていきます。
Discussion