💬

PythonとOpenAIのwhisperを使って、無料で話者分離付き文字起こしシステムを構築してNotionに自動アップロードする方法

に公開

はじめに

ブルーグラフィー株式会社の代表の伊藤景司です。
会議の録音や講演の記録、インタビューの書き起こしなど、文字起こしの需要は多くの場面で存在します。特に複数の話者がいる場合、「誰が何を言ったのか」を区別できると非常に便利ですよね。この記事では、Python と OepnAIのWhisperを使って「誰が話しているのか」まで識別できる高度な文字起こしシステムを構築し、さらに結果を自動的にNotionにアップロードする方法をわかりやすく解説します。

特徴

  • オンライン会議やYoutubeの動画など、さまざまなツールで動画を見たり使ったりすると思いますが、このシステムはどんなツールであろうとも、mp4の動画データさえ手元に用意できれば使えるようになっていて、動画元のツールを選ばないところが特徴です。
  • ある程度のスペックのMacさえあれば、無料で構築できるところもポイントです。
  • オンライン会議の場合は、録画機能を使うか、OBSなどのツールを使ってmp4データを生成し、指定したフォルダーにそのファイルが保存されただけで自動で処理が始まるようにしています。

完成イメージ

まずは完成イメージを見てみましょう。通常の文字起こしと話者分離付き文字起こしの違いです:

通常の文字起こし:

こんにちは、本日はお忙しい中お集まりいただきありがとうございます。それでは会議を始めましょう。
まず最初に前回の議事録について確認したいと思います。山田さん、資料の共有をお願いできますか?
はい、画面に表示しています。前回は主に3つの議題について話し合いました。

話者分離付き文字起こし:

[0:00:05] SPEAKER_00: こんにちは、本日はお忙しい中お集まりいただきありがとうございます。それでは会議を始めましょう。
[0:00:12] SPEAKER_00: まず最初に前回の議事録について確認したいと思います。山田さん、資料の共有をお願いできますか?
[0:00:19] SPEAKER_01: はい、画面に表示しています。前回は主に3つの議題について話し合いました。

話者ごとに分離されているため、誰が何を言ったのかが一目瞭然ですね!さらに、このシステムでは文字起こし結果が自動的にNotionデータベースに保存され、どこからでもアクセスできるようになります。

システム全体の流れ

  1. 録音/録画ファイル(MP4など)を指定のフォルダに配置
  2. 自動的に話者分離付き文字起こしを実行
  3. 文字起こし結果をテキストファイルとして保存
  4. テキストファイルの内容をNotionデータベースに自動アップロード
  5. 処理済みファイルを別フォルダに移動

構築する環境

今回は以下の環境で進めていきます:

  • OS: 今回はmacOS前提で記述しています(うまく工夫すれば、Windows、Linux いずれでも可能)
  • Python: バージョン 3.8 以上推奨
  • メモリ: 最低 4GB 以上(8GB 以上推奨)
  • ディスク容量: 最低 2GB の空き容量
  • Notion: Notionアカウントと統合トークン

使用する技術スタック

今回のシステムでは以下の技術・ライブラリを使用します:

  • Whisper: OpenAI が開発した高精度な音声認識モデル
  • Resemblyzer: 話者の声を特徴ベクトルに変換するライブラリ
  • scikit-learn: 機械学習アルゴリズムを提供するライブラリ(話者のクラスタリングに使用)
  • librosa: 音声分析用のライブラリ
  • Bash スクリプト: 文字起こしプロセスを自動化するためのスクリプト
  • Notion API: 文字起こし結果を Notion データベースに保存

設定プロセス

1. Python 仮想環境のセットアップ

まずは terminalを使って、Python の仮想環境を作成しましょう。これにより、プロジェクト固有の依存関係を他のプロジェクトと分離できます。

#スクリプトを保存するディレクトリを作成
mkdir -p ~/Scripts

## 仮想環境を作成するディレクトリも作成
mkdir -p ~/Scripts/venv

# 仮想環境を作成
cd ~/Scripts
python -m venv venv

# 仮想環境を有効化(Mac/Linux)
source venv/bin/activate

2. 必要なパッケージのインストール

仮想環境が有効化されたら、必要なパッケージをインストールしましょう。

# 基本パッケージのインストール
pip install openai-whisper torch numpy

# 話者分離に必要なパッケージのインストール
pip install resemblyzer scikit-learn librosa tqdm packaging

# Notion APIを使用するためのパッケージ
pip install requests

インストールには時間がかかる場合があります。特に torch は大きなパッケージなので、安定したインターネット接続が必要です。

3. Notionの設定

Notionに文字起こし結果をアップロードするには、インテグレーションの設定が必要です:

  1. Notion Developers にアクセス
  2. 「新しいインテグレーション」を作成
  3. インテグレーション名を入力(例: 「文字起こしアップローダー」)
  4. 使用するワークスペースを選択
  5. 作成後、表示される「シークレットキー」をメモ

次に、データベースを作成し、インテグレーションと接続します:

  1. Notionで新しいデータベースを作成(全画面表示)
  2. データベースに最低でも「Name(タイトル)」列を含めてください
  3. データベースの右上「・・・」→「コネクトの追加」から、作成したインテグレーションを追加
  4. データベースのURLからデータベースIDを取得:
  1. スクリプトファイルの作成
    以下の3つのファイルを作成します:
  • transcribe.py: 話者分離付き文字起こしの本体
  • process_recording.sh: 文字起こしを自動化するスクリプト
  • upload_to_notion.py: Notion へのアップロード機能

具体的なコード

ファイル1: transcribe.py(文字起こしの本体)

# transcribe.py
import os
import sys
import time
import whisper
import numpy as np
import torch
import librosa
from resemblyzer import VoiceEncoder, preprocess_wav
from sklearn.cluster import AgglomerativeClustering
from sklearn import __version__ as sklearn_version
from tqdm import tqdm
from datetime import timedelta

def clean_gpu_memory():
    """GPUメモリを解放"""
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

def transcribe_audio(file_path):
    """Whisperによる文字起こし処理"""
    print(f"文字起こしを開始します: {file_path}")
    model = whisper.load_model("medium")
    
    # 言語とオプションを指定
    options = {
        "language": "ja",
        "word_timestamps": True,  # 単語レベルのタイムスタンプを取得
        "verbose": False
    }
    
    result = model.transcribe(file_path, **options)
    clean_gpu_memory()
    
    return result

def split_audio_into_segments(file_path, segment_length=20.0):
    """音声を小さなセグメントに分割してデータを返す"""
    print(f"音声ファイルを {segment_length}秒のセグメントに分割します...")
    
    # 音声データのロード
    try:
        audio, sr = librosa.load(file_path, sr=16000, mono=True)
    except Exception as e:
        print(f"Librosa での読み込みに失敗: {e}")
        # 代替手段
        print("FFmpeg で抽出した WAV を使用して再試行します...")
        import subprocess
        import tempfile
        
        # 一時的なWAVファイルを作成
        temp_wav = tempfile.mktemp(suffix=".wav")
        try:
            cmd = [
                "ffmpeg", "-i", file_path, 
                "-ac", "1", "-ar", "16000", 
                "-hide_banner", "-loglevel", "error",
                temp_wav
            ]
            subprocess.run(cmd, check=True)
            audio, sr = librosa.load(temp_wav, sr=16000, mono=True)
            os.remove(temp_wav)
        except Exception as e2:
            print(f"FFmpeg での抽出にも失敗: {e2}")
            raise RuntimeError("音声データの読み込みに失敗しました")
    
    # 秒数をサンプル数に変換
    segment_samples = int(segment_length * sr)
    
    # セグメントに分割
    segments = []
    for i in range(0, len(audio), segment_samples):
        segment = audio[i:i + segment_samples]
        # 極端に短いセグメントは除外
        if len(segment) > sr:  # 1秒以上のセグメントのみ
            segments.append({
                "audio": segment,
                "start_time": i / sr,
                "end_time": (i + len(segment)) / sr
            })
    
    return segments, sr

def process_segments_for_speaker_embeddings(segments, sr, batch_size=8):
    """セグメントから話者埋め込みを抽出"""
    print("話者特徴量を抽出中...")
    
    # Voice Encoder モデルのロード
    encoder = VoiceEncoder()
    
    # 各セグメントの埋め込みを計算
    all_embeddings = []
    
    # バッチ処理で高速化
    for i in tqdm(range(0, len(segments), batch_size)):
        batch = segments[i:i + batch_size]
        batch_embeddings = []
        
        for segment in batch:
            try:
                # 前処理(必要に応じてリサンプリング)
                preprocessed_wav = preprocess_wav(segment["audio"], source_sr=sr)
                
                # 埋め込みを計算
                embedding = encoder.embed_utterance(preprocessed_wav)
                batch_embeddings.append(embedding)
            except Exception as e:
                print(f"セグメント処理エラー: {e}")
                # エラーの場合はゼロ埋め込みを追加
                batch_embeddings.append(np.zeros(encoder.embed_utterance(np.zeros(sr)).shape))
        
        all_embeddings.extend(batch_embeddings)
        
        # メモリ解放
        if torch.cuda.is_available():
            clean_gpu_memory()
    
    return all_embeddings

def cluster_speakers(embeddings, max_speakers=6):
    """階層型クラスタリングで話者を分類"""
    print(f"最大 {max_speakers} 人の話者を検出します...")
    
    # クラスタリングの準備
    embeddings_array = np.array(embeddings)
    
    # scikit-learnのバージョンを確認
    from packaging import version
    print(f"scikit-learn バージョン: {sklearn_version}")
    
    # バージョンによってパラメータを調整
    if version.parse(sklearn_version) >= version.parse("0.24.0"):
        # 新しいバージョン用のパラメータ設定
        clustering = AgglomerativeClustering(
            n_clusters=min(max_speakers, len(embeddings)),
            metric="cosine",  # 新しいバージョンでは metric と linkage の組み合わせ
            linkage="average"
        )
    else:
        # 古いバージョン用のパラメータ設定
        clustering = AgglomerativeClustering(
            n_clusters=min(max_speakers, len(embeddings)),
            linkage="average"
        )
    
    # クラスタリングの実行
    labels = clustering.fit_predict(embeddings_array)
    
    return labels

def assign_speakers_to_transcript(transcript_result, segments, speaker_labels):
    """文字起こし結果に話者ラベルを付与し、同じ話者の連続した発言をマージ"""
    print("文字起こし結果に話者情報を統合中...")
    
    # 各セグメントの開始・終了時間
    segment_times = [(s["start_time"], s["end_time"]) for s in segments]
    
    # 処理用の一時データ構造
    temp_transcript = []
    
    # 単語単位のタイムスタンプがある場合
    if "segments" in transcript_result:
        for segment in transcript_result["segments"]:
            start_time = segment["start"]
            text = segment["text"].strip()
            
            if not text:
                continue
                
            # このテキストセグメントがどの音声セグメントに属するかを特定
            speaker_idx = None
            for i, (seg_start, seg_end) in enumerate(segment_times):
                if start_time >= seg_start and start_time < seg_end:
                    speaker_idx = i
                    break
            
            # 該当する音声セグメントが見つかった場合
            if speaker_idx is not None and speaker_idx < len(speaker_labels):
                speaker_id = f"SPEAKER_{speaker_labels[speaker_idx]:02d}"
            else:
                # 話者が特定できない場合
                speaker_id = "UNKNOWN"
            
            # 時間情報をフォーマット
            time_str = str(timedelta(seconds=int(start_time))).split('.')[0]
            
            # 情報を一時配列に追加
            temp_transcript.append({
                "timestamp": time_str,
                "speaker": speaker_id,
                "text": text
            })
    
    # 同じ話者の連続した発言をマージ
    merged_transcript = []
    current_speaker = None
    current_entry = None
    
    for entry in temp_transcript:
        if current_speaker != entry["speaker"]:
            # 話者が変わった場合、新しいエントリを作成
            if current_entry:
                merged_transcript.append(current_entry)
            current_speaker = entry["speaker"]
            current_entry = {
                "timestamp": entry["timestamp"],
                "speaker": entry["speaker"],
                "text": entry["text"]
            }
        else:
            # 同じ話者の場合、テキストを連結
            current_entry["text"] += " " + entry["text"]
    
    # 最後のエントリを追加
    if current_entry:
        merged_transcript.append(current_entry)
    
    # 結果を整形
    formatted_transcript = []
    for entry in merged_transcript:
        formatted_transcript.append(f"[{entry['timestamp']}] {entry['speaker']}: {entry['text']}")
    
    return "\n".join(formatted_transcript)

def process_audio_with_speaker_diarization(file_path, max_speakers=6, segment_length=20.0):
    """音声ファイルを処理して、話者情報付きの文字起こしを生成"""
    start_time = time.time()
    
    # 1. 音声ファイルの分割
    segments, sr = split_audio_into_segments(file_path, segment_length)
    print(f"{len(segments)}個のセグメントに分割しました")
    
    # 2. 文字起こしの実行
    transcript_result = transcribe_audio(file_path)
    
    # 3. 話者埋め込みの抽出
    embeddings = process_segments_for_speaker_embeddings(segments, sr)
    
    # 4. 話者クラスタリング
    speaker_labels = cluster_speakers(embeddings, max_speakers)
    
    # 5. 文字起こしと話者情報の統合
    transcript_with_speakers = assign_speakers_to_transcript(
        transcript_result, segments, speaker_labels
    )
    
    end_time = time.time()
    processing_time = end_time - start_time
    print(f"処理時間: {processing_time:.2f}秒")
    
    return transcript_with_speakers

def correct_transcript(text):
    """文字起こし結果の整形・校正"""
    # 不要な空白を整理
    text = text.replace("  ", " ")
    
    # 行ごとに処理
    lines = text.split("\n")
    corrected_lines = []
    
    for line in lines:
        # 行が空でない場合
        if line.strip():
            # タイムスタンプと話者と内容に分割
            parts = line.split(": ", 1)
            if len(parts) == 2:
                header, content = parts
                # 文末に句点がない場合は追加
                if content and not content.endswith(('。', '!', '?')):
                    content += '。'
                line = f"{header}: {content}"
            corrected_lines.append(line)
    
    return "\n".join(corrected_lines)

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python transcribe.py <audio_or_video_file> [max_speakers]")
        sys.exit(1)
    
    file_path = sys.argv[1]
    max_speakers = int(sys.argv[2]) if len(sys.argv) > 2 else 6
    
    # 依存パッケージの確認
    try:
        import packaging.version
    except ImportError:
        print("パッケージ 'packaging' がインストールされていません。インストールします...")
        import subprocess
        subprocess.check_call([sys.executable, "-m", "pip", "install", "packaging"])
        import packaging.version
    
    # オーディオ処理と話者分離付き文字起こし
    transcript = process_audio_with_speaker_diarization(file_path, max_speakers)
    
    # 文字起こし結果の校正
    corrected_transcript = correct_transcript(transcript)
    
    # 結果を保存
    base, _ = os.path.splitext(file_path)
    output_file = base + ".txt"  # 出力ファイル名
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(corrected_transcript)
    
    print(f"話者分離付き文字起こし結果を保存しました: {output_file}")

ファイル2: upload_to_notion.py(Notionアップロード機能)

# upload_to_notion.py
import requests
import sys
import os
import json

# 設定情報(指定されたトークンとデータベースID)
NOTION_TOKEN = "YOUR_NOTION_TOKEN"  # Notion APIトークンを設定してください
DATABASE_ID = "YOUR_DATABASE_ID"    # Notion データベースIDを設定してください

def split_text(text, max_length=2000):
    """
    テキストを max_length 文字以内のチャンクに分割する関数。
    できれば改行位置で区切るようにしています。
    """
    blocks = []
    start = 0
    while start < len(text):
        end = start + max_length
        if end < len(text):
            # 改行位置で区切る(見つからなければそのまま)
            newline_index = text.rfind('\n', start, end)
            if newline_index != -1:
                end = newline_index + 1  # 改行文字を含める
        blocks.append(text[start:end])
        start = end
    return blocks

def upload_transcript(title, transcript):
    """
    指定したタイトルと文字起こし内容を Notion のデータベースにアップロードする関数。
    文字起こし内容は 2000 文字以内のチャンクに分割され、各チャンクが段落ブロックとして追加される。
    デバッグ情報とエラー処理を強化。
    """
    print(f"Notionへのアップロードを開始: タイトル「{title}」")
    print(f"テキスト長: {len(transcript)} 文字")
    
    url = "https://api.notion.com/v1/pages"
    headers = {
        "Authorization": f"Bearer {NOTION_TOKEN}",
        "Content-Type": "application/json",
        "Notion-Version": "2022-06-28"  # Notion API のバージョン
    }
    
    # テキストを 2000 文字以内のチャンクに分割
    text_blocks = split_text(transcript, max_length=2000)
    print(f"テキストを {len(text_blocks)} 個のブロックに分割しました")
    
    # 各チャンクを段落ブロックとして children に追加
    children = []
    for block_text in text_blocks:
        children.append({
            "object": "block",
            "type": "paragraph",
            "paragraph": {
                "rich_text": [
                    {
                        "type": "text",
                        "text": {
                            "content": block_text,
                            "link": None
                        }
                    }
                ]
            }
        })
    
    data = {
        "parent": { "database_id": DATABASE_ID },
        "properties": {
            "Name": {
                "title": [
                    {
                        "text": {
                            "content": title
                        }
                    }
                ]
            }
        },
        "children": children
    }
    
    # デバッグ情報:トークンとデータベースID
    print(f"Notionトークン: {NOTION_TOKEN[:5]}...{NOTION_TOKEN[-5:]} (全{len(NOTION_TOKEN)}文字)")
    print(f"データベースID: {DATABASE_ID}")
    
    try:
        # APIリクエストの送信
        response = requests.post(url, headers=headers, json=data)
        
        # レスポンスの確認
        print(f"Notionレスポンスステータス: {response.status_code}")
        
        if response.status_code == 200:
            print("Notionへのアップロード成功")
            print(f"ページID: {response.json().get('id', 'unknown')}")
            return 0  # 成功
        else:
            print(f"Error: {response.status_code}")
            print(f"レスポンス本文: {response.text}")
            
            # JSONレスポンスの解析を試みる
            try:
                error_json = response.json()
                print(f"エラーコード: {error_json.get('code')}")
                print(f"エラーメッセージ: {error_json.get('message')}")
            except:
                print("JSONレスポンスの解析に失敗しました")
            
            return 1  # エラー
    except Exception as e:
        print(f"例外が発生しました: {e}")
        return 2  # 例外

if __name__ == "__main__":
    # コマンドライン引数が指定されていない場合はデフォルトのテストデータで実行
    if len(sys.argv) < 3:
        print("Usage: python upload_to_notion.py <title> <transcript_file>")
        print("引数が指定されなかったので、テスト用のデータでアップロードを試みます。")
        test_title = "テストアップロード"
        test_transcript = (
            "これはテストの文字起こし内容です。長いテキストの場合、2000文字ごとに分割され、"
            "複数の段落ブロックとして Notion のデータベースにアップロードされます。"
            "(ここに長いテキストを入れてテストしてください。)"
        )
        result = upload_transcript(test_title, test_transcript)
        sys.exit(result)
    else:
        title = sys.argv[1]
        transcript_file = sys.argv[2]
        
        # ファイルの存在チェック
        if not os.path.exists(transcript_file):
            print(f"ファイルが見つかりません: {transcript_file}")
            sys.exit(1)
            
        try:
            with open(transcript_file, "r", encoding="utf-8") as f:
                transcript = f.read()
                print(f"ファイル '{transcript_file}' を読み込みました ({len(transcript)} 文字)")
        except Exception as e:
            print(f"ファイルの読み込みに失敗しました: {e}")
            sys.exit(1)
            
        result = upload_transcript(title, transcript)
        sys.exit(result)

ファイル3: process_recording.sh(自動処理スクリプト)

#!/bin/bash
# process_recording.sh
# このスクリプトは、Raw フォルダーにある mp4 ファイルを対象に、
# 文字起こしを実行し、Notion へのアップロードを行い、処理済みファイルを移動します。

# 第1引数:対象ファイルのパス(例:/path/to/Movies/OBS/Raw/xxx.mp4)
file="$1"
LOG_FILE="/path/to/scripts/automator_debug.log"

# ログに受信ファイル情報を記録
echo "$(date '+%Y-%m-%d %H:%M:%S') - Process triggered: $file" >> "$LOG_FILE"
echo "$(date '+%Y-%m-%d %H:%M:%S') - Received File: $file" >> "$LOG_FILE"

# ファイル名と拡張子の取得(余分な空白を除去し、小文字に変換)
filename=$(basename "$file")
extension="${filename##*.}"
extension_lower=$(echo "$extension" | tr -d '[:space:]' | tr '[:upper:]' '[:lower:]')
echo "$(date '+%Y-%m-%d %H:%M:%S') - File Extension: [$extension_lower]" >> "$LOG_FILE"

# mp4 ファイル以外は何もしない(二重チェック)
if [ "$extension_lower" != "mp4" ]; then
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Not an mp4 file, skipping: $file" >> "$LOG_FILE"
    exit 0
fi

echo "$(date '+%Y-%m-%d %H:%M:%S') - Processing mp4 event: $file" >> "$LOG_FILE"

# 必要なら、ffmpeg のパス(Anaconda 経由の場合)を追加
export PATH="/path/to/anaconda/bin:$PATH"

# 一時ディレクトリ(処理中ファイルの保存場所)
TEMP_DIR="/path/to/scripts/temp_processing"
mkdir -p "$TEMP_DIR"

# mp4 ファイルのベース名をタイトルとして使用(拡張子を除く)
title=$(basename "$file" .mp4)

# 一時的なテキストファイルのパス
temp_txt_file="$TEMP_DIR/${title}_temp.txt"

# 1. 文字起こしの実行(transcribe.py を実行して txt ファイルを生成)
echo "$(date '+%Y-%m-%d %H:%M:%S') - Starting transcription for: $file" >> "$LOG_FILE"
python3 /path/to/scripts/transcribe.py "$file" >> "$LOG_FILE" 2>&1
echo "$(date '+%Y-%m-%d %H:%M:%S') - Transcription complete for: $file" >> "$LOG_FILE"

# 2. 生成された txt ファイルのパス(拡張子を .txt に置換)
txt_file="${file%.*}.txt"

# 3. txt ファイルの生成完了を待機する(最大300秒、5秒ごとにチェック)
timeout=300
elapsed=0
while [ ! -f "$txt_file" ] && [ $elapsed -lt $timeout ]; do
    sleep 5
    elapsed=$((elapsed + 5))
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Waiting for txt file... ($elapsed seconds)" >> "$LOG_FILE"
done

if [ ! -f "$txt_file" ]; then
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Error: txt file not found after waiting: $txt_file" >> "$LOG_FILE"
    exit 1
fi

echo "$(date '+%Y-%m-%d %H:%M:%S') - txt file confirmed: $txt_file" >> "$LOG_FILE"

# テキストファイルを一時ファイルにコピー
echo "$(date '+%Y-%m-%d %H:%M:%S') - Copying txt file to temp location: $temp_txt_file" >> "$LOG_FILE"
cp "$txt_file" "$temp_txt_file"

# 4. Notion へのアップロードを実行(一時ファイルを使用)
echo "$(date '+%Y-%m-%d %H:%M:%S') - Starting Notion upload for: $temp_txt_file" >> "$LOG_FILE"
python3 /path/to/scripts/upload_to_notion.py "$title" "$temp_txt_file" >> "$LOG_FILE" 2>&1
notion_status=$?
echo "$(date '+%Y-%m-%d %H:%M:%S') - Notion upload completed with status: $notion_status" >> "$LOG_FILE"

# 5. 処理完了後、mp4 と txt ファイルを Processed フォルダーに移動(再トリガー防止)
PROCESSED_DIR="/path/to/Movies/OBS/Processed"
mkdir -p "$PROCESSED_DIR"
echo "$(date '+%Y-%m-%d %H:%M:%S') - Moving files to Processed: $file and $txt_file" >> "$LOG_FILE"

# ファイル移動
mv "$file" "$PROCESSED_DIR/" 2>> "$LOG_FILE"
mv "$txt_file" "$PROCESSED_DIR/" 2>> "$LOG_FILE"

# 一時ファイルの削除
rm -f "$temp_txt_file" 2>> "$LOG_FILE"
echo "$(date '+%Y-%m-%d %H:%M:%S') - Temp file removed: $temp_txt_file" >> "$LOG_FILE"

echo "$(date '+%Y-%m-%d %H:%M:%S') - Files moved to Processed: $file and $txt_file" >> "$LOG_FILE"

exit 0

詳細なセットアップ手順

ステップ1: ディレクトリ構造の準備

まずは必要なディレクトリ構造を作成しましょう。

# スクリプト用のディレクトリを作成
mkdir -p ~/Scripts

# 処理済みファイルの保存用ディレクトリを作成
mkdir -p ~/Processed_Recordings

# 仮想環境を作成
cd ~/Scripts
python -m venv venv

ステップ2: 仮想環境のアクティブ化と依存パッケージのインストール

# 仮想環境を有効化
source ~/Scripts/venv/bin/activate

# 必要なパッケージをインストール
pip install openai-whisper torch numpy resemblyzer scikit-learn librosa tqdm packaging requests

ステップ3: スクリプトの作成

先程のコードを使って3つのスクリプトファイルを作成します。

# 文字起こしスクリプトの作成
cat > ~/Scripts/transcribe.py << 'EOF'
# ここに transcribe.py の内容をコピー
EOF

# Notion アップロードスクリプトの作成
cat > ~/Scripts/upload_to_notion.py << 'EOF'
# ここに upload_to_notion.py の内容をコピー
EOF

# 処理自動化スクリプトの作成
cat > ~/Scripts/process_recording.sh << 'EOF'
# ここに process_recording.sh の内容をコピー
EOF

ステップ4: Notionの設定を反映

upload_to_notion.pyファイルを編集して、実際のNotionトークンとデータベースIDを設定します。

# 設定を編集
nano ~/Scripts/upload_to_notion.py

# 以下の行を自分の情報に置き換え
# NOTION_TOKEN = "your_notion_token_here"
# DATABASE_ID = "your_database_id_here"

ステップ5: スクリプトに実行権限を付与

chmod +x ~/Scripts/process_recording.sh

ステップ6: テスト実行

システムが正しく動作するか確認するためにテスト実行します。

# 手動でテスト実行
~/Scripts/process_recording.sh /path/to/your/test_recording.mp4

システムの使い方

手動実行

任意のMP4ファイルに対して手動で文字起こしを実行する場合:

~/Scripts/process_recording.sh /path/to/your/recording.mp4

自動化の設定

macOSの場合フォルダアクションを使用

  1. Automatorを開く
  2. 「新規ドキュメント」→「フォルダアクション」を選択
  3. 「フォルダを選択」で監視するフォルダを選択(例: ~/Recordings)
  4. 「シェルスクリプトを実行」アクションを追加
  5. 以下のスクリプトを入力:
# MP4ファイルのみを処理するように修正
for f in "$@"
do
  # ファイル拡張子をチェック(大文字小文字を区別しない)
  ext=$(echo "${f##*.}" | tr '[:upper:]' '[:lower:]')
  
  # MP4ファイルのみ処理
  if [ "$ext" = "mp4" ]; then
    /Users/kjmooon/Scripts/process_recording.sh "$f"
  else
    echo "MP4ファイルではないためスキップします: $f" >> "/Users/kjmooon/Scripts/automator_debug.log"
  fi
done
  1. 保存して閉じる

アウトプットのイメージ

生成されるテキストファイル形式

[0:00:05] SPEAKER_00: こんにちは、本日はお忙しい中お集まりいただきありがとうございます。それでは会議を始めましょう。

[0:00:12] SPEAKER_00: まず最初に前回の議事録について確認したいと思います。山田さん、資料の共有をお願いできますか?

[0:00:19] SPEAKER_01: はい、画面に表示しています。前回は主に3つの議題について話し合いました。一つ目はプロジェクトAの進捗状況、二つ目は新規採用について、三つ目は次回のイベント計画についてです。

[0:00:31] SPEAKER_00: ありがとうございます。では、まずプロジェクトAの進捗状況から確認していきましょう。佐藤さん、現在の状況を共有していただけますか?

[0:00:40] SPEAKER_02: はい、プロジェクトAは現在予定通り進行しています。先週の障害については既に解決し、現在はテスト段階に入っています。来週には最初のベータ版を公開できる見込みです。

Notionでの表示

Notionデータベースには、以下のように表示されます:

  • タイトル: 元のMP4ファイル名(例: 「会議録_2025-03-24」)
  • 内容: 話者分離付き文字起こし結果(上記のフォーマット)

各セグメントが整然と表示され、誰がいつ何を話したのかが一目でわかります。

カスタマイズのポイント

1. Whisperモデルのサイズ調整

処理速度と精度のバランスに応じてモデルサイズを変更できます。

# transcribe.py の transcribe_audio 関数内
model = whisper.load_model("small")  # "tiny", "base", "small", "medium", "large" から選択

2. 話者数の調整

会議の参加者数に合わせて最大話者数を調整します。

# コマンドライン引数で指定
~/Scripts/process_recording.sh /path/to/recording.mp4 8  # 最大8人の話者を検出

または、スクリプト内で直接変更:

# transcribe.py 内
max_speakers = 8  # デフォルト値を変更

3. セグメント長の調整

音声をどのくらいの長さで区切るかを調整できます。長すぎると精度が下がり、短すぎると処理時間が増加します。

# process_audio_with_speaker_diarization 関数の呼び出し時
transcript = process_audio_with_speaker_diarization(file_path, max_speakers, segment_length=15.0)  # 15秒に変更

4. 出力形式のカスタマイズ

時間表示形式や話者表示を変更したい場合は、assign_speakers_to_transcript 関数内の出力フォーマットを調整します。

トラブルシューティング

よくある問題と解決策

  1. メモリエラー:
    • 症状: RuntimeError: CUDA out of memory
    • 解決策: Whisperのモデルサイズを小さくする(medium→small→base
  2. インストールエラー:
    • 症状: "ERROR: Could not build wheels for ..."
    • 解決策: 開発ツールをインストール
    # Macの場合
    xcode-select --install
    
  3. 音声読み込みエラー:
    • 症状: Could not open audio file
    • 解決策: FFmpegをインストール
    # Macの場合
    brew install ffmpeg
    
  4. Notionアップロードエラー:
    • 症状: {"object":"error","status":404,"code":"object_not_found"}
    • 解決策: Notionのデータベースが存在するか、インテグレーションがデータベースにアクセス権を持っているか確認
  5. 処理時間が長すぎる:
    • 症状: 1時間の録音処理に数時間かかる
    • 解決策:
      • Whisperのモデルサイズを小さくする
      • segment_length を増やして話者分離のセグメント数を減らす
      • GPUがある場合は GPU モードを使用する

まとめ

このチュートリアルでは、Pythonを使用して話者分離付き文字起こしシステムを構築し、結果をNotionにアップロードする完全な自動化システムを実装しました。
主な機能を振り返ると:

  1. 高精度な文字起こし: OpenAI Whisperによる高品質な音声認識
  2. 話者分離: 機械学習技術を使って誰が話しているかを識別
  3. 同一話者のマージ: 同じ話者の連続した発言を自然にまとめる
  4. タイムスタンプ付き出力: いつ誰が何を言ったのかが一目でわかる
  5. Notionへの自動アップロード: チーム全体で簡単に参照可能
  6. 完全自動化: 録音完了から処理、アップロードまですべて自動化

このシステムは、会議録の作成、インタビューの文字起こし、講義の記録など、様々な場面で活用できます。さらに拡張することで、例えば話者の実名でラベリングしたり、特定の話題だけを抽出したりすることも可能です。
このチュートリアルが、あなたの文字起こし作業の効率化に役立つことを願っています。ぜひカスタマイズして、自分の用途に最適化してみてください!

Discussion