MIXI DEVELOPERS NOTE
🔍

PydanticでQueueメッセージを型定義する

2024/08/01に公開

みてね事業部 Data Engineeringグループに所属しているkittchyです。
MLエンジニアとして、ML解析器の整備、精度改善などの業務に携わっています。

家族アルバムみてね

「家族アルバム みてね」(以下「みてね」)は、スマホで撮った子どもの写真や動画をかんたん共有、整理でき、家族で楽しく会話したり成長をふりかえることができるサービスです。
みてねData Engineeringグループでは顔検出や画像の分類情報など、Machine Learning (ML) の技術を利用してアップロードされた画像を解析し、みてねのさまざまな機能に活用しています。

ML解析器について

現在、ML 基盤として、数種類のML解析器をAmazon SQSを介して、パイプライン化して運用しています。それぞれのML解析器は共通の内部構造を持っており、主に以下の3つから構成されています。

  • QueuePoller : SQSからJSONのジョブメッセージをポーリング
  • QueueWorker: ML解析器にジョブを一つずつ送って、結果を受け取る
  • QueueSender: 解析結果をJSON形式のメッセージに変換し、SQSに返す

これら三つのモジュールをマルチプロセスで動作させ、無駄な待機時間をなくす工夫をしています。また、モジュール間のメッセージはPythonのQueueを用いて、Dict型でやりとりしています。
図に直すと以下の通りになります。

問題点

この設計における従来の解析器の実装には、以下の問題がありました。

1. 各モジュール間のジョブメッセージに、何の値が含まれているかわからない問題

例えば、QueueWorker で、以下のようにjob_qからジョブを受け取り、処理を行っています。

   def queue_worker(job_q: Queue)
       # JobQueueからdequeueしてジョブを受け取り、一件ずつ処理
       job: dict = self.job_q.get()
       # jobの中身がわからない!!

ここで、jobの中身に何が入っているのか知りたくなった場合、 job_q.put()が実行され、ジョブがenqueueされたところを探さないといけません。
そこで、QueuePollerからjob_qにenqueueしているところを見てみると...

   class QueuePoller:
       def __init__(self, job_q: Queue):
           self.job_q
           # ...省略...
       async def run_async(self) -> None:
           job: dict = await self.sqs.receive_message(...) # SQSからjobを取得
           # job の中身は おそらく {"image_id": 1234}?

           # JobQueueにenqueue
           self.job_q.put(job)

jobはSQSから受け取ったメッセージをそのまま使っていそうですが、この場合、他のリポジトリのSQSにジョブを送っているところを確認する必要があります。
さらに、エディタでのコード補完もできないため、何度も確認する必要もあります。

このように、コードの可読性が下がってしまい、開発・運用コストが上がってしまっていました。

2. Dict型の値はValidationしづらい問題

ML解析器のQueuePollerはSQSからJSONのジョブメッセージをポーリングして、下流の解析タスクに渡すようにしています。
みてねの場合、ML解析器に投げるジョブの情報が複雑で、膨大であるため、パースしてValidationする際、膨大なコードを作成する必要があり、可読性が下がってしまう問題がありました。

解決方法

これらの問題点を解決するために、メッセージをDict型から構造体で型定義を行いました。
メッセージの型定義を行う方法として、FastAPIなどでも採用されているPydantic を用いています。

PydanticはPythonのdataclassesのように構造体を表現できるため、ジョブメッセージを構造体として各モジュール間で受け渡しできるようになります。さらに、型のValidationもデフォルトで行ってくれるため、間違った値が入ってきてもPydanticがエラーを出してくれます。その他、dictから簡単に構造体への変換、構造体からdictやJSONへの変換も簡単に行うことができるなど、高機能なツールです。

その他の候補
  1. dataclasses: Pythonの標準で用意されている構造体の仕組み
    • メリット:Pythonを用いて構造体を表現できる
    • デメリット:構造体としての基本的な機能のみ
      → Pydanticであればその他の機能を付加して提供してくれているため、使いやすい。
  2. Protocol Buffers : gRPCなどで主に用いられている構造体データをserialize/deserialize するための仕組み
    • メリット:サーバとML解析器でprotocolの共有ができる(共通認識を揃えられる)
    • デメリット:proto fileを共有しないといけないことや、pythonのコード補完もイマイチ効かない
      → PydanticはPythonの型定義をそのまま使うことができることや、IDEのコード補完が効くため、使いやすい。
from datetime import datetime
from pydantic import BaseModel, PositiveInt

class User(BaseModel):
    id: int
    name: str = 'John Doe'
    signup_ts: datetime | None

external_data = {
    'id': 123,
    'signup_ts': '2019-06-01 12:22',
}

user = User(**external_data)

external_data = {
    'id': "hoge",
    'name': 123
    'signup_ts': '2019-06-01 12:22',
}

user = User(**external_data)
# Parse Error!!

Pydantic導入フロー

対応方法として、以下のフローを取りました。

1. SQSからポーリングするメッセージと、解析結果のメッセージの確認

datamodel-code-generatorを用いてJSONファイルからPydanticの構造体を自動生成できます。
そこで、SQSからどのようなメッセージが入ってきているかをログを出力し、ファイルにまとめて以下のコマンドを実行します。

datamodel-codegen \\
  --target-python-version 3.10 \\
  --use-standard-collections \\
  --input example.json \\
  --input-file-type json \\
  --snake-case-field \\
  --output example.py \\
  --class-name ExampleClass

例として、example.json とdatamodel-code-generatorを用いて出力されたPydantic modelのexample.py を載せておきます。

{ "faceUuid": "abcdefg", "faceNum": 1, "score": 0.5 }
from pydantic import BaseModel, Field

class ExampleClass(BaseModel):
    face_uuid: str = Field(..., alias="faceUuid")
    face_num: int = Field(..., alias="faceNum")
    score: float

2. Pydantic型の調整

自動生成されたメッセージを、使いやすいように調整します。
例えばMLパイプラインのメタデータなど、ML解析器内で直接参照しなくて良いが、そのまま結果としてSQSに送りたい情報などです。これらの場合、詳しいvalidationはかけず、AnyやDictにして、簡略化してしまいました。

class SQSJob(BaseModel):
    name: str
    pipeline_metadata: Any

3. SQSからのメッセージをパース

あとは構造体を使ってJSONをパースしたり、JSONにエクスポートしたりします。パースの例を以下に挙げます。

try:
    # ジョブメッセージのパース
    with open(job_file) as f:
        job_json = json.load(f)
    job_body = SQSJob(**job_json)
except Exception as e:
    # Error Message としてSQSに返す
    return await process_error(
        error=f"Parse message failed! {e.message}"
        sqs=self.sqs,
        queue=self.res_sqs,
    )

JSONにExport する部分は以下の通りです。このようにたった1行だけでJSONに変換してくれます。

result_body_json = result_body.model_dump_json()
await self.sqs.send_message(QueueUrl=self.res_sqs, MessageBody=result_body_json)

4. JobQueueやResultQueueとのやり取り

Pydanticによる型定義のおかげで、QueueからのDequeueの値に型定義をつけることができます。例えば、以下のようにEnqueue側とDequeue側で型を簡単に揃えることができ、IDEなどで簡単に参照できるようになります。

# JobQueueにenqueue
self.job_q.put(sqs_job)

# JobQueueからジョブを受け取り、一件ずつ処理
hoge_job: SQSJob = job_q.get()

まとめ

Pydanticを用いた型定義を使って、ML解析器の保守性を高めました。
さらに、IDEなどでコード補完もできるようになり、開発効率も向上しています。

ここまで読んでいただきありがとうございました。みてねのData Engineeringグループは、これからもMLパイプラインの改善についての発信を続けていきます。

MIXI DEVELOPERS NOTE
MIXI DEVELOPERS NOTE

Discussion