🦁

pythonで使う汎用タイマを作ってみた

10 min read

あるツールを作成している時、一定時間、毎時特定分、日時指定タイマが必要だったので汎用タイマを探してみたのだが無かった。pythonは豊富なモジュールが提供されているので、自分が見つけられなかっただけかもしれない。まあそれなら自分で作った方が手っ取り早いかということで、適当に作ってみた。
 以下、そのソース。
 汎用タイマが必要だけど作るのめんどくさいなあという方はどうぞ使ってみてください。適当に作ったので試験もそこそこしかしていないが、ツール稼働後数ヶ月、特に問題なく動いているので、まあそこそこちゃんと動くと思う。使い方は後ろの方にある__main__を見てほしい。

# mytimer.py
# 2021.7.14 : Ikeda 
# 汎用Timer

import os, datetime, time, sys
import signal

# ---------------------------------------------------------------------------
# Timer
# ---------------------------------------------------------------------------
TIMER_BASEINTERVAL = 1  # 1秒ごとにタイマハンドラが起動する

TIMER_KIND_GENERAL = 0x01   # 通常タイマ(n秒後に起動)
                            # option1: second
TIMER_KIND_SIGNAL = 0x02    # アラームタイマ(毎時n分00秒に起動)
                            # option1: minute(0~59)
TIMER_KIND_TIME = 0x04      # 時間指定タイマ(hh:mm:00に起動)
                            # option1: 'hh:mm'(hh:0~23, mm:0~59)
TIMER_KIND_DATETIME = 0x08  # 日時指定タイマ(yyyy:mm:dd hh:mm:00に起動)
                            # option1: 'yyyy-mm-ddThh:mm:ss'(mm:1~12, dd, 1~31, hh:0~23, mm:0~59, ss:無視される)
                            # 現状エラー処理がないので、意味のある日付を入力すること
                            # 周期起動設定は無効
TIMER_KIND_PERIOD = 0x10    # タイマ属性:周期起動

class MyTimer:
    def __init__(self):
        self.initializeTimer()

        return

    # 汎用Timer処理の初期化
    def initializeTimer(self):
        self.wholeTimer = []        # 全てのTimerをまとめたもの
        self.ctrState = True        # Timer全体の制御状態
                                    # True: 動作(default)、False: 停止
        self.isHandlerOn = False    # Handler起動フラグ
                                    # Handler内の処理によってはHnadlerが二重起動するので、その対策
                                    # 本来は、時間のかかる処理はHandler外に出すべきだろうが、次Versionにて対応
        self.conflictCounter = 0    # Handler二重起動カウンタ
                                    # 捨てたHandler分の補正をする為だが、次Versionで対応

        # 非同期イベントhandlerを設定、起動
        signal.signal(signal.SIGALRM, self.tmHandler)
        signal.setitimer(signal.ITIMER_REAL, TIMER_BASEINTERVAL, TIMER_BASEINTERVAL)

        # 0秒補正カウンタを現在時刻の秒で初期化する
        self.zeroCorrectionCounter = int(self.getCurrentTime()[17:19])

        return

    # 呼び出し元の関数を実行するHandler
    def handler(self, func, *args):
        return func(*args)

    def doCallBackFunction(self, eachTimer, option):
        if "callBackFunction" in eachTimer:
            call = eachTimer["callBackFunction"]
            self.handler(call, option)

        return

    # Timer Handler、タイマーのメイン処理部
    def tmHandler(self, arg1, args2):
        if self.isHandlerOn:
            # Handler二重起動!
            # 何もせずにそのまま抜ける
            self.conflictCounter += 1
#            print("# Handler conflict!!!:", self.conflictCounter)
            return
        else:
           self. isHandlerOn = True

        # 現在時刻を取得する
        # exp: 2021-07-14T09:00:00
        #      01234567890123456789
        currentTime = self.getCurrentTime()
        currentDate = currentTime[0:10]
        currentHourMinute = currentTime[11:16]
        currentMinute = int(currentTime[14:16])
        currentSecond = int(currentTime[17:19])

        # 0秒補正カウンタを1進める
        self.zeroCorrectionCounter += 1
        # 0秒補正カウンタでHandlerの取りこぼしを確認する
        shitHappen = False
        if self.zeroCorrectionCounter == 60:
            if currentSecond == 0:
                # 一致しているので、取りこぼし無し
                shitHappen = False
                self.zeroCorrectionCounter = 0   # 初期化
            else:
                # 一致していないということは、取りこぼしがあった
                shitHappen = True
                print("shit happen!!! current:", currentSecond, "counter:", self.zeroCorrectionCounter)
                # 0秒補正カウンタを初期化する
                self.zeroCorrectionCounter = currentSecond

        # 全てのタイマについて、カウントダウン、T.O.判定等をおこなう
        # タイムアウト処理中にタイマ管理辞書(wholeTimer)を更新することがあるので、
        # タイムアウト判定処理に影響がないようコピーしたものを使用する
        copiedWholeTimer = self.wholeTimer.copy()
        for eachTimer in copiedWholeTimer:
            if eachTimer['kind'] & TIMER_KIND_GENERAL:
                # 通常タイマ
                eachTimer['lapse'] -= 1
                if eachTimer['lapse'] == 0:
                    # タイムアウト
                    self.timeoutGeneralTimer(eachTimer)

            if currentSecond == 0 or shitHappen == True:
                # 0秒になった、あるいはHandler取りこぼしがあったので、アラーム系の判定をする
                if eachTimer['kind'] & TIMER_KIND_SIGNAL:
                    # アラームタイマ(毎時n分00秒に起動)
                    if eachTimer['minute'] == currentMinute:
                        # 指定した分になった
                        self.timeoutSignalTimer(eachTimer)
                if eachTimer['kind'] & TIMER_KIND_TIME:
                    # 時間指定タイマ(hhmm00に起動)
                    if eachTimer['time'] == currentHourMinute:
                        # 指定した時間になった
                        self.timeoutTimeTimer(eachTimer)
                # 0秒補正カウンタを初期化する
                self.zeroCorrectionCounter = currentSecond

            if currentSecond % 5 == 0 or shitHappen == True:
                if eachTimer['kind'] & TIMER_KIND_DATETIME:
                    # 日時指定タイマ(yyyymmdd hhmm00に起動)
                    setTime = datetime.datetime.strptime(eachTimer['datetime'], '%Y-%m-%dT%H:%M:%S')
                    if setTime <= self.now:
                        # 指定した時間を超えた
                        self.timeoutDatetimeTimer(eachTimer)
        
        self. isHandlerOn = False

        return

    def timeoutGeneralTimer(self, eachTimer):
        id = eachTimer['id']

        if eachTimer['kind'] & TIMER_KIND_PERIOD:
            # 周期起動の設定の場合は、再度設定
            eachTimer['lapse'] = eachTimer['value']
        else:
            # 周期起動ではない場合は、タイマを削除
            self.deleteTimer(eachTimer)

        self.doCallBackFunction(eachTimer, id)   # コールバック関数を実行

        return

    def getCurrentTime(self):
        self.now = datetime.datetime.now()
        currentTime = self.now.strftime("%Y-%m-%dT%H:%M:%S")

        return currentTime

    def timeoutSignalTimer(self, eachTimer):
        id = eachTimer['id']

        if eachTimer['kind'] & TIMER_KIND_PERIOD == 0:
            # 周期起動ではない場合は、タイマを削除
            self.deleteTimer(eachTimer)

        self.doCallBackFunction(eachTimer, id)   # コールバック関数を実行

        return

    def timeoutTimeTimer(self, eachTimer):
        id = eachTimer['id']

        if eachTimer['kind'] & TIMER_KIND_PERIOD == 0:
            # 周期起動ではない場合は、タイマを削除
            self.deleteTimer(eachTimer)
        
        self.doCallBackFunction(eachTimer, id)   # コールバック関数を実行

        return

    def timeoutDatetimeTimer(self, eachTimer):
        id = eachTimer['id']

        # 周期起動ではない場合は、タイマを削除
        self.deleteTimer(eachTimer)
        
        self.doCallBackFunction(eachTimer, id) # コールバック関数を実行

        return

    # Timerを登録する
    def createTimer(self, id, kind, option1, option2):
        for eachTimer in self.wholeTimer:
            if eachTimer['id'] == id:
                # 同名称のタイマが既に登録されている場合は上書きする
                # ここでは単純に削除して、この後の処理で再度登録する
                index = self.wholeTimer.index(eachTimer)
                self.wholeTimer.pop(index)

        timerParam = dict()
        timerParam['id'] = id
        timerParam['kind'] = kind
        if kind & TIMER_KIND_GENERAL:
            timerParam['value'] = option1
            timerParam['lapse'] = option1
        if kind & TIMER_KIND_SIGNAL:
            if option1 < 0 or option1 > 59:
                return False
            timerParam['minute'] = option1
        if kind & TIMER_KIND_TIME:
            hh = int(option1[0:2])
            mm = int(option1[3:5])
            if hh < 0 or hh > 23:
                return False
            if mm < 0 or mm > 59:
                return False
            timerParam['time'] = option1
        if kind & TIMER_KIND_DATETIME:
            option1 = option1[0:-2] + "00"
            timerParam['datetime'] = option1
        if option2 != None:
            timerParam['callBackFunction'] = option2

        self.wholeTimer.append(timerParam)
        #print(self.wholeTimer)

        return True

    def deleteTimer(self, timerOutOfUse):
        # 'id', 'kind'が一致したタイマを削除する
        # 同じ'id'で'kind'の違うタイマが前後して設定されることもあるので注意する
        index = 0
        for eachTimer in self.wholeTimer:
            if eachTimer['id'] == timerOutOfUse["id"]:
                if eachTimer['kind'] == timerOutOfUse["kind"]:
                    self.wholeTimer.pop(index)
            index += 1

        return


    def showAllTimer(self):
        print(self.wholeTimer)

        return

# ---------------------------------------------------------------------------
# for TEST
# ---------------------------------------------------------------------------
def mainprocess():
    i = 0
    while True:
        i += 1
        if i == 10:
            break
        print("main process.")
        time.sleep(2)

    return

def retFunc(arg):
    print("T.O. #1")
    tm.showAllTimer()
    return
def retFunc2(arg):
    print("T.O. #2")
    return
def retFunc3(arg):
    print("T.O. #3")
    return
def retFunc4(arg):
    print("T.O. #4")
    return


if __name__ == "__main__":
    global tm
    tm = MyTimer()
    tm.createTimer('short-tm1', TIMER_KIND_GENERAL | TIMER_KIND_PERIOD, 3, retFunc)

    result = tm.createTimer('signal-1', TIMER_KIND_SIGNAL | TIMER_KIND_PERIOD, 34, None)
    result = tm.createTimer('time-1', TIMER_KIND_TIME | TIMER_KIND_PERIOD, '16:34', None)
    result = tm.createTimer('datetime-1', TIMER_KIND_DATETIME | TIMER_KIND_PERIOD, '2021-07-14T16:37:12', retFunc2)
    tm.createTimer('bjv8_N7dQ2A', TIMER_KIND_DATETIME, '2021-07-16T02:58:28', retFunc4)

    #mainprocess()
    time.sleep(120)

    print("owari")

本来はgitで公開するのだろうが、めんどくさいのでソース貼り付け。

Discussion

ログインするとコメントできます