Chatworkの通知を受け取るSlackアプリを作った話
こんにちは、株式会社シグマアイのリサーチャーの@mory22kです。
今回は、業務円滑化のために社内で導入したChatworkの通知をSlackで受け取るインテグレーションについて紹介します。
はじめに
弊社はたくさんのクライアント様と頻繁にやりとりしており、クライアント様の要望や制約に応じて様々なコミュニケーションツールを併用しています。
こうした複数のツールの併用を続けると、情報の見逃しや重複などのインシデントに繋がりかねません。幸い弊社はまだそれほどまで多くのアプリを併用するには至っていませんが、今後も様々な相手とやりとりを進める中で、情報の一元化がますます重要になると考えています。
複数のメッセージアプリの併用は思わぬ事故につながるかもしれません
こうした課題を解決するため、Slackを基点に、様々なツールの通知を集約する仕組みを導入し、情報管理を一元化する取り組みを進めることとなりました。
今回の記事では、その一環としてChatworkの通知をSlackで受け取るアプリを作成した事例を紹介します。
通知機能の要件
今回実装した通知機能の目的は「メッセージが来たことを知らせる」ことであり、通知を受け取ったら、その内容を確認するためにChatworkにアクセスするという使い方を想定しています。そこで、以下のような要件を設定しました。
- Chatworkの特定のチャットグループにメッセージが投稿された際に、できるだけリアルタイムにSlackに通知を送信する
- メッセージの「内容」「送信日時」の情報は必ず通知に含める
また、以下のような制約を設けました。
- デプロイ先は社内のデモンストレーション用サーバとし、不特定多数からのアクセスが原理的に不可能な状況とする
設計
ChatworkからSlackへの通知機能の実装方法は、どこを起点とするかによって2通り考えられます。
パターン1: Webhook URLで起動する
- 社内サーバでWebhook URLを発行し、ChatworkにそのURLを教えるパターン
- 発行したURLに送信されたメッセージがちゃんとChatworkから送られたものなのかを判定するために署名チェックを用いる必要あり
パターン2: Chatwork APIを定期的に監視する
- 手持ちサーバから定期的に (5分間隔など) 新メッセージがあるかをChatworkに問い合わせる
- Chatwork APIのアクセス頻度上限に達しない範囲で、社内での使用目的も考慮して、適切な頻度で問い合わせる
選択したパターン
計算コスト、通信容量や各スキーマの設計思想に照らし合わせて考えると、本来であればパターン1を採用するのが最適な選択です。しかし、今回デプロイ先とした社内サーバは主に社内デモ用として使用されており、外部からのアクセスが遮断されているので、Webhook URLを発行してもChatworkからアクセスすることができません。したがって今回はやむを得ずパターン2を採用することとなりました。
実装
今回はPythonで実装しました。以下に実装の概要を示します。
アプリ本体の構成
.
├── .env
├── README.md
├── app.py
├── entrypoint.sh
├── poetry.lock
└── pyproject.toml
-
app.py
: メインスクリプト -
entrypoint.sh
: アプリの起動スクリプト -
.env
: 環境変数を設定するファイル -
poetry.lock
,pyproject.toml
: 依存パッケージの管理ファイル -
README.md
: 説明書
app.py
メインスクリプト # app.py
from dotenv import load_dotenv
import os
import json
import logging
import requests
from datetime import datetime, timedelta, timezone
import time
# 環境変数の読み込み
load_dotenv()
CHATWORK_API_TOKEN = os.getenv("CHATWORK_API_TOKEN")
CHATWORK_ROOM_ID = os.getenv("CHATWORK_ROOM_ID")
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")
EXECUTION_INTERVAL_MIN = int(os.getenv("EXECUTION_INTERVAL_MIN", "15"))
START_HOUR = int(os.getenv("START_HOUR", "9"))
END_HOUR = int(os.getenv("END_HOUR", "18"))
# 日本時間の設定
JST = timezone(timedelta(hours=9))
def create_logger(name: str = __name__) -> logging.Logger:
"""ロガーを作成"""
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
logger = create_logger()
def fetch_new_chatwork_messages():
"""Chatworkから新規メッセージを取得"""
url = f"https://api.chatwork.com/v2/rooms/{CHATWORK_ROOM_ID}/messages?force=0"
headers = {"X-ChatWorkToken": CHATWORK_API_TOKEN}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as http_err:
logger.debug(f"HTTPエラーが発生しました: {http_err}")
except requests.exceptions.RequestException as err:
logger.debug(f"リクエストエラーが発生しました: {err}")
return []
def forward_to_slack(messages):
"""メッセージをSlackに送信"""
slack_data = {"text": messages}
try:
response = requests.post(
SLACK_WEBHOOK_URL,
data=json.dumps(slack_data),
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
logger.info(f"Messages sent to Slack successfully.\n{messages}")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to send messages to Slack: {e}")
def format_message(message_data):
"""メッセージをSlack用にフォーマット"""
account_id = message_data.get("account", {}).get("account_id", "Unknown")
body = message_data.get("body", "")
timestamp = message_data.get("send_time", 0)
send_time = datetime.fromtimestamp(timestamp, JST).strftime("%Y-%m-%d %H:%M:%S")
title = f"New message from [id:{account_id}] at {send_time}"
return f"{title}\n{'=' * 10}\n{body}\n{'=' * 10}"
def fetch_and_forward_new_messages():
"""新規メッセージを取得し、Slackに転送"""
messages = fetch_new_chatwork_messages()
if messages:
messages_formatted = [format_message(message) for message in messages]
messages_combined = "\n\n".join(messages_formatted)
forward_to_slack(messages_combined)
else:
logger.debug("No new messages found.")
def wait_until_start_hour(start_hour: int):
"""指定された開始時刻まで待機する"""
while True:
now = datetime.now()
# today
start_time = datetime(now.year, now.month, now.day, start_hour, 0, 0)
# tomorrow
if now >= start_time:
start_time += timedelta(days=1)
remaining_time_sec = (start_time - now).total_seconds()
if remaining_time_sec > 0:
remaining_time_hhmmss = timedelta(seconds=int(remaining_time_sec))
logger.info(
f"Waiting until {start_hour}:00... Sleeping for {remaining_time_hhmmss} seconds."
)
time.sleep(remaining_time_sec)
else:
logger.info(f"Start time reached: {start_hour}:00")
break
if __name__ == "__main__":
while True:
current_time = datetime.now(JST)
current_hour = current_time.hour
if START_HOUR <= current_hour < END_HOUR:
fetch_and_forward_new_messages()
logger.info(f"Sleeping for {EXECUTION_INTERVAL_MIN} minutes.")
time.sleep(EXECUTION_INTERVAL_MIN * 60)
else:
wait_until_start_hour(START_HOUR)
解説
def fetch_new_chatwork_messages():
- Chatwork APIを利用して指定されたルームの新しいメッセージを取得。
-
force=0
を指定し、APIで未取得のメッセージのみを取得します。
def forward_to_slack(messages):
- SlackのWebhook URLを利用して、取得したメッセージを転送。
- メッセージは
json.dumps
を用いてJSON形式でPOSTリクエストされます。
def format_message(message_data):
- Chatworkのメッセージから次のものを抽出。
- 送信者のID (
account_id
) - メッセージ本文 (
body
) - 送信時刻 (
send_time
)
- 送信者のID (
- 日本時間 (
JST
) で整形した送信時刻を含めてフォーマットします。
def fetch_and_forward_new_messages():
- メインとなる関数。
- Chatworkから新しいメッセージを取得し、Slackに転送するという一連の処理を行います。
-
fetch_new_chatwork_messages
で新しいメッセージを取得 - 取得したメッセージがある場合は
format_message
でひとつづつフォーマットし、messages_combined
に結合 -
forward_to_slack
でSlackに転送
-
def wait_until_start_hour(start_hour: int):
- 指定された開始時刻まで待機。
- 指定された開始時刻までの時間を計算し、その時間だけスリープします。
- 指定された時刻を過ぎている場合、翌日の同時刻まで待機します。
.env
環境変数ファイル CHATWORK_API_TOKEN=(your chatwork api token)
CHATWORK_ROOM_ID=(your chatwork room id)
SLACK_WEBHOOK_URL=(your slack webhook url)
EXECUTION_INTERVAL=15
START_HOUR=9
END_HOUR=18
シークレット変数を含めた環境変数を記述しています。.env
ファイルをプロジェクトルートに配置し、読み込むようにします。
-
CHATWORK_API_TOKEN
: ChatworkのAPIトークン -
CHATWORK_ROOM_ID
: ChatworkのチャットルームID -
SLACK_WEBHOOK_URL
: SlackのWebhook URL -
EXECUTION_INTERVAL_MIN
: 実行間隔 (分) -
START_HOUR
: 開始時刻 (24H) -
END_HOUR
: 終了時刻 (24H)
デフォルトでは、実行間隔は15分、開始時刻は9時、終了時刻は18時としています。開始時刻・終了時刻はそれぞれ弊社の営業開始時刻・営業終了時刻に合わせています。
entrypoint.sh
起動スクリプト nohup poetry run python app.py &
nohup
を使用しバックグラウンドでアプリを起動しています。
このままだと app.py
内部に記述したログが nohup.out
にどんどん出力されます。放置しすぎるとログファイルが肥大化してしまうので、適宜ログファイルの内容を削除するか、ログローテーションを設定しておくと良いでしょう。
また、サーバの再起動時にもアプリが自動で起動するように別途設定しておくと便利です。
各種変数の取得
以上のアプリを実際に動かすために各種変数を取得します。
CHATWORK_API_TOKEN
)
1. Chatwork APIトークンの取得 (-
Chatworkにログイン
Chatworkのウェブサイトにログイン -
API設定ページにアクセス
画面右上の「設定」メニュー → API設定 -
パーソナルアクセストークンを発行
- パーソナルアクセストークンの発行
- 必要に応じて説明を入力し発行
-
トークンをコピー
発行されたトークンをコピーして、.env
ファイルのCHATWORK_API_TOKEN
に設定
CHATWORK_ROOM_ID
)
2. ROOM IDの取得 (-
Chatworkの対象ルームにアクセス
通知を取得したいルームを開く。 -
ルームIDを確認
-
URLを確認し、最後の数字列をROOMIDとして取得。
-
たとえば
https://www.chatwork.com/#!rid123456789
の場合、
123456789
がROOM ID。
-
-
ROOM IDを保存
このIDをコピーし、.env
ファイルのCHATWORK_ROOM_ID
に設定
SLACK_WEBHOOK_URL
)
3. Slackアプリの設定 (個人的に手間取ったのがこの部分でした。Slackアプリを介した通知は以下のような流れで行われます。
SlackのWebhook URLを取得するためには、Slackアプリを作成し、そのアプリにWebhook URLを設定、そしてチャンネルにメッセージを送信する権限をアプリに付与するという手順が必要です。
アプリを新規作成
Slack API にアクセスし「ビルド」 → 「Your apps」 → 「Create New App」と進む
-
ビルド → Your apps → Create New App → From scratch
From scratchを選択 -
各種設定
社内Slackを選択
2. アプリにチャンネルへの投稿権限を付与
「OAuth & Permissions」 → 「Scopes」 → 「Scopes」 → 「chat:write」を選択
-
OAuth & Permissions → chat:write
今回はチャンネルに通知を投稿するためにchat:writeを選択。
DMを送信する場合にはim:writeを選択する。
3. Incoming Webhook URL を作成
「Incoming Webhooks」 → 「Activate Incoming Webhooks」 → 投稿先のチャンネルを指定 → 「reinstall your app」を選択して再インストール
-
Activate Incoming Webhooks → 投稿先として通知したいチャンネルを選択
-
reinstall your app と表示されるので指示に従って再インストール
-
Webhook URL が生成されるので、コピーして
.env
ファイルのSLACK_WEBHOOK_URL
に設定
ここで表示されるWebhook URLは流出しないように注意
4. アプリの起動
設定が完了し、環境変数ファイル .env
に必要な情報を設定したら、アプリをサーバに配置して起動します。
cd /path/to/app/chatwork-to-slack
./entrypoint.sh
これでChatworkの新しいメッセージがSlackに通知されるようになります。アプリを起動したままにしておけば、指定された間隔で新しいメッセージを取得し、Slackに通知するようになります。
実行中のログは nohup.out
に出力されるので、必要に応じて確認します。
cat nohup.out
> 2024-12-09 04:37:58,857 - __main__ - INFO - Sleeping for 15 minutes.
> 2024-12-09 04:52:59,486 - __main__ - INFO - Sleeping for 15 minutes.
> 2024-12-09 05:08:00,024 - __main__ - INFO - Sleeping for 15 minutes.
ここでは15分間隔でメッセージを取得するよう設定しました (EXECUTION_INTERVAL_MIN=15
)。そうすると15分おきにログが1行づつたまっていきます。定期的にこのログを確認し、アプリがエラーを起こしていないかを確認しましょう。
5. アプリの停止
プロセスIDを取得して kill
します。
今回はアプリのルートディレクトリ名を chatwork-to-slack
としたので、この名前でプロセスを探せばでてきます。
ps aux | grep chatwork-to-slack
USERNAME 1234567 0.0 0.0 12345 12345 XXX/XXX S XX:XX XX:XX /path/to/app/chatwork-to-slack-ABCDEFG-py3.13/bin/python app.py
# pidは1234567
kill 1234567
まとめ
Chatworkの新しいメッセージをSlackに通知する機能を実装しました。定期的にChatwork APIを監視し、新しいメッセージがあればSlackに通知するように設定しました。
今回の実装を通じて、情報の集約に向けた取り組みを進める一助となれば幸いです。
今後の課題
今回実装した例で実際に動かすと、以下のようなメッセージがSlackに通知されます。
New message from [id:1234567] at 2024-12-05 12:00:00
==========
お世話になっております。
添付のエラーにつきまして調査いただけますでしょうか?
急ぎで申し訳ございませんが、yy日までにご対応いただけますと幸いです。
どうぞよろしくお願いします。
[info][title][dtext:file_uploaded][/title][download:000000000]20241205-error.log (XX.XX KB)[/download][/info]
==========
この転送フォーマットはまだまだ改良の余地があります。一例を挙げると、
- 添付ファイルがある場合、メッセージが非常に長くなる
→ 添付ファイル部分を検出してカットし、ファイル名のみが表示されるようにするなど - 誰が送信したかわからない
→ ユーザー名とユーザーIDを紐づけたデータベースを作っておき、適宜参照するなど - 該当のメッセージがどのチャットルームから来たのかわからず、アクセスに時間がかかってしまう
→ メッセージに該当するルームのURLを追加する
今後はこの課題に取り組んでいきたいと思います。
商標およびサービスマークに関する表記: Slackは米国およびその他の国で登録されたSlack Technologies, Inc.の商標およびサービスマークです。
Discussion