👌
Slackの特定チャンネルのメッセージをスレッド含め取得
はじめに
色々検索したりしてみたのですが、イマイチピンポイントでコードまで含めた完成形に近い記事が見当たらなかったのでSlackの特定チャンネルのメッセージをスレッド含めて全件取得するコードをNo Coding(No 人間成分)で作成しました。
環境
- ローカル環境(Windows)
- Python
- Cursor
- Claude Desktop
やったこと
ユーザーからの依頼で特定チャンネルのメッセージを全件取得したいとの事で、時間をかければSlackの標準エクスポート機能でも出来るのですが今回はプライベートチャンネルだったため標準機能(パブリックチャンネルのみエクスポート可能)が使えず、APIによる取得を行いました。
当初は、conversations_historyでmax1000件毎に取得したら一瞬で終わったので楽勝かと思ったら
ユーザーからスレッド迄含めて取得できていないとのこと。
どうやら、conversations_repliesを使わないと返信が取得できないようです。
全体像は分かったので後はNo Codingでプロンプトだけでコードを作成しました。
開発
Slack Appの権限は生成AIに確認するとすぐに出てきます。
動作確認
2019年から存在するチャンネルだったのでメッセージ総数は3175件、そのうちスレッドがあるメッセージ数が1311件でした
メッセージの取得自体は何ら制限もなく30分もかからなかったと思いますが最後のファイルに保存する
処理が異様に遅く2時間近く待って保存されました。
コード
メッセージを取得するコード。Bot Token/Channel id はローカルなのでベタ張りです。
import json
import os
from datetime import datetime
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import time
# ★ここに自分のSlack Bot Tokenを記入
SLACK_BOT_TOKEN = ""
CHANNEL_ID = ""
# ファイルサイズの上限(MB)
MAX_FILE_SIZE_MB = 50
# 保存先ディレクトリ(Noneの場合は現在のディレクトリ)
SAVE_DIRECTORY = None # 例: "exports" や "/path/to/save" など
client = WebClient(token=SLACK_BOT_TOKEN)
def fetch_all_messages(channel_id):
"""チャンネルの全メッセージを取得"""
messages = []
cursor = None
print("チャンネルのメッセージを取得中...")
while True:
try:
response = client.conversations_history(
channel=channel_id,
limit=1000,
cursor=cursor
)
messages.extend(response['messages'])
print(f"取得済みメッセージ数: {len(messages)}")
if response.get('has_more'):
cursor = response['response_metadata']['next_cursor']
# API制限を考慮して少し待機
time.sleep(1)
else:
break
except SlackApiError as e:
print(f"メッセージ取得エラー: {e}")
break
return messages
def fetch_thread_replies(channel_id, thread_ts):
"""指定されたスレッドの返信を取得"""
try:
response = client.conversations_replies(
channel=channel_id,
ts=thread_ts
)
# 最初のメッセージ(親メッセージ)を除いた返信のみを返す
return response['messages'][1:] if len(response['messages']) > 1 else []
except SlackApiError as e:
print(f"スレッド返信取得エラー (ts: {thread_ts}): {e}")
return []
def fetch_all_messages_with_threads(channel_id):
"""チャンネルの全メッセージとスレッドを取得"""
# メインメッセージを取得
messages = fetch_all_messages(channel_id)
print("\nスレッドの返信を取得中...")
# スレッドがあるメッセージの返信を取得
for i, message in enumerate(messages):
if 'thread_ts' in message and message.get('reply_count', 0) > 0:
print(f"スレッド取得中 ({i+1}/{len(messages)}): {message.get('reply_count', 0)}件の返信")
thread_replies = fetch_thread_replies(channel_id, message['thread_ts'])
if thread_replies:
# メッセージにスレッドの返信を追加
message['thread_replies'] = thread_replies
# API制限を考慮して待機
time.sleep(0.5)
return messages
def get_file_size_mb(data):
"""データのJSONファイルサイズを計算(MB)"""
json_string = json.dumps(data, ensure_ascii=False, indent=2)
size_bytes = len(json_string.encode('utf-8'))
return size_bytes / (1024 * 1024)
def save_messages_to_files(messages, base_filename="slack_messages", max_size_mb=MAX_FILE_SIZE_MB):
"""メッセージを複数のファイルに分割して保存"""
# 保存先ディレクトリを作成
if SAVE_DIRECTORY:
os.makedirs(SAVE_DIRECTORY, exist_ok=True)
# タイムスタンプを含むベースファイル名を作成
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_name = f"{base_filename}_{timestamp}"
file_count = 1
current_batch = []
total_messages = 0
for message in messages:
# 現在のバッチにメッセージを追加してサイズをチェック
test_batch = current_batch + [message]
if get_file_size_mb(test_batch) > max_size_mb and current_batch:
# サイズ上限を超えた場合、現在のバッチを保存
filename = f"{base_name}_part{file_count:03d}.json"
if SAVE_DIRECTORY:
filename = os.path.join(SAVE_DIRECTORY, filename)
save_batch_to_file(current_batch, filename)
total_messages += len(current_batch)
# 新しいバッチを開始
current_batch = [message]
file_count += 1
else:
current_batch.append(message)
# 最後のバッチを保存
if current_batch:
if file_count == 1:
filename = f"{base_name}.json"
else:
filename = f"{base_name}_part{file_count:03d}.json"
if SAVE_DIRECTORY:
filename = os.path.join(SAVE_DIRECTORY, filename)
save_batch_to_file(current_batch, filename)
total_messages += len(current_batch)
print(f"\n=== 保存完了 ===")
print(f"保存先: {SAVE_DIRECTORY or '現在のディレクトリ'}")
print(f"総メッセージ数: {total_messages}")
print(f"ファイル数: {file_count}")
def save_batch_to_file(messages, filename):
"""メッセージのバッチをファイルに保存"""
# スレッドの返信数を計算
thread_replies_count = sum(
len(msg.get('thread_replies', [])) for msg in messages
)
# メタデータを追加
data = {
"metadata": {
"export_timestamp": datetime.now().isoformat(),
"channel_id": CHANNEL_ID,
"message_count": len(messages),
"thread_replies_count": thread_replies_count,
"file_size_mb": round(get_file_size_mb(messages), 2)
},
"messages": messages
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
file_size = round(get_file_size_mb(data), 2)
print(f"保存: {filename} ({len(messages)}メッセージ, {thread_replies_count}スレッド返信, {file_size}MB)")
def get_channel_info(channel_id):
"""チャンネル情報を取得"""
try:
response = client.conversations_info(channel=channel_id)
return response['channel']
except SlackApiError as e:
print(f"チャンネル情報取得エラー: {e}")
return None
def main():
print("=== Slack チャンネルメッセージ取得ツール ===")
# チャンネル情報を表示
channel_info = get_channel_info(CHANNEL_ID)
if channel_info:
print(f"チャンネル: #{channel_info.get('name', 'Unknown')} ({CHANNEL_ID})")
else:
print(f"チャンネルID: {CHANNEL_ID}")
print(f"最大ファイルサイズ: {MAX_FILE_SIZE_MB}MB")
print()
try:
# メッセージとスレッドを取得
all_messages = fetch_all_messages_with_threads(CHANNEL_ID)
if not all_messages:
print("メッセージが見つかりませんでした。")
return
# スレッドの統計を表示
messages_with_threads = sum(1 for msg in all_messages if 'thread_replies' in msg)
total_thread_replies = sum(len(msg.get('thread_replies', [])) for msg in all_messages)
print(f"\n=== 取得結果 ===")
print(f"メインメッセージ数: {len(all_messages)}")
print(f"スレッドがあるメッセージ数: {messages_with_threads}")
print(f"スレッド返信総数: {total_thread_replies}")
# ファイルに保存
save_messages_to_files(all_messages)
except KeyboardInterrupt:
print("\n処理が中断されました。")
except Exception as e:
print(f"予期しないエラーが発生しました: {e}")
if __name__ == "__main__":
main()
Discussion