Zenn
Open4

Gemini 2.0 FlashのマルチモーダルライブAPIを試す

kun432kun432

Gemini 2.0 Flashのサンプルは色々用意されているがマルチモーダルライブAPIに関するものはざっと以下あたりだと思う。

Google CloudのVertex AIを使用するもの。

マルチモーダルライブAPIを使ったフロントエンド・バックエンド構成のサンプル
https://github.com/GoogleCloudPlatform/generative-ai/tree/main/gemini/multimodal-live-api/websocket-demo-app

マルチモーダルライブAPIを使ったリアルタイムなRAG
https://github.com/GoogleCloudPlatform/generative-ai/tree/main/gemini/multimodal-live-api/real_time_rag_retail_gemini_2_0.ipynb

マルチモーダル+Google検索を使ったマーケティングでのユースケース例
https://github.com/GoogleCloudPlatform/generative-ai/tree/main/gemini/use-cases/marketing/creating_marketing_assets_gemini_2_0.ipynb

Gemini Cookbook。こちらはGoogle AIを使用。

マルチモーダルライブAPIの基本
https://github.com/google-gemini/cookbook/tree/main/gemini-2/live_api_starter.ipynb

マルチモーダルライブAPIでツールを使う
https://github.com/google-gemini/cookbook/tree/main/gemini-2/live_api_tool_use.ipynb

こちらもマルチモーダルライブAPIでツールの使用だけど、複数のツールを使ってGoogle MAPの操作を行うといったものに見える
https://github.com/google-gemini/cookbook/tree/main/gemini-2/plotting_and_mapping.ipynb

Vertex AI/Google AI、どちらの場合もGen AI SDKを使っているようで、基本的には同じ書き方でいけそう(認証方法だけ異なる感じ)

とりあえず適当に試してみようと思う。

kun432kun432

Gemini Cookbook: Multimodal Live API - Quickstart

マルチモーダルライブAPIをやっていく。

https://github.com/google-gemini/cookbook/tree/main/gemini-2/live_api_starter.ipynb

マルチモーダルライブAPIについてはこちら。

https://ai.google.dev/api/multimodal-live?hl=ja

Multimodal Live API を使用すると、テキスト、音声、動画の入力と音声とテキストの出力を使用して、低レイテンシの双方向インタラクションを実現できます。これにより、モデルをいつでも中断できる、自然で人間のような音声会話が実現します。モデルの動画理解機能により、コミュニケーション モダリティが拡張され、カメラ入力やスクリーンキャストを共有して質問できるようになります。

OpenAIでいうところのRealtime APIと同じような位置づけに思える。

Colaboratoryで。

Gen AI SDKのパッケージをインストール。

!pip install -U -q google-genai

Google APIキーをセット

from google.colab import userdata
import os

os.environ['GOOGLE_API_KEY'] = userdata.get('GOOGLE_API_KEY')

text-to-text

まずtext-to-textのシンプルな例。

from google import genai
from google import genai

# notebookの場合は必要
import nest_asyncio
nest_asyncio.apply()

# クライアント初期化。`v1alpha`の指定が必要。
client = genai.Client(http_options={'api_version': 'v1alpha'})

# テキストでのレスポンスを指定
config={
    "generation_config": {
        "response_modalities": ["TEXT"]
    }
}

async def main():
    # 非同期クライアントを使って`live.connect()`で接続
    async with client.aio.live.connect(
        model="gemini-2.0-flash-exp",
        config=config
    ) as session:
        message = "こんにちは!あなたのお名前は?"
        print("User:", message, "\n")
        await session.send(message, end_of_turn=True)

        # テキストレスポンスの場合、モデルのターンが終わるとループから抜ける
        turn = session.receive()
        print("Assitant:", end="")
        async for chunk in turn:
            if chunk.text is not None:
                print(chunk.text)

asyncio.run(main())
出力
User: こんにちは!あなたのお名前は? 

Assitant:こんにちは
!私はGoogleによってトレーニングされた、大規模言語モデルのGeminiです。

text-to-audio

音声での出力。WAVファイルに保存する場合。

from google import genai
import asyncio
import contextlib
import wave
from IPython.display import Audio

import nest_asyncio
nest_asyncio.apply()

client = genai.Client(http_options={'api_version': 'v1alpha'})

# 音声でのレスポンスを指定
config={
    "generation_config": {
        "response_modalities": ["AUDIO"]
    }
}


# 非同期でenumerateする
async def async_enumerate(it):
  n = 0
  async for item in it:
    yield n, item
    n +=1


# WAVファイルへの出力を行う関数
@contextlib.contextmanager
def wave_file(filename, channels=1, rate=24000, sample_width=2):
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        yield wf

async def main():
    async with client.aio.live.connect(
        model="gemini-2.0-flash-exp",
        config=config
    ) as session:
        file_name = 'audio.wav'
        with wave_file(file_name) as wav:
            message = "こんにちは!あなたのお名前は?"
            print("> ", message, "\n")
            await session.send(message, end_of_turn=True)

            turn = session.receive()
            async for n,response in async_enumerate(turn):
                if response.data is not None:
                    wav.writeframes(response.data)

                if n==0:
                    print(response.server_content.model_turn.parts[0].inline_data.mime_type)
                print('.', end='')

        display(Audio(file_name, autoplay=True))

asyncio.run(main())

こんな感じで自動再生される

実際に生成されたものはこちら

https://audio.com/kun432/audio/realtime-live-api-demo-1

非同期タスク化

マルチモーダルライブAPIの本来の使い方は、シーケンシャルなやりとりではなく、リアルタイムかつ割り込み可能な形で実装する必要がある。そのためには「送信」「受信」等を別々の非同期タスクとして実装する必要がある。

サンプルは、「送信」「受信」を別々の非同期タスクとして実装してあるのだが、Colaboratoryの制約によりリアルタイムではなくシーケンシャルなものとなっている。リアルタイムなものは別のサンプルにある様子。

なお、ロギングは標準のloggingモジュールを使ってあったが、Loguruに変えた。

!pip install loguru
from google import genai
import asyncio
import contextlib
import wave
from IPython.display import Audio
from loguru import logger
import sys

# notebookの場合は必要
import nest_asyncio
nest_asyncio.apply()

# DEBUGにすればより詳細に出力される
logger.remove()
logger.add(sys.stderr, level="INFO")

# 非同期でenumerateする
async def async_enumerate(it):
  n = 0
  async for item in it:
    yield n, item
    n +=1


# WAVファイルへの出力を行う関数
@contextlib.contextmanager
def wave_file(filename, channels=1, rate=24000, sample_width=2):
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        yield wf

class AudioLoop:
    def __init__(self, client, turns=None, config=None):
        self.client = client
        self.session = None
        self.index = 0
        self.turns = turns
        if config is None:
            config={
                "generation_config": {
                    "response_modalities": ["AUDIO"]
                }
            }
        self.config = config

    async def run(self):
        """
        メインのループ処理を行うメソッド
        - Live API に接続し`websocket` を開く
        - 最初の `setup` メソッドを呼び出す
        - メインループに入って、`send`が`False`を返すまで`send`と`recv`を交互に繰り返す
        """
        logger.debug('connect')
        async with self.client.aio.live.connect(
            model="gemini-2.0-flash-exp",
            config=self.config
        ) as session:
            self.session = session

            async for sent in self.send():
                # 実際には、sendとrecvは別々のタスクになる
                await self.recv()

    async def _iter(self):
        """
        非同期でユーザーからの入力を取得してループ内で処理するメソッド
        - turnsが与えられていたらそれを順に処理する
        - turnがなければユーザからの入力を受け取る
        """
        if self.turns:
            for text in self.turns:
                print("message >", text)
                yield text
        else:
            print("Type 'q' to quit")
            while True:
                text = await asyncio.to_thread(input, "message > ")

                # `q`が入力されたら終了
                if text.lower() == 'q':
                    break

                yield text

    async def send(self):
        """
        入力テキストをAPIに送信するメソッド
        - ユーザの入力を`client_content`メッセージにラップしてモデルに送信
            - `client_content`メッセージは`BidiGenerateContentClientContent`のインスタンス
        - ユーザーが`q`を入力したら`False`を返す
        """
        async for text in self._iter():
            logger.debug('send')

            # モデルにメッセージを送信
            await self.session.send(text, end_of_turn=True)
            logger.debug('sent')
            yield text

    async def recv(self):
        """
        APIからオーディオを受け取って再生するメソッド
        - オーディオチャンクをループで収集し`.wav` ファイルに書き込む
        - モデルが`turn_complete`メソッドを送るとループから抜け出し、音声を再生
        注: この例ではシンプルに、「すべての」オーディオを受け取ってから再生している
        """
        # WAVファイルを新規作成
        file_name = f"audio_{self.index}.wav"
        with wave_file(file_name) as wav:
            self.index += 1

            logger.debug('receive')

            # ソケットからチャンクを読み込み
            turn = self.session.receive()
            async for n, response in async_enumerate(turn):
                logger.debug(f'got chunk: {str(response)}')

                if response.data is None:
                    logger.debug(f'Unhandled server message! - {response}')
                else:
                    wav.writeframes(response.data)
                    if n == 0:
                        print(response.server_content.model_turn.parts[0].inline_data.mime_type)
                    print('.', end='')

            print('\n')

        display(Audio(file_name, autoplay=True))
        await asyncio.sleep(2)

実行

async def main():
    client = genai.Client(http_options={'api_version': 'v1alpha'})
    await AudioLoop(client, ['こんにちは', "あなたの名前を教えて。"]).run()

asyncio.run(main())

どんな感じで動作するかは、実際に試してみるのが良い。

kun432kun432

ところでRealtime Live APIのドキュメントのここ、少し気になった。

https://ai.google.dev/api/multimodal-live?hl=ja

Multimodal Live API は、サーバー間通信用に設計されています。
ウェブアプリとモバイルアプリの場合は、Daily のパートナーによる統合を使用することをおすすめします。

このDailyというやつ、別のリアルタイム音声処理フレームワークでも見かけた。

https://www.daily.co/

音声・動画向けのWebRTCプラットフォームという感じっぽい。

ログインするとコメントできます