🚠

Rails アプリで OpenAI Batch を使って数十万のデータを生成した

2024/12/23に公開

この記事は、Ruby on Rails Advent Calendar 2024 Advent Calendar 2024 の23日目です。

はじめに

YAMAP のプロダクトでは先日、活動日記でふりかえる山歩 2024 と題し、ユーザーの皆さんが1年間の活動をふりかえられるデータをAIで生成し、サービス内で提供しました。

実際に作成されたふりかえりデータは、ユーザさんがシェアしてくれたモーメント(ツイートのような機能) 内のリンクから見ることができますので、ぜひご覧ください。(アカウント登録が必要です)

このプロジェクトの構想は、元々は別のメンバーが描いたもので、昨年にPoCを行い一部のユーザ限定に提供されました。

今年はこれをシステム化し、多くのユーザに届けられるようにアップデートしています。

ウェブやアプリのフロントエンド、API実装やインフラ構築、プロンプト作成など、多くのタスクは私以外のメンバーが担当しています。

そのおかげで、私は自分の担当領域であった OpenAI Batch との連携部分に集中できました。

この記事では私が担当した Rails アプリと OpenAI Batch との連携にフォーカスして書いています。ただし、このプロジェクトには、昨年の PoC に関わったメンバーや、今年のプロジェクトチームの協力が欠かせませんでした。この場を借りて感謝を伝えたいと思います。

OpenAI Batch とは

OpenAI Batch は、OpenAI APIを利用する際に、大量のリクエストを効率的に処理するための仕組みです。一度に複数のリクエストを送信し、並列処理で応答を受け取ることで、パフォーマンスを向上させます。

特に、大量のデータを扱うアプリケーションやバッチ処理が必要なプロジェクトに適しており、API利用のコスト削減やスループット向上を図ることができます。この機能により、時間やリソースを節約しながら、より効率的な開発が可能となります。

ただし、OpenAI Batchでは一般的なAPIとは異なり生成結果を即時のレスポンスで受け取れないため、結果を後から取得する仕組みが必要です。このため、結果の管理や処理を効率化する工夫が求められます。

設計で意識したこと

今回のプロジェクトでは「大量のデータを API インターフェイスに特徴がある OpenAI Batch を使って処理する」という点が重要な要素であると言えますが、これは決してこのプロジェクトに限った話ではなく、OpenAI Batch を利用するあらゆる機能で共通の課題といえます。そこで以下の点を意識して設計をしました。

  • 将来的な再利用性を意識する
    • 同じ仕組みを他のプロジェクトにも活用できるよう、コードをモジュール化する。
  • アプリケーションの柔軟性を保つ
    • OpenAI Batch とのインテグレーションは一般的な API と比べるとフローが複雑であるため、適切なエラーハンドリングや非同期処理の設計が重要です。

Packwerk による OpenAI Batch 接合部分のパッケージ化

我々のアプリでは Packwerk をもちいたモジュラーモノリスを採用しています。

OpenAI Batch と接合する部分についても Packwerk を用いてパッケージ化をしました。(以後 「OpenAI Batch パッケージ」と呼ぶことにします。)

OpenAI Batch との接合箇所をアプリ内に OpenAI Batch パッケージとして構成することで、他のコンポーネントと独立して動作することを約束でき、将来的に今回以外のプロジェクトで大量のデータ生成を OpenAI に依頼したくなった場合にもこのパッケージを再利用できるようになります。

OpenAI Batch パッケージの設計

OpenAI Batch パッケージは、大量のプロンプトを簡単で効率的に処理するための非同期バッチ処理を行うためのインターフェイスを提供します。

OpenAI と API でコミュニケーションしながらステータスを追跡し、アプリ内のモデルでデータを管理します。また、PubSubを活用したイベント駆動により他のコンポーネントが OpenAI Batch パッケージに処理を依頼したり、結果を受け取ったりすることができます。

この設計により、信頼性と柔軟性を両立させたAI生成プロセスを構築しました。

OpenAIBatch モデル

このパッケージでは特に openai_batches というテーブルを OpenAIBatch という ActiveRecord のモデルを介して扱います。

openai_batches テーブル

カラム 説明
id bigint PK
status string NOT NULL ("INITIATED" "IN_PROGRESS" "COMPLETED" "FAILED" "CANCELED" "EXPIRED")
custom_type string NOT NULL (バッチの種類の識別子)
openai_input_file_id string NULL (OpenAI Batch への入力となるファイルID)

custom_type: バッチの種類を識別するための値で、アプリケーションが適切にバッチの種類を管理するために利用されます。OpenAI Batch では、複数のプロンプト(AI生成リクエスト)を1つの API リクエストで送信可能です。また、この際 各プロンプトにはリクエスト内で一意の識別子 custom_id を設定することが義務付けられています。しかし、アプリ側の実装の観点では custom_id だけではなく、バッチの種類を識別するための値がある方がハンドリングが容易であると考えこのカラムを設けています。

状態遷移

私の経験上、システムを安定させるためには状態遷移を明確にすることが不可欠です。
特に今回のような外部APIを利用した非同期処理では、状態が不明確だとエラー処理やリトライのロジックが複雑化し、システム全体の信頼性が低下してしまいます。

OpenAI 側の Batch には、8つのステータス(下図の小文字で表したステータス)があります。これらのステータスをそのまま利用することも可能ですが、アプリケーション全体で扱いやすいように6つ(大文字のステータス)に再整理しました。

状態遷移を支える実装

状態遷移を支える実装の処理の流れは次の図の通りです。

詳しく見ていきます。

このパッケージのエントリーポイントは OpenAIBatches::RequestService です。このパッケージを利用するコンポーネントは 複数のプロンプトを含んだ入力ファイルを引数に与え OpenAIBatches::RequestService を呼び出します。OpenAIBatches::RequestService はこのファイルを OpenAI API(POST /v1/files) でアップロードして input_file_id を取得したら、それを openai_batches レコードとして保存し、OpenAIBatches::RegisterJob を非同期で呼び出します。

この時点で、OpenAI にバッチ処理を依頼する準備が整ったことになります。

OpenAIBatches::RegisterJob は OpenAIBatches::RegisterService を呼びだし、そこでは OpenAI API (POST /v1/batches)を用いてバッチを登録します。登録されたバッチID は openai_batches.openai_batch_id に保存し、OpenAIBatches::SyncWhileInProgressJob を非同期で実行します。この際、バッチ登録直後は OpenAI 側でのバッチ処理が完了していないので、ActiveJob の wait を用いて初回の実行は数十分後に設定しています。

OpenAIBatches::SyncWhileInProgressjob では、openai_batches.status を確認し "IN_PROGRESS" であれば OpenAIBatches::SyncService を呼び出し、OpenAI API を用いて最新のステータスを確認したら必要に応じて openai_batches.status を更新します。更新された場合は OpenAIBatches::StatusChanged イベントを publish します。更新がない場合は retry_job(wait:) を用いて一定時間間隔で SyncWhileInProgressJob を再スケジュールします。

OpenAI Batch パッケージの責務はここまでです。この先は OpenAI Batch パッケージを利用するコンポーネント側の実装について説明していきます。

OpenAI Batch パッケージを使うコンポーネント側の実装

OpenAI Batch パッケージを使うコンポーネントでは主に「バッチ処理の依頼」と「バッチ処理結果の受け取り」の2点を実装をすることになります。

バッチ処理の依頼

今回のプロジェクトでは 数十万のテキスト生成を AI で行いました。この処理のエントリーポイントは MaintenanceTask ととしました。

class Maintenance::AwesomeFeature::GenerateAITextTask < MaintenanceTasks::Task
  csv_collection(100)

  def process(rows)
    user_ids = rows.pluck(:user_id)

    users = User.where(id: user_ids)

    # 複数のプロンプトを含むファイルを生成する
    file = AwesomeFeature::MakeOpenAIInputReadIO.call(users:)

    # OpenAI Batch パッケージ経由で OpenAI にバッチ処理を依頼する
    OpenAIBatches::RequestService.call(custom_type: 'awesome_feature', file:)
  end
end

CSVで対象となるユーザのIDを受け取り、100ユーザごとに必要なプロンプトを含むファイルを生成し、OpenAIBatches::RequestService を使ってバッチ処理を依頼します。

バッチ処理を依頼する際には custom_type を設定しておきます。こうすることで結果を受け取るサブスクライバーが処理を継続するか無視するかを判断することができます。

バッチ処理結果を受け取る

状態遷移を支える実装のセクションでも触れた通り、openai_batches.status の変更時には OpenAIBatches::StatusChanged イベントを発行するように構成しました。そのため、OpenAI Batch パッケージを利用するコンポーネント側では OpenAIBatches::StatusChanged イベントをサブスクライブするサブスクライバーを実装しておけばよいことになります。

ちなみに 我々のプロダクトでは PubSub を実現するために downstream を使っているため、イベントのサブスクライブは以下のようになります。

Rails.application.config.to_prepare do
  ActiveSupport.on_load 'downstream-events' do |store|
    store.subscribe(AwesomeFeature::OpenAIBatchSubscriber,
                    to: OpenAIBatches::StatusChanged, async: true)
end

OpenAI Batch パッケージでパブリッシュされるこのイベントには openai_batches レコードの id を設定しています。そのため、サブスクライバーではレコードを選択し status や custom_type に基づいて適切に処理を進めることになります。

class AwesomeFeature::OpenAIBatchSubscriber
  def self.call(event) = new(event).call

  def initialize(event) = @event = event

  def call
    batch_id = event.data.fetch(:batch_id)
    openai_batch = OpenAIBatch.find_by(batch_id)

    return unless openai_batch

    # このサブスクライバーでは 'awesome_feature' 以外の処理はしない
    return if openai_batch.custom_type != 'awesome_feature'

    case openai_batch.status
    when 'FAILED', 'EXPIRED', 'CANCELED'
      Rails.logger.warn("OpenAI Batch was #{openai_batch.status}. (id: #{openai_batch.id})")
    when 'COMPLETED'
      # OpenAIから生成結果を取得する
      responses = OpenAIBatches::RetrieveResponsesService.call(openai_batch:)
      # => OpenAI::Client.new
      #      .batches.retrieve(id: openai_batch.openai_batch_id)
      #      .then {|batch| OpenAI::Client.new.files.content(id: batch['output_file_id']) }

      responses.each do |response|
        # response.custom_id を参考にして、データベースへの保存をはじめとした適切な処理を行う
        # ...
      end
    end
  end
end

進めていく中での課題と対策

今回のプロジェクトでは、いくつかの課題に直面しました。その中でも特に重要だったのが、OpenAI 側の都合でバッチ処理が期限切れ(expired)となる問題です。

今回のプロジェクトでは約5%のバッチリクエストが期限切れとなり、アプリケーションから再実行する必要がありました。このため、バッチの進行状況を定期的に把握することが必要でした。

しかし、OpenAI のウェブダッシュボードは一覧性に乏しく、今回のように大量にバッチを登録した場合はそれらのステータス(in_progress や completed、expires の数など)を確認するのが難しい状況でした。

そこで、独自のコマンドラインツール oaib を作成し、バッチのステータスを定期的にモニタリングできるようにしました。このツールにより、プロジェクトの管理効率が向上したと感じています。

(実行例) バッチのステータスや進捗状況を確認する

bash$ ./exe/oaib list --before batch_67568fdbdfc8 | jq -r -f jq/simple.jq
batch_6758ffeda4bc  completed       224/224 0       2024-12-11T02:58:53Z    2024-12-11T03:14:45Z
batch_6757e4657048  completed       232/232 0       2024-12-10T06:49:09Z    2024-12-10T07:24:08Z
batch_6756bac14370  completed       175/175 0       2024-12-09T09:39:13Z    2024-12-09T10:00:02Z
batch_6756babf5abc  completed       384/384 0       2024-12-09T09:39:11Z    2024-12-09T19:39:27Z

また、正常終了したバッチにも課題がありました。レスポンスを精査すると、一部の結果の finish_reason が "stop" ではない(=モデルの停止条件に到達しない)ケースが含まれており、AIの応答が途中で停止していることが判明しました。この確認作業はウェブダッシュボードを使うとさらに煩雑で、ウェブダッシュボードから対象のバッチを見つけて、そこからアウトプットファイルをダウンロードし、JSONL データを解析する必要がありました。この問題に対しても、「oaib」を活用し、レスポンスデータを効率的に分析・可視化できるようにしました。

(実行例) 出力結果を含めて確認する(--expands output

bash$ ./exe/oaib list --status completed --expands output \
        | jq -r '._output[] | [.custom_id .response.body.choices[].finish_reason] | @tsv'
        | grep -v stop

ツールの開発を通じて、OpenAI Batch の課題を解決しつつ、プロジェクトの進行をスムーズに進めることができました。

まとめ

  • Rails アプリケーションから OpenAI Batch を活用し、効率的なAIテキスト生成プロセスを構築しました。非同期処理や状態遷移の管理により、システム全体の信頼性と拡張性を向上させています。
  • バッチの期限切れやAI応答の不完全性といった課題に直面し、それらを解決するためにコマンドラインツール oaib を開発しました。

OpenAI Batch の活用やプロジェクト設計に関して、少しでも参考になれば幸いです。


この記事に登場する図やスニペットは説明のために簡略化・変更していますが、実際の設計や実装からは遠くないです。参考にしていただけると幸いです。


おしまい 🍥

YAMAP テックブログ

Discussion