📖

OpenAI Realtime APIをLangchainで利用しよう!

2024/10/26に公開

概要

OpenAIのRealtime APIが出て久しいですね。
料金は高めですが、非常に強力なツールだと思います。実装して色々使ってみたいと思いますが、自分は基本的にLLM開発にはLangchainを利用しているので、Realtime APIでもLangchainを利用できなかなと思ってました。そしたらLangchain公式で下記のように強力なコードを公開していました!

https://github.com/langchain-ai/react-voice-agent/tree/main

さすが対応が早いですね。
これを使うとlangchainのagentなどがそのまま利用できます!
Web検索した結果を利用したりできるのでさまざまなユースケースで利用できそうですね☺️

これを利用して色々試してみたのでその備忘録を残します。

注意

まず一つ開発にあたっての注意なのですが、基本的にWebRTCを利用してクライアントとサーバのやり取りは実施しましょう。
理由は普通にWebsocketとかで実装してしまうと、エコーキャンセレーションを自前で実装したりちゃんと動作するライブラリを利用しない場合Realtime APIが返却してスピーカーから発せられた音声をマイクが拾ってしまい、ループバックが発生し予期せぬ重課金が発生してしまうなんてことになります。この課題を自力で解決しようとすると結構大変なので、Web会議用のプロトコルとして多くの実績を出しているWebRTCを利用するようにした方が楽です。個人的にはLiveKitがOSSで個人で試すのに便利だなと思います。

https://github.com/livekit

livekitのWebRTCのサーバに対してバックエンドからクライアントで接続して音声の取得、送信を行うようなイメージです。またWebRTCにしておけば映像も取得できるのでRealtime APIが将来マルチモーダル対応したときに映像の取得等しやすくなるかなとも思います。

構成

上記注意点を踏まえてざっくりとした構成図は下記のような感じです。

この構成図に示したように、システムは大きく4つのコンポーネントで構成されています。ユーザーインターフェースとなるクライアントアプリケーション、WebRTCサーバとしてのLiveKit、制御を担当するバックエンドサーバ、そして音声対話のコアとなるOpenAI Realtime APIです。

特徴的なのは、バックエンドサーバの二重の役割です。バックエンドサーバは、クライアントからのWebSocket接続を受け付けると同時に、LiveKitサーバに対してWebRTCクライアントとしても接続します。この構成により、音声データの安全な取り回しと、効率的な処理が可能になっています。

具体的な処理の流れを見ていきましょう:

  1. クライアントアプリケーションは、まずLiveKitサーバとWebRTC接続を確立し、音声通信のチャネルを準備します
  2. 同時に、バックエンドサーバとWebSocket接続を確立し、システムの制御チャネルを確保します
  3. バックエンドサーバは、LiveKitサーバにWebRTCクライアントとして接続し、音声データの取得準備を整えます
  4. ユーザーが話した音声は、WebRTC経由でLiveKitサーバに送られます
  5. バックエンドサーバは、その音声データを取得してOpenAI Realtime APIに転送します
  6. OpenAIからの応答音声は、バックエンドサーバを経由してLiveKitに送られ、最終的にクライアントで再生されます

この構成は、Langchain公式が提供しているreact-voice-agentのコードと組み合わせることで、Realtime APIの機能をLangchainのエコシステムの中で効率的に利用することができます。また、将来的なOpenAI Realtime APIのマルチモーダル対応も見据えた拡張性も備えることができると思います。

実装

とりあえず実装の全体を示します。
まずがバックエンドです。

サーバサイド

async def process_virtual_user_chat_with_voice(
    self
) -> OpenAIVoiceReactAgent:
    try:
        agent = OpenAIVoiceReactAgent(
            model="gpt-4o-realtime-preview",
            tools=[
            ],
            instructions=f"あなたは質問に適切に生成するインタビュアーです。",
        )

        return agent
    except Exception as e:
        logger.error(f"Error processing chat: {e!s}")
        raise e
@router.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    chat_room_id: str,
) -> None:
    await websocket.accept()

    room = rtc.Room()
    sample_rate = 24000  # サンプルレートは適切に設定してください
    channels = 1  # チャンネル数は適切に設定してください
    source = rtc.AudioSource(sample_rate, channels, queue_size_ms=500)
    local_track = rtc.LocalAudioTrack.create_audio_track(
        "audio-track",
        source,
    )
    options = rtc.TrackPublishOptions(
        source=rtc.TrackSource.SOURCE_MICROPHONE,
    )
    audio_buffer: asyncio.Queue[bytes] = asyncio.Queue(maxsize=50)

    async def audio_processor() -> None:
        while True:
            audio_data = await audio_buffer.get()
            frame = rtc.AudioFrame(
                audio_data,
                sample_rate,
                channels,
                len(audio_data) // (channels * 2),
            )
            await source.capture_frame(frame)

    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant) -> None:
        logger.info(f"参加者が接続しました: {participant.sid} {participant.identity}")

    async def receive_frames(stream: rtc.VideoStream) -> None:
        async for frame_event in stream:
            # ビデオフレームの処理をここで行う
            pass

    async def receive_audio_frames(stream: rtc.AudioStream) -> AsyncIterator[str]:
        async for frame_event in stream:
            array_buffer = frame_event.frame.data
            pcm_data = np.frombuffer(array_buffer, dtype=np.int16)
            base64_encoded = base64.b64encode(pcm_data.tobytes()).decode("utf-8")
            json_data = json.dumps(
                {
                    "type": "input_audio_buffer.append",
                    "audio": base64_encoded,
                },
            )
            yield json_data

    async def send_output_chunk(chunk: str) -> None:
        try:
            data = json.loads(chunk)
            if data["type"] == "response.audio.delta":
                audio_base64 = data["delta"]
                audio_data = base64.b64decode(audio_base64)
                if audio_buffer.full():
                    # If the buffer is full, remove the oldest item
                    _ = await audio_buffer.get()
                await audio_buffer.put(audio_data)
        except Exception as e:
            logger.error(f"音声データの処理中にエラーが発生しました: {e}")

    @room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ) -> None:
        logger.info(f"トラックが購読されました: {publication.sid}")
        if track.kind == rtc.TrackKind.KIND_VIDEO:
            video_stream = rtc.VideoStream(track)
            asyncio.ensure_future(receive_frames(video_stream))
        elif track.kind == rtc.TrackKind.KIND_AUDIO:
            audio_stream = rtc.AudioStream(track=track, sample_rate=24000)
            usecase = VirtualUserChatUsecase()
            agent_task = asyncio.ensure_future(
                usecase.process_virtual_user_chat_with_voice(
                    chat_room_id=chat_room_id,
                ),
            )
            agent_task.add_done_callback(
                lambda t: asyncio.ensure_future(
                    t.result().aconnect(
                        receive_audio_frames(audio_stream),
                        send_output_chunk,
                    ),
                ),
            )

    try:
        token = (
            AccessToken(API_KEY, API_SECRET)
            .with_identity(str(uuid.uuid4()))
            .with_name("WebSocket参加者")
            .with_grants(
                VideoGrants(
                    room_join=True,
                    room=chat_room_id,
                    can_publish=True,
                    can_subscribe=True,
                    can_publish_data=True,
                ),
            )
            .to_jwt()
        )

        await room.connect(
            LIVEKIT_HOST,
            token,
            options=rtc.RoomOptions(
                auto_subscribe=True,
            ),
        )
        logger.info(f"ルーム {room.name} に接続しました")
        audio_processor_task = asyncio.create_task(audio_processor())
        await room.local_participant.publish_track(local_track, options)
        for participant in room.remote_participants.values():
            for publication in participant.track_publications.values():
                logger.info(f"トラック公開: {publication.sid}")

        while True:
            data = await websocket.receive_text()
            # WebSocketからのメッセージ処理をここで行う

    except WebSocketDisconnect:
        logger.info("WebSocket接続が切断されました")
    except Exception as e:
        logger.error(f"エラーが発生しました: {e}")
    finally:
        audio_processor_task.cancel()
        await room.disconnect()
        logger.info("ルームから切断しました")

1. 仮想ユーザーエージェントの実装

async def process_virtual_user_chat_with_voice(self) -> OpenAIVoiceReactAgent:

このメソッドでは以下の処理を行っています:

  • GPT-4のリアルタイムプレビューモデルを使用したエージェントの初期化
  • インタビュアーとしての役割設定
  • 例外発生時の適切なエラーハンドリング

2. WebSocketエンドポイントの実装

WebSocketエンドポイントでは、以下の重要な機能を提供しています:

2.1 音声処理の初期設定

room = rtc.Room()
sample_rate = 24000
channels = 1
source = rtc.AudioSource(sample_rate, channels, queue_size_ms=500)
  • 24kHzのサンプルレートでの音声処理
  • モノラルチャンネルの設定
  • 500msのキューサイズによる音声バッファリング

2.2 音声データの処理フロー

主な処理フローは以下の通りです:

  1. 音声フレームの受信処理
async def receive_audio_frames(stream: rtc.AudioStream) -> AsyncIterator[str]:
  • 音声フレームのPCMデータ変換
  • Base64エンコーディング
  • JSON形式でのデータ送信
  1. 出力音声の処理
async def send_output_chunk(chunk: str) -> None:
  • 音声データのJSON形式での受信
  • Base64デコードによるバイナリデータ変換
  • 音声バッファの管理

3. LiveKitとの統合

3.1 参加者管理機能

@room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant) -> None:
  • 新規参加者の接続イベントのハンドリング
  • 参加者情報のログ記録

3.2 トラック管理機能

@room.on("track_subscribed")
def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant) -> None:
  • ビデオ/音声トラックの購読処理
  • 仮想ユーザーチャットの初期化と接続

4. エラーハンドリングとセキュリティ

LiveKitとの接続認証

token = AccessToken(API_KEY, API_SECRET)
    .with_identity(str(uuid.uuid4()))
    .with_name("WebSocket参加者")
    .with_grants(VideoGrants(...))
    .to_jwt()
  • クライアントからLiveKitにアクセスするための認証情報の発行

クライアント

先ほどのバックエンドに接続するのは下記です。
Flutterで実装しています。

import 'dart:async';

import 'package:flutter/material.dart';
import 'package:flutter_hooks/flutter_hooks.dart';
import 'package:frontendflutter/widgets/voice_recorder/api/livekit_gateway.dart';
import 'package:livekit_client/livekit_client.dart';

class VoiceRecordScreen extends HookWidget {
  final String chatRoomId;

  const VoiceRecordScreen({
    super.key,
    required this.chatRoomId,
  });

  
  Widget build(BuildContext context) {
    final room = useMemoized(
        () => Room(
              roomOptions: const RoomOptions(
                adaptiveStream: true,
                dynacast: true,
                defaultAudioPublishOptions:
                    AudioPublishOptions(audioBitrate: AudioPreset.music),
                defaultVideoPublishOptions: VideoPublishOptions(),
              ),
            ),
        []);
    final isConnected = useState(false);
    final isLoading = useState(true);
    final error = useState<String?>(null);
    final amplitude = useState<double>(0);
    final liveKitGateway = useMemoized(() => LiveKitGateway(), []);
    final participants = useState<List<RemoteParticipant>>([]);

    void _onRoomDidUpdate() {
      participants.value = room.remoteParticipants.values.toList();
    }

    void _onAudioLevelChanged() {
      final level = room.localParticipant?.audioLevel;
      amplitude.value = level ?? 0;
    }

    Future<void> connectToRoom() async {
      try {
        isLoading.value = true;
        error.value = null;

        final tokenResponse = await liveKitGateway.createToken(
          identity: '${userId}_${DateTime.now().millisecondsSinceEpoch}',
          name: 'ユーザー',
          roomNumber: int.parse(chatRoomId),
        );

        if (tokenResponse == null) {
          throw Exception('トークンの取得に失敗しました');
        }

        final token = tokenResponse.token;

        await room.connect(
          'ws://localhost:7880',
          token,
        );

        isConnected.value = true;

        room.addListener(_onRoomDidUpdate);

        await room.localParticipant?.setMicrophoneEnabled(true);
        await room.localParticipant?.setCameraEnabled(false);

        isLoading.value = false;
      } catch (e) {
        print('ルームへの接続に失敗しました: $e');
        error.value = 'ルームへの接続に失敗しました: $e';
        isLoading.value = false;
      }
    }

    useEffect(() {
      connectToRoom();
      liveKitGateway.connect(
        chatRoomId: chatRoomId,
      );
      final _timer = Timer.periodic(const Duration(milliseconds: 100), (timer) {
        _onAudioLevelChanged();
      });
      return () {
        _timer.cancel();
        liveKitGateway.dispose();
        room.removeListener(_onRoomDidUpdate);
        room.disconnect();
      };
    }, []);

    if (isLoading.value) {
      return Center(child: CircularProgressIndicator());
    }

    if (error.value != null) {
      return Center(child: Text(error.value!));
    }

    return Center(
      child: Column(
        mainAxisAlignment: MainAxisAlignment.center,
        children: [
          Text(
            isConnected.value ? '接続完了' : '接続中...',
            style: Theme.of(context).textTheme.titleMedium,
          ),
          const SizedBox(height: 20),
          AudioVisualizer(amplitude: amplitude.value),
          const SizedBox(height: 20),
          Text('参加者数: ${participants.value.length}'),
        ],
      ),
    );
  }
}

class AudioVisualizer extends StatelessWidget {
  final double amplitude;

  const AudioVisualizer({Key? key, required this.amplitude}) : super(key: key);

  
  Widget build(BuildContext context) {
    return Container(
      width: 200,
      height: 200,
      decoration: BoxDecoration(
        shape: BoxShape.circle,
        color: Colors.blue.withOpacity(0.1),
      ),
      child: CustomPaint(
        painter: AudioVisualizerPainter(amplitude: amplitude),
      ),
    );
  }
}

class AudioVisualizerPainter extends CustomPainter {
  final double amplitude;

  AudioVisualizerPainter({required this.amplitude});

  
  void paint(Canvas canvas, Size size) {
    final center = Offset(size.width / 2, size.height / 2);
    final radius = size.width / 2;
    final paint = Paint()
      ..color = Colors.blue
      ..style = PaintingStyle.stroke
      ..strokeWidth = 2;

    final normalizedAmplitude = amplitude.clamp(0.0, 1.0);
    final currentRadius = radius * normalizedAmplitude;
    canvas.drawCircle(center, currentRadius, paint);
  }

  
  bool shouldRepaint(AudioVisualizerPainter oldDelegate) {
    return oldDelegate.amplitude != amplitude;
  }
}

import 'dart:convert';
import 'package:frontendflutter/shared/openapi/openapi.swagger.dart';
import 'package:frontendflutter/shared/rest_api/rest_api.dart';
import 'package:frontendflutter/shared/websocket/websocket_client.dart';

class LiveKitGateway {
  final RestApi _restApi = RestApi();
  late WebSocketClient _wsClient;
  static const String _defaultPath = "/realtime_chat/ws";

  LiveKitGateway() {
    _wsClient = WebSocketClient();
  }

  Future<void> connect(
      {required String chatRoomId}) async {
    Map<String, dynamic>? queryParams = {
      'chat_room_id': chatRoomId,
      'virtual_user_id': virtualUserId,
      'user_id': userId,
    };
    await _wsClient.connect(_defaultPath, queryParams: queryParams);
  }

  Future<TokenResponse?> createToken({
    required String identity,
    required String name,
    required int roomNumber,
  }) async {
    final response = await _restApi.client.realtimeChatCreateTokenPost(
      body: ParticipantCreate(
        identity: identity,
        name: name,
        roomNumber: roomNumber,
      ),
    );

    if (response.statusCode != 200) {
      throw Exception('トークンの取得に失敗しました');
    }

    return response.body;
  }

  void dispose() {
    _wsClient.dispose();
  }
}

バックエンドもフロントエンドもブログ用に修正していますが動作確認は取れてないです。。。
動いたものを修正しているので、ちょっと手を加えれば動くと思います汗

まとめ

今回はOpenAIのRealtime APIをLangchainとLiveKitを使って実装する方法を紹介しました。
技術選定のポイントとしては:

  • WebRTCによるエコーキャンセレーション問題の解決
  • Langchainによる開発効率の向上
  • LiveKitによる安定した音声通信の実現

これらの組み合わせにより、予期せぬ課金や音声品質の問題を避けながら、快適な音声対話システムを構築することができます。
実装は少し複雑に見えるかもしれませんが、各コンポーネントの役割を理解すれば、それほど難しくありません。LiveKitが多くの複雑な処理を担ってくれているおかげで、私たちは音声対話の本質的な部分に集中することができます。
また、この構成はOpenAI Realtime APIの今後の発展にも対応しやすいものになっています。将来的にマルチモーダル対応が実装された際も、WebRTCを使用していることで、映像の取り扱いもスムーズに行えるでしょう。
みなさんも是非、この記事を参考に音声対話システムの実装にチャレンジしてみてください。実装中に困ったことがあれば、コメント欄でディスカッションできれば嬉しいです。

Discussion