LLMのAPIを利用して動画の解析を実施する

概要
LLMのAPIを組み合わせて動画ファイルの解析を実施する処理を作る。
とりあえずは以下が実行出来れば良い。
- 動画の概要文章の取得
- 動画の文字起こしを取得
- 動画の再生時間を取得
これらの処理を書いてみてどんなユースケースで利用出来るか考えてみる。

Gemini APIの利用準備
Gemini APIは動画ファイルを渡す事が出来るので、まずはこれをベースに作っていく。
以下を参考に事前準備を行う。

動画概要の取得
これは比較的簡単に実現出来る。
以下のようなクラスを用意して動画の解析結果をJSONで得られるようにすればOK。

動画内の文字起こしの取得
Gemini APIだと出力トークン数の問題で長い動画の文字起こしが出来ないので Google Cloud Speech-to-Text
を利用してみる。
まずはAPIを有効にしていく。

チュートリアルの通りに進めてみる
以下にちょうど良いチュートリアルがあってのでこの通りに進めてみる。

専用ライブラリのインストール
以下を実行する。
rye add google-cloud-speech

他の文字起こしAPIでも同じインターフェースで利用出来るように以下のような Protocol
を使ったインターフェース用のクラスを作成。
from typing import Protocol, TypedDict
class CreateVideoTranscriptDto(TypedDict):
video_url: str
class CreateVideoTranscriptResult(TypedDict):
transcript: str
class VideoTranscriptRepositoryInterface(Protocol):
async def create_video_transcript(
self, dto: CreateVideoTranscriptDto
) -> CreateVideoTranscriptResult: ...
以下が実装。動画ファイルはGCSにアップロードしてある前提。最初に動画ファイルから音声を分離してGCSにアップロード、その後GCS内の音声ファイルを指定してSpeech-to-Text APIを利用して文字起こしを取得。
import os
import json
import base64
import tempfile
import ffmpeg
import time
from pathlib import Path
# TODO: なぜかmypyの型エラーになるので type: ignore で回避している
from google.cloud import storage, speech # type: ignore
from google.oauth2 import service_account
from google.api_core import exceptions as google_exceptions
from domain.repository.video_transcript_repository_interface import (
CreateVideoTranscriptDto,
CreateVideoTranscriptResult,
VideoTranscriptRepositoryInterface,
)
from log.logger import AppLogger, InfoLogExtra
from infrastructure.google.parse_gcs_path import parse_gcs_path
class GoogleVideoTranscriptRepository(VideoTranscriptRepositoryInterface):
def __init__(self) -> None:
app_logger = AppLogger()
self.logger = app_logger.logger
encoded_service_account_key = os.getenv("GOOGLE_CLOUD_CREDENTIALS")
if encoded_service_account_key is None:
raise Exception("GOOGLE_CLOUD_CREDENTIALS is not set.")
decoded_service_account_key = base64.b64decode(
encoded_service_account_key
).decode("utf-8")
service_account_info = json.loads(decoded_service_account_key)
self.google_cloud_storage_client = storage.Client.from_service_account_info(
service_account_info
)
self.credentials = service_account.Credentials.from_service_account_info(
# service_account_info の型チェックは難しいので type: ignore で回避している
service_account_info # type: ignore
)
self.speech_client = speech.SpeechClient(credentials=self.credentials)
def extract_audio_and_transcribe(self, video_uri: str) -> str:
gcs_path_components = parse_gcs_path(video_uri)
bucket = self.google_cloud_storage_client.bucket(
gcs_path_components["bucket_name"]
)
with tempfile.NamedTemporaryFile(
suffix=".mov"
) as video_temp, tempfile.NamedTemporaryFile(suffix=".wav") as audio_temp:
file_path = video_uri.replace(
f"gs://{gcs_path_components['bucket_name']}/", ""
)
blob = bucket.blob(file_path)
blob.download_to_filename(video_temp.name)
self.logger.info(
"GoogleVideoTranscriptRepository.extract_audio_and_transcribe.DownloadedVideo",
extra=InfoLogExtra(
info_message=f"Downloaded {file_path} to {video_temp.name}",
),
)
try:
(
ffmpeg.input(video_temp.name)
.output(audio_temp.name, acodec="pcm_s16le", ac=1, ar="16000")
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
except ffmpeg.Error as e:
self.logger.error(
"GoogleVideoTranscriptRepository.extract_audio_and_transcribe.FFmpegError",
extra=InfoLogExtra(
info_message=f"FFmpeg error: {str(e)}",
),
)
raise
self.logger.info(
"GoogleVideoTranscriptRepository.extract_audio_and_transcribe.ExtractedAudio",
extra=InfoLogExtra(
info_message=f"Extracted audio to {audio_temp.name}",
),
)
audio_file_path = Path(file_path)
audio_file_name = audio_file_path.stem + ".wav"
audio_file_path = audio_file_path.parent / audio_file_name
audio_blob = bucket.blob(str(audio_file_path))
audio_blob.upload_from_filename(audio_temp.name)
self.logger.info(
"GoogleVideoTranscriptRepository.extract_audio_and_transcribe.UploadedExtractedAudio",
extra=InfoLogExtra(
info_message=f"Uploaded extracted audio to gs://{gcs_path_components['bucket_name']}/{audio_file_path}",
),
)
# Transcribe the audio
audio = speech.RecognitionAudio(
uri=f"gs://{gcs_path_components['bucket_name']}/{audio_file_path}"
)
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=16000,
language_code="ja-JP",
)
try:
operation = self.speech_client.long_running_recognize(
config=config, audio=audio
)
# Increase timeout to 10 minutes (600 seconds)
timeout = 600
start_time = time.time()
while True:
if operation.done(): # type: ignore
break
if time.time() - start_time > timeout:
raise TimeoutError(
f"Transcription operation timed out after {timeout} seconds"
)
time.sleep(10) # Poll every 10 seconds
response = operation.result() # type: ignore
transcript = ""
for result in response.results:
transcript += result.alternatives[0].transcript + " "
self.logger.info(
"GoogleVideoTranscriptRepository.extract_audio_and_transcribe.TranscriptionComplete",
extra=InfoLogExtra(
info_message=f"Transcription completed for {file_path}",
),
)
return transcript.strip()
except (google_exceptions.GoogleAPICallError, TimeoutError) as e:
self.logger.error(
"GoogleVideoTranscriptRepository.extract_audio_and_transcribe.TranscriptionError",
extra=InfoLogExtra(
info_message=f"Transcription error: {str(e)}",
),
)
raise
async def create_video_transcript(
self, dto: CreateVideoTranscriptDto
) -> CreateVideoTranscriptResult:
try:
transcript = self.extract_audio_and_transcribe(dto["video_url"])
except TimeoutError as e:
self.logger.error(
"GoogleVideoTranscriptRepository.create_video_transcript.TimeoutError",
extra=InfoLogExtra(
info_message=f"Transcription timed out: {str(e)}",
),
)
raise
except Exception as e:
self.logger.error(
"GoogleVideoTranscriptRepository.create_video_transcript.Error",
extra=InfoLogExtra(
info_message=f"Error during create video transcript: {str(e)}",
),
)
raise
result: CreateVideoTranscriptResult = {
"transcript": transcript,
}
return result
以下のように文字起こしが取得できる。
{'transcript': '猫ちゃん 猫ちゃん こっちおいで こっちおいで'}
今の時点では話者分離も出来ずに精度も怪しい箇所があるのでここから改善していく。

精度に関しては動画内の音声が原因の可能性もある。
最初は割とはっきりとした言葉をしゃべっている動画を元にテストしたほうが良いかも。

動画の再生時間を取得する
LLMは使わないで ffmpeg
のライブラリを使うのが早そう。extract_video_duration
を実装したら簡単に取得できた。
import os
import json
import base64
import ffmpeg
import tempfile
import math
# TODO: なぜかmypyの型エラーになるので type: ignore で回避している
from google.cloud import storage # type: ignore
from google.oauth2 import service_account
import vertexai
from vertexai.generative_models import GenerativeModel, Part
from domain.repository.video_repository_interface import (
VideoRepositoryInterface,
AnalysisVideoDto,
AnalysisVideoResult,
)
from log.logger import AppLogger, InfoLogExtra
from infrastructure.google.parse_gcs_path import parse_gcs_path
class GeminiVideoRepository(VideoRepositoryInterface):
def __init__(self) -> None:
app_logger = AppLogger()
self.logger = app_logger.logger
encoded_service_account_key = os.getenv("GOOGLE_CLOUD_CREDENTIALS")
if encoded_service_account_key is None:
raise Exception("GOOGLE_CLOUD_CREDENTIALS is not set.")
decoded_service_account_key = base64.b64decode(
encoded_service_account_key
).decode("utf-8")
service_account_info = json.loads(decoded_service_account_key)
self.google_cloud_storage_client = storage.Client.from_service_account_info(
service_account_info
)
self.credentials = service_account.Credentials.from_service_account_info(
# service_account_info の型チェックは難しいので type: ignore で回避している
service_account_info # type: ignore
)
vertexai.init(
project=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
location=os.getenv("GOOGLE_CLOUD_REGION"),
credentials=self.credentials,
)
def extract_video_duration(self, video_uri: str) -> float:
gcs_path_components = parse_gcs_path(video_uri)
bucket = self.google_cloud_storage_client.bucket(
gcs_path_components["bucket_name"]
)
file_path = video_uri.replace(f"gs://{gcs_path_components['bucket_name']}/", "")
blob = bucket.blob(file_path)
with tempfile.NamedTemporaryFile(
suffix=f".{gcs_path_components['file_extension']}"
) as temp_file:
blob.download_to_filename(temp_file.name)
self.logger.info(
"GeminiVideoRepository.extract_video_duration.DownloadedVideo",
extra=InfoLogExtra(
info_message=f"Downloaded {file_path} to {temp_file.name}",
),
)
try:
probe = ffmpeg.probe(temp_file.name)
video_info = next(
s for s in probe["streams"] if s["codec_type"] == "video"
)
duration = float(video_info["duration"])
self.logger.info(
"GeminiVideoRepository.extract_video_duration.Success",
extra=InfoLogExtra(
info_message=f"Successfully extracted video duration: {duration} seconds"
),
)
return duration
except ffmpeg.Error as e:
self.logger.error(
"GeminiVideoRepository.extract_video_duration.FFmpegError",
extra=InfoLogExtra(info_message=f"FFmpeg error: {str(e)}"),
)
raise
except Exception as e:
self.logger.error(
"GeminiVideoRepository.extract_video_duration.Error",
extra=InfoLogExtra(
info_message=f"Error retrieving video duration: {str(e)}"
),
)
raise
async def video_analysis(self, dto: AnalysisVideoDto) -> AnalysisVideoResult:
model = GenerativeModel(
"gemini-1.5-flash-001",
)
video = Part.from_uri(
mime_type="video/quicktime",
uri=dto["video_url"],
)
contents = [
video,
"""
# Instruction
- 動画の内容を確認して要約の作成をお願いします。
- 動画の内容を文字起こし作成をお願いします。
# 制約条件
- 以下のJSON形式で返すようにお願いします。
- {"summary": "動画の要約文章をここに設定"}
- "summary" には動画の要約文章を設定します。
- ハルシネーションを起こさないでください。
""",
]
generation_config = {
"response_mime_type": "application/json",
"temperature": 0.0,
"top_k": 1,
"top_p": 0.9,
}
response = await model.generate_content_async(
contents,
generation_config=generation_config,
)
response_content = response.text
try:
result: AnalysisVideoResult = json.loads(response_content)
duration = self.extract_video_duration(dto["video_url"])
result["duration_in_seconds"] = math.floor(duration)
except json.JSONDecodeError:
raise Exception(f"JSON decode error: {response_content}")
except Exception as e:
self.logger.error(
"GeminiVideoRepository.video_analysis.Error",
extra=InfoLogExtra(
info_message=f"Error during video analysis: {str(e)}"
),
)
raise
return result

from google.cloud import storage
の部分で何故かmypyのエラーが出る。
rye run mypy --strict
src/infrastructure/repository/gemini/gemini_video_repository.py:7: error: Module "google.cloud" has no attribute "storage" [attr-defined]
src/infrastructure/repository/google/google_video_transcript_repository.py:8: error: Module "google.cloud" has no attribute "storage" [attr-defined]
Found 2 errors in 2 files (checked 42 source files)
make: *** [typecheck] Error 1
zsh: exit 2 make typecheck
もちろん正常動作しているので何故これが出るのが不明。
google.cloud
の型ヒントサポートが不十分だからだろうか?