👌

Slackの特定チャンネルのメッセージをスレッド含め取得

に公開

はじめに

色々検索したりしてみたのですが、イマイチピンポイントでコードまで含めた完成形に近い記事が見当たらなかったのでSlackの特定チャンネルのメッセージをスレッド含めて全件取得するコードをNo Coding(No 人間成分)で作成しました。

環境

  • ローカル環境(Windows)
  • Python
  • Cursor
  • Claude Desktop

やったこと

ユーザーからの依頼で特定チャンネルのメッセージを全件取得したいとの事で、時間をかければSlackの標準エクスポート機能でも出来るのですが今回はプライベートチャンネルだったため標準機能(パブリックチャンネルのみエクスポート可能)が使えず、APIによる取得を行いました。

当初は、conversations_historyでmax1000件毎に取得したら一瞬で終わったので楽勝かと思ったら
ユーザーからスレッド迄含めて取得できていないとのこと。

どうやら、conversations_repliesを使わないと返信が取得できないようです。

全体像は分かったので後はNo Codingでプロンプトだけでコードを作成しました。

開発

Slack Appの権限は生成AIに確認するとすぐに出てきます。

動作確認

2019年から存在するチャンネルだったのでメッセージ総数は3175件、そのうちスレッドがあるメッセージ数が1311件でした

メッセージの取得自体は何ら制限もなく30分もかからなかったと思いますが最後のファイルに保存する
処理が異様に遅く2時間近く待って保存されました。

コード

メッセージを取得するコード。Bot Token/Channel id はローカルなのでベタ張りです。

import json
import os
from datetime import datetime
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import time

# ★ここに自分のSlack Bot Tokenを記入
SLACK_BOT_TOKEN = ""
CHANNEL_ID = ""

# ファイルサイズの上限(MB)
MAX_FILE_SIZE_MB = 50

# 保存先ディレクトリ(Noneの場合は現在のディレクトリ)
SAVE_DIRECTORY = None  # 例: "exports" や "/path/to/save" など

client = WebClient(token=SLACK_BOT_TOKEN)

def fetch_all_messages(channel_id):
    """チャンネルの全メッセージを取得"""
    messages = []
    cursor = None
    
    print("チャンネルのメッセージを取得中...")
    
    while True:
        try:
            response = client.conversations_history(
                channel=channel_id,
                limit=1000,
                cursor=cursor
            )
            messages.extend(response['messages'])
            print(f"取得済みメッセージ数: {len(messages)}")

            if response.get('has_more'):
                cursor = response['response_metadata']['next_cursor']
                # API制限を考慮して少し待機
                time.sleep(1)
            else:
                break
        except SlackApiError as e:
            print(f"メッセージ取得エラー: {e}")
            break

    return messages

def fetch_thread_replies(channel_id, thread_ts):
    """指定されたスレッドの返信を取得"""
    try:
        response = client.conversations_replies(
            channel=channel_id,
            ts=thread_ts
        )
        # 最初のメッセージ(親メッセージ)を除いた返信のみを返す
        return response['messages'][1:] if len(response['messages']) > 1 else []
    except SlackApiError as e:
        print(f"スレッド返信取得エラー (ts: {thread_ts}): {e}")
        return []

def fetch_all_messages_with_threads(channel_id):
    """チャンネルの全メッセージとスレッドを取得"""
    # メインメッセージを取得
    messages = fetch_all_messages(channel_id)
    
    print("\nスレッドの返信を取得中...")
    
    # スレッドがあるメッセージの返信を取得
    for i, message in enumerate(messages):
        if 'thread_ts' in message and message.get('reply_count', 0) > 0:
            print(f"スレッド取得中 ({i+1}/{len(messages)}): {message.get('reply_count', 0)}件の返信")
            
            thread_replies = fetch_thread_replies(channel_id, message['thread_ts'])
            if thread_replies:
                # メッセージにスレッドの返信を追加
                message['thread_replies'] = thread_replies
            
            # API制限を考慮して待機
            time.sleep(0.5)
    
    return messages

def get_file_size_mb(data):
    """データのJSONファイルサイズを計算(MB)"""
    json_string = json.dumps(data, ensure_ascii=False, indent=2)
    size_bytes = len(json_string.encode('utf-8'))
    return size_bytes / (1024 * 1024)

def save_messages_to_files(messages, base_filename="slack_messages", max_size_mb=MAX_FILE_SIZE_MB):
    """メッセージを複数のファイルに分割して保存"""
    
    # 保存先ディレクトリを作成
    if SAVE_DIRECTORY:
        os.makedirs(SAVE_DIRECTORY, exist_ok=True)
    
    # タイムスタンプを含むベースファイル名を作成
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base_name = f"{base_filename}_{timestamp}"
    
    file_count = 1
    current_batch = []
    total_messages = 0
    
    for message in messages:
        # 現在のバッチにメッセージを追加してサイズをチェック
        test_batch = current_batch + [message]
        
        if get_file_size_mb(test_batch) > max_size_mb and current_batch:
            # サイズ上限を超えた場合、現在のバッチを保存
            filename = f"{base_name}_part{file_count:03d}.json"
            if SAVE_DIRECTORY:
                filename = os.path.join(SAVE_DIRECTORY, filename)
            save_batch_to_file(current_batch, filename)
            total_messages += len(current_batch)
            
            # 新しいバッチを開始
            current_batch = [message]
            file_count += 1
        else:
            current_batch.append(message)
    
    # 最後のバッチを保存
    if current_batch:
        if file_count == 1:
            filename = f"{base_name}.json"
        else:
            filename = f"{base_name}_part{file_count:03d}.json"
        if SAVE_DIRECTORY:
            filename = os.path.join(SAVE_DIRECTORY, filename)
        save_batch_to_file(current_batch, filename)
        total_messages += len(current_batch)
    
    print(f"\n=== 保存完了 ===")
    print(f"保存先: {SAVE_DIRECTORY or '現在のディレクトリ'}")
    print(f"総メッセージ数: {total_messages}")
    print(f"ファイル数: {file_count}")

def save_batch_to_file(messages, filename):
    """メッセージのバッチをファイルに保存"""
    # スレッドの返信数を計算
    thread_replies_count = sum(
        len(msg.get('thread_replies', [])) for msg in messages
    )
    
    # メタデータを追加
    data = {
        "metadata": {
            "export_timestamp": datetime.now().isoformat(),
            "channel_id": CHANNEL_ID,
            "message_count": len(messages),
            "thread_replies_count": thread_replies_count,
            "file_size_mb": round(get_file_size_mb(messages), 2)
        },
        "messages": messages
    }
    
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    
    file_size = round(get_file_size_mb(data), 2)
    print(f"保存: {filename} ({len(messages)}メッセージ, {thread_replies_count}スレッド返信, {file_size}MB)")

def get_channel_info(channel_id):
    """チャンネル情報を取得"""
    try:
        response = client.conversations_info(channel=channel_id)
        return response['channel']
    except SlackApiError as e:
        print(f"チャンネル情報取得エラー: {e}")
        return None

def main():
    print("=== Slack チャンネルメッセージ取得ツール ===")
    
    # チャンネル情報を表示
    channel_info = get_channel_info(CHANNEL_ID)
    if channel_info:
        print(f"チャンネル: #{channel_info.get('name', 'Unknown')} ({CHANNEL_ID})")
    else:
        print(f"チャンネルID: {CHANNEL_ID}")
    
    print(f"最大ファイルサイズ: {MAX_FILE_SIZE_MB}MB")
    print()
    
    try:
        # メッセージとスレッドを取得
        all_messages = fetch_all_messages_with_threads(CHANNEL_ID)
        
        if not all_messages:
            print("メッセージが見つかりませんでした。")
            return
        
        # スレッドの統計を表示
        messages_with_threads = sum(1 for msg in all_messages if 'thread_replies' in msg)
        total_thread_replies = sum(len(msg.get('thread_replies', [])) for msg in all_messages)
        
        print(f"\n=== 取得結果 ===")
        print(f"メインメッセージ数: {len(all_messages)}")
        print(f"スレッドがあるメッセージ数: {messages_with_threads}")
        print(f"スレッド返信総数: {total_thread_replies}")
        
        # ファイルに保存
        save_messages_to_files(all_messages)
        
    except KeyboardInterrupt:
        print("\n処理が中断されました。")
    except Exception as e:
        print(f"予期しないエラーが発生しました: {e}")

if __name__ == "__main__":
    main()


Discussion