Open10

LLMのAPIを利用して動画の解析を実施する

keitaknkeitakn

概要

LLMのAPIを組み合わせて動画ファイルの解析を実施する処理を作る。

とりあえずは以下が実行出来れば良い。

  • 動画の概要文章の取得
  • 動画の文字起こしを取得
  • 動画の再生時間を取得

これらの処理を書いてみてどんなユースケースで利用出来るか考えてみる。

keitaknkeitakn

動画内の文字起こしの取得

Gemini APIだと出力トークン数の問題で長い動画の文字起こしが出来ないので Google Cloud Speech-to-Text を利用してみる。

まずはAPIを有効にしていく。

keitaknkeitakn

チュートリアルの通りに進めてみる

以下にちょうど良いチュートリアルがあってのでこの通りに進めてみる。

https://cloud.google.com/speech-to-text/docs/video-model

keitaknkeitakn

専用ライブラリのインストール

以下を実行する。

rye add google-cloud-speech
keitaknkeitakn

他の文字起こしAPIでも同じインターフェースで利用出来るように以下のような Protocol を使ったインターフェース用のクラスを作成。

from typing import Protocol, TypedDict


class CreateVideoTranscriptDto(TypedDict):
    video_url: str


class CreateVideoTranscriptResult(TypedDict):
    transcript: str


class VideoTranscriptRepositoryInterface(Protocol):
    async def create_video_transcript(
        self, dto: CreateVideoTranscriptDto
    ) -> CreateVideoTranscriptResult: ...

以下が実装。動画ファイルはGCSにアップロードしてある前提。最初に動画ファイルから音声を分離してGCSにアップロード、その後GCS内の音声ファイルを指定してSpeech-to-Text APIを利用して文字起こしを取得。

import os
import json
import base64
import tempfile
import ffmpeg
import time
from pathlib import Path

# TODO: なぜかmypyの型エラーになるので type: ignore で回避している
from google.cloud import storage, speech  # type: ignore
from google.oauth2 import service_account
from google.api_core import exceptions as google_exceptions
from domain.repository.video_transcript_repository_interface import (
    CreateVideoTranscriptDto,
    CreateVideoTranscriptResult,
    VideoTranscriptRepositoryInterface,
)
from log.logger import AppLogger, InfoLogExtra
from infrastructure.google.parse_gcs_path import parse_gcs_path


class GoogleVideoTranscriptRepository(VideoTranscriptRepositoryInterface):
    def __init__(self) -> None:
        app_logger = AppLogger()
        self.logger = app_logger.logger

        encoded_service_account_key = os.getenv("GOOGLE_CLOUD_CREDENTIALS")
        if encoded_service_account_key is None:
            raise Exception("GOOGLE_CLOUD_CREDENTIALS is not set.")

        decoded_service_account_key = base64.b64decode(
            encoded_service_account_key
        ).decode("utf-8")
        service_account_info = json.loads(decoded_service_account_key)

        self.google_cloud_storage_client = storage.Client.from_service_account_info(
            service_account_info
        )

        self.credentials = service_account.Credentials.from_service_account_info(
            # service_account_info の型チェックは難しいので type: ignore で回避している
            service_account_info  # type: ignore
        )

        self.speech_client = speech.SpeechClient(credentials=self.credentials)

    def extract_audio_and_transcribe(self, video_uri: str) -> str:
        gcs_path_components = parse_gcs_path(video_uri)
        bucket = self.google_cloud_storage_client.bucket(
            gcs_path_components["bucket_name"]
        )

        with tempfile.NamedTemporaryFile(
            suffix=".mov"
        ) as video_temp, tempfile.NamedTemporaryFile(suffix=".wav") as audio_temp:
            file_path = video_uri.replace(
                f"gs://{gcs_path_components['bucket_name']}/", ""
            )
            blob = bucket.blob(file_path)
            blob.download_to_filename(video_temp.name)

            self.logger.info(
                "GoogleVideoTranscriptRepository.extract_audio_and_transcribe.DownloadedVideo",
                extra=InfoLogExtra(
                    info_message=f"Downloaded {file_path} to {video_temp.name}",
                ),
            )

            try:
                (
                    ffmpeg.input(video_temp.name)
                    .output(audio_temp.name, acodec="pcm_s16le", ac=1, ar="16000")
                    .overwrite_output()
                    .run(capture_stdout=True, capture_stderr=True)
                )
            except ffmpeg.Error as e:
                self.logger.error(
                    "GoogleVideoTranscriptRepository.extract_audio_and_transcribe.FFmpegError",
                    extra=InfoLogExtra(
                        info_message=f"FFmpeg error: {str(e)}",
                    ),
                )
                raise

            self.logger.info(
                "GoogleVideoTranscriptRepository.extract_audio_and_transcribe.ExtractedAudio",
                extra=InfoLogExtra(
                    info_message=f"Extracted audio to {audio_temp.name}",
                ),
            )

            audio_file_path = Path(file_path)
            audio_file_name = audio_file_path.stem + ".wav"
            audio_file_path = audio_file_path.parent / audio_file_name

            audio_blob = bucket.blob(str(audio_file_path))
            audio_blob.upload_from_filename(audio_temp.name)

            self.logger.info(
                "GoogleVideoTranscriptRepository.extract_audio_and_transcribe.UploadedExtractedAudio",
                extra=InfoLogExtra(
                    info_message=f"Uploaded extracted audio to gs://{gcs_path_components['bucket_name']}/{audio_file_path}",
                ),
            )

            # Transcribe the audio
            audio = speech.RecognitionAudio(
                uri=f"gs://{gcs_path_components['bucket_name']}/{audio_file_path}"
            )
            config = speech.RecognitionConfig(
                encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=16000,
                language_code="ja-JP",
            )

            try:
                operation = self.speech_client.long_running_recognize(
                    config=config, audio=audio
                )

                # Increase timeout to 10 minutes (600 seconds)
                timeout = 600
                start_time = time.time()

                while True:
                    if operation.done():  # type: ignore
                        break
                    if time.time() - start_time > timeout:
                        raise TimeoutError(
                            f"Transcription operation timed out after {timeout} seconds"
                        )
                    time.sleep(10)  # Poll every 10 seconds

                response = operation.result()  # type: ignore

                transcript = ""
                for result in response.results:
                    transcript += result.alternatives[0].transcript + " "

                self.logger.info(
                    "GoogleVideoTranscriptRepository.extract_audio_and_transcribe.TranscriptionComplete",
                    extra=InfoLogExtra(
                        info_message=f"Transcription completed for {file_path}",
                    ),
                )

                return transcript.strip()

            except (google_exceptions.GoogleAPICallError, TimeoutError) as e:
                self.logger.error(
                    "GoogleVideoTranscriptRepository.extract_audio_and_transcribe.TranscriptionError",
                    extra=InfoLogExtra(
                        info_message=f"Transcription error: {str(e)}",
                    ),
                )
                raise

    async def create_video_transcript(
        self, dto: CreateVideoTranscriptDto
    ) -> CreateVideoTranscriptResult:
        try:
            transcript = self.extract_audio_and_transcribe(dto["video_url"])
        except TimeoutError as e:
            self.logger.error(
                "GoogleVideoTranscriptRepository.create_video_transcript.TimeoutError",
                extra=InfoLogExtra(
                    info_message=f"Transcription timed out: {str(e)}",
                ),
            )
            raise

        except Exception as e:
            self.logger.error(
                "GoogleVideoTranscriptRepository.create_video_transcript.Error",
                extra=InfoLogExtra(
                    info_message=f"Error during create video transcript: {str(e)}",
                ),
            )
            raise

        result: CreateVideoTranscriptResult = {
            "transcript": transcript,
        }

        return result

以下のように文字起こしが取得できる。

{'transcript': '猫ちゃん 猫ちゃん こっちおいで こっちおいで'}

今の時点では話者分離も出来ずに精度も怪しい箇所があるのでここから改善していく。

keitaknkeitakn

精度に関しては動画内の音声が原因の可能性もある。
最初は割とはっきりとした言葉をしゃべっている動画を元にテストしたほうが良いかも。

keitaknkeitakn

動画の再生時間を取得する

LLMは使わないで ffmpeg のライブラリを使うのが早そう。extract_video_duration を実装したら簡単に取得できた。

https://github.com/nekochans/ai-cat-api/pull/149

import os
import json
import base64
import ffmpeg
import tempfile
import math

# TODO: なぜかmypyの型エラーになるので type: ignore で回避している
from google.cloud import storage  # type: ignore
from google.oauth2 import service_account
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from domain.repository.video_repository_interface import (
    VideoRepositoryInterface,
    AnalysisVideoDto,
    AnalysisVideoResult,
)
from log.logger import AppLogger, InfoLogExtra
from infrastructure.google.parse_gcs_path import parse_gcs_path


class GeminiVideoRepository(VideoRepositoryInterface):
    def __init__(self) -> None:
        app_logger = AppLogger()
        self.logger = app_logger.logger

        encoded_service_account_key = os.getenv("GOOGLE_CLOUD_CREDENTIALS")
        if encoded_service_account_key is None:
            raise Exception("GOOGLE_CLOUD_CREDENTIALS is not set.")

        decoded_service_account_key = base64.b64decode(
            encoded_service_account_key
        ).decode("utf-8")
        service_account_info = json.loads(decoded_service_account_key)

        self.google_cloud_storage_client = storage.Client.from_service_account_info(
            service_account_info
        )

        self.credentials = service_account.Credentials.from_service_account_info(
            # service_account_info の型チェックは難しいので type: ignore で回避している
            service_account_info  # type: ignore
        )

        vertexai.init(
            project=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
            location=os.getenv("GOOGLE_CLOUD_REGION"),
            credentials=self.credentials,
        )

    def extract_video_duration(self, video_uri: str) -> float:
        gcs_path_components = parse_gcs_path(video_uri)
        bucket = self.google_cloud_storage_client.bucket(
            gcs_path_components["bucket_name"]
        )

        file_path = video_uri.replace(f"gs://{gcs_path_components['bucket_name']}/", "")
        blob = bucket.blob(file_path)

        with tempfile.NamedTemporaryFile(
            suffix=f".{gcs_path_components['file_extension']}"
        ) as temp_file:
            blob.download_to_filename(temp_file.name)
            self.logger.info(
                "GeminiVideoRepository.extract_video_duration.DownloadedVideo",
                extra=InfoLogExtra(
                    info_message=f"Downloaded {file_path} to {temp_file.name}",
                ),
            )
            try:
                probe = ffmpeg.probe(temp_file.name)
                video_info = next(
                    s for s in probe["streams"] if s["codec_type"] == "video"
                )
                duration = float(video_info["duration"])
                self.logger.info(
                    "GeminiVideoRepository.extract_video_duration.Success",
                    extra=InfoLogExtra(
                        info_message=f"Successfully extracted video duration: {duration} seconds"
                    ),
                )
                return duration
            except ffmpeg.Error as e:
                self.logger.error(
                    "GeminiVideoRepository.extract_video_duration.FFmpegError",
                    extra=InfoLogExtra(info_message=f"FFmpeg error: {str(e)}"),
                )
                raise
            except Exception as e:
                self.logger.error(
                    "GeminiVideoRepository.extract_video_duration.Error",
                    extra=InfoLogExtra(
                        info_message=f"Error retrieving video duration: {str(e)}"
                    ),
                )
                raise

    async def video_analysis(self, dto: AnalysisVideoDto) -> AnalysisVideoResult:
        model = GenerativeModel(
            "gemini-1.5-flash-001",
        )

        video = Part.from_uri(
            mime_type="video/quicktime",
            uri=dto["video_url"],
        )

        contents = [
            video,
            """
            # Instruction
            - 動画の内容を確認して要約の作成をお願いします。
            - 動画の内容を文字起こし作成をお願いします。
            
            # 制約条件
            - 以下のJSON形式で返すようにお願いします。
              - {"summary": "動画の要約文章をここに設定"}
                - "summary" には動画の要約文章を設定します。
            - ハルシネーションを起こさないでください。
            """,
        ]

        generation_config = {
            "response_mime_type": "application/json",
            "temperature": 0.0,
            "top_k": 1,
            "top_p": 0.9,
        }

        response = await model.generate_content_async(
            contents,
            generation_config=generation_config,
        )

        response_content = response.text

        try:
            result: AnalysisVideoResult = json.loads(response_content)
            duration = self.extract_video_duration(dto["video_url"])
            result["duration_in_seconds"] = math.floor(duration)
        except json.JSONDecodeError:
            raise Exception(f"JSON decode error: {response_content}")
        except Exception as e:
            self.logger.error(
                "GeminiVideoRepository.video_analysis.Error",
                extra=InfoLogExtra(
                    info_message=f"Error during video analysis: {str(e)}"
                ),
            )
            raise

        return result
keitaknkeitakn

from google.cloud import storage の部分で何故かmypyのエラーが出る。

rye run mypy --strict
src/infrastructure/repository/gemini/gemini_video_repository.py:7: error: Module "google.cloud" has no attribute "storage"  [attr-defined]
src/infrastructure/repository/google/google_video_transcript_repository.py:8: error: Module "google.cloud" has no attribute "storage"  [attr-defined]
Found 2 errors in 2 files (checked 42 source files)
make: *** [typecheck] Error 1
zsh: exit 2     make typecheck

もちろん正常動作しているので何故これが出るのが不明。
google.cloud の型ヒントサポートが不十分だからだろうか?