🚲

ディレクトリに保存されたファイルの内容をSlackで発言

2022/07/25に公開

はじめに

  • 事前に決めたディレクトリに保存した任意のテキストファイルの内容を対象とする
  • Slackの特定チャンネルにREST APIを利用して発言を実施する
  • 機能としては「Incoming Webhooks」を利用する
  • 依頼元プログラムは、ディレクトリに発言内容を記載したテキストファイルを配置する
  • ファイル名が重ならないようにファイル名にはランダム文字列を付与
  • 発言後に配置されたファイルは自動で削除
  • 失敗時は数回リトライをする

ファイルを経由する理由

ファイルを出力するプロセス側で直接SlackへのREST APIを実行しないのは次の理由からです。

  • 依頼元プログラムは可能な限りブロックをさせたくない。
  • 特にリクエスト送信失敗時のリトライ処理のようなことは実施しなくない。
  • 直接ネットワーク通信を行うのはライブラリがなく手間(古いC++等)
  • サーバを跨ぐ場合はNFSディレクトリで共有しておりファイルを出力側はファイルを配置するだけにしたい。

※CIFSの場合は特別な対応が必要

利用している要素

  • Slackへの発言は「Incoming Webhooks」を利用する。

https://slack.com/intl/ja-jp/help/articles/115005265063-Slack-での-Incoming-Webhook-の利用

  • Pythonを利用する。「slackweb」を利用する。

https://github.com/satoshi03/slack-python-webhook

  • 特定ディレクトリの変更を監視する。

https://github.com/gorakhargosh/watchdog

監視と発言プログラム

  • 第一引数に監視対象ディレクトリ
  • 第二引数に「Incoming Webhooks」のWebhook先URL
  • 監視対象ディレクトリ内の「*.txt」の変更を監視する
  • 送信後にファイルは削除する
  • ファイルの内容の取得と削除は監視スレッドで実施
  • Slackへの送信は別スレッドで実施
  • 1時間で自動的に停止する。こちらは起動方法に応じて要変更
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
import os
import time
import sys
import slackweb
from concurrent.futures import ThreadPoolExecutor

TARGET_DIR=sys.argv[1]
SLACK_URL=sys.argv[2]
THREAD_POOL=ThreadPoolExecutor(max_workers=1)

error_count = {
    "succeeded" :0,
    "failed":0,
}

def read_lines(src_path):
    lines = []
    f = open(src_path, "r",encoding='utf-8')
    for line in f:
        lines.append(line.strip())
    f.close()
    return lines

def send_Slack(lines):
    for i in range(0,3):
        try:
            slack = slackweb.Slack(url=SLACK_URL)
            res = slack.notify(text="\n".join(lines))
            if res != "ok":
                raise Exception("Failed to slack.notify.")

            error_count["succeeded"] += 1
            print("Succeeded.")
            return 
        except Exception as e:
            error_count["failed"] += 1
            print("Failed.","i=",i,"e=",e)  
            time.sleep(3)

def on_modified(event):
    try:
        lines = read_lines(event.src_path)
        THREAD_POOL.submit(send_Slack, lines)
        os.remove(event.src_path)
    except Exception as e:
        print("Failed.",e)

event_handler = PatternMatchingEventHandler(['*.txt'])
event_handler.on_modified = on_modified

observer = Observer()
observer.schedule(event_handler, TARGET_DIR, recursive=True)
observer.start()

try:
    for i in range(0,1 * 60 * 60):
        time.sleep(1)
except KeyboardInterrupt as e:
    pass

observer.stop()
observer.join()
THREAD_POOL.shutdown()

print(error_count)
if error_count["failed"] > 0:
    exit(100)
else:
    exit(0)

問題点

  • リトライ中にプロセスが停止するとそのメッセージは消失する。
  • 送信失敗は依頼元には連絡されない。
  • 失敗が詰まると遅延する。

その他

このPython自体の常時稼働ですが、1時間で自動停止を前提に、Jenkinsで定期的に起動しています。
常駐するサービスプログラムはそれだけで手間と手順があるので1時間で自動で停止するようにして1時間毎にJenkinsで起動し直しています。

また送信プログラム自体の失敗の監視をJenkinsに実施させています。
そのため、Jenkinsのジョブはexit(0)以外の場合はメール送信するように設定します。

用途としては少しずれている自覚はありますがJenkinsを常駐プログラムの親サービスとして稀に使います。
ビルド後の処理で、失敗時に別のジョブが常駐ジョブを再キックするように組んでおけば、停止時のリスタートも自動で可能です。
1分に一回起動で同時起動不可のようなジョブでも簡易的には可能

JenkinsをMaster/Slave構成で組んでおけば冗長稼働も可能です。

Discussion