🔲

ESP32 - MicroPythonで家電を制御しよう-その3(LED, Display)

2021/08/07に公開

ESP32を使って、IFTTTサービスにリクエストを送り、家電を制御します。その仕組みとコードの部分について説明します。
全体像は、その1(コンセプト、スイッチ)を参照してください。
本記事は、LED(LED管理スレッド)/文字表示(ディスプレイ管理スレッド)についての説明です。

関連記事

LED管理スレッドの作成

LED管理スレッドの作成と動作確認を行います。

開発環境の構築

開発環境の構築は、下記の記事を参考にしてください。

https://zenn.dev/kotaproj/articles/d969fb39100da443f41f
https://qiita.com/kotaproj/items/b53006aef9d04053a5ee

パーツ一覧

機材名 数量 備考
ESP32評価ボード 1 ESP32-WROVER 開発ボード/ESP32-WROOM 開発ボードのどちらでも可
LED 3 3つで説明
抵抗 3 LEDのデータシートに合わせ使用すること(200Ω ~ 680Ωで確認)

回路図

コード(LED管理スレッド)

led.pyとutil.pyの2つのファイルで構成されています。

led.py
import _thread
import time
from machine import Pin

from util import *

LED_DEFs = {
    "red": (27, ),
    "green": (26, ),
    "blue": (25, ),
}

ledctl_inited = False


class LedCtl:
    _instance = None

    def __init__(self):
        print("LedCtl - init")
        global ledctl_inited
        if False == ledctl_inited:
            self._leds = {}
            for key, val in LED_DEFs.items():
                self._leds[key] = {"pin":Pin(val[0], Pin.OUT), "blink":False, "cont":False, "count":0}
            ledctl_inited = True
        return

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)

        return cls._instance

    def off_all(self):
        for key in self._leds:
            self._leds[key]["blink"] = False
            self._leds[key]["pin"].value(False)
        return

    def on_all(self):
        for key in self._leds:
            self._leds[key]["blink"] = False
            self._leds[key]["pin"].value(True)
        return

    def blink_led(self, led_name, blink=True, cont=False, count=10):
        if led_name in self._leds:
            self._leds[led_name]["blink"] = blink
            self._leds[led_name]["pin"].value(False)
            self._leds[led_name]["cont"] = cont
            self._leds[led_name]["count"] = count
        elif "all" == led_name:
            for key in self._leds:
                self._leds[key]["blink"] = blink
                self._leds[key]["pin"].value(False)
                self._leds[key]["cont"] = cont
                self._leds[key]["count"] = count
        return

    def countdown_blink(self, blk_sts):
        for key in self._leds:
            if self._leds[key]["blink"] and (self._leds[key]["cont"] or self._leds[key]["count"] > 0):
                self._leds[key]["pin"].value(blk_sts)
                self._leds[key]["count"] -= 1
                if self._leds[key]["count"] == 0:
                    self._leds[key]["pin"].value(False)
        return



class LedProc():
    
    def __init__(self, lock=None, snd_que=None, rcv_que=None):
        print("led:init")
        self._led_ctl = LedCtl()
        self._lock = lock
        self._snd_que = snd_que
        self._rcv_que = rcv_que
        return

    def _proc_poll(self):
        print("_proc_poll - run")


        def act_led(msg):
            print("act_led_led:run")
            d = conv_msg2dict(msg)
            print(d)
            if "off_all" in d['type']:
                self._led_ctl.off_all()
            elif "on_all" in d['type']:
                self._led_ctl.on_all()
            elif "blink" in d['type']:
                # name, blink, cont, count
                name = 'all'
                blink = True
                cont = False
                count = 10
                if 'name' in d:
                    name = d['name']
                if 'blink' in d:
                    blink = str2bool(d['blink'])
                if 'cont' in d:
                    cont = str2bool(d['cont'])
                if 'count' in d:
                    count = int(d['count'])
                self._led_ctl.blink_led(name, blink, cont, count)
            print('led:_act_cmd - over')
            return

        blink_status = False
        while True:
            time.sleep_ms(200)
            msg = recv_que(self._lock, self._rcv_que)
            if msg is None:
                # print("IndexError")
                # LED Blink
                blink_status = False if blink_status else True
                self._led_ctl.countdown_blink(blink_status)
                continue

            print("proc_led:msg - ", msg)
            act_led(msg)
        return


    def run(self):
        _thread.start_new_thread(self._proc_poll, ())



def main():
    lock = _thread.allocate_lock()

    que_pre2led = []
    led_proc = LedProc(lock, snd_que=None, rcv_que=que_pre2led)
    led_proc.run()
    time.sleep_ms(1000)
    send_que(lock, que_pre2led, ("dst:led,src:pre,cmd:led,type:on_all"))
    time.sleep_ms(1000)
    send_que(lock, que_pre2led, ("dst:led,src:pre,cmd:led,type:off_all"))
    time.sleep_ms(1000)
    send_que(lock, que_pre2led, ("dst:led,src:pre,cmd:led,type:blink"))
    time.sleep_ms(10000)


if __name__ == "__main__":
    main()
util.py
# 通知とIFTTT-Webhookイベントの紐づけ
#  - cmd:sw,type:no1,how:released" => sw1をイベントIDとしてIFTTTにリクエスト
KEY_TO_EVENTID = {
    "no1" : {
        "pressed" : None,
        "released" : "sw1",
        "long" : None,
    },
}

def conv_msg2dict(msg):
    # input : "a:b,c:d"
    # output: {"a":"b","c":"d"}
    d = {}
    for item in msg.split(','):
        key, val = item.split(':')
        d[key] = val
    return d

def conv_typ2eventid(typ, how):
    if typ in KEY_TO_EVENTID:
        return KEY_TO_EVENTID[typ][how]
    return None

def send_que(lock, que, value):
    if que is None:
        print("Error:que is None.")
        return
    # lock
    lock_p = lock.acquire(1, -1) #wait forever
    if not lock_p:
        print("Error:task can not get the lock.")
    else:
        que.append(value)
        lock.release()
    # unlock
    return

def recv_que(lock, que):
    if que is None:
        print("Error:que is None.")
        return

    val = None

    # lock
    lock_p = lock.acquire(1, -1) #wait forever
    if not lock_p:
        print("Error:task can not get the lock.")
    else:
        if len(que) > 0:
            val = que[0]
            del que[0]
        lock.release()
    # unlock
    return val

実行方法と結果

>>> import led
>>> led.main()
led:init
LedCtl - init
_proc_poll - run
proc_led:msg -  dst:led,src:pre,cmd:led,type:on_all
act_led_led:run
{'cmd': 'led', 'type': 'on_all', 'dst': 'led', 'src': 'pre'}    <=全点灯
led:_act_cmd - over
proc_led:msg -  dst:led,src:pre,cmd:led,type:off_all
act_led_led:run
{'cmd': 'led', 'type': 'off_all', 'dst': 'led', 'src': 'pre'}    <=全消灯
led:_act_cmd - over
proc_led:msg -  dst:led,src:pre,cmd:led,type:blink
act_led_led:run
{'cmd': 'led', 'type': 'blink', 'dst': 'led', 'src': 'pre'}    <=全点滅
led:_act_cmd - over
>>> 

ポイント

LEDの制御

  • LEDの制御は、GPIOをアウトプットを指定します
    • self._leds[key]["pin"].value()
      • False : 消灯
      • True : 点灯

LED制御の通知イベント

  • LED制御は3つの指定ができる
    • on_all : 全点灯, off_all:全消灯, blink:点滅

ディスプレイ管理スレッドの作成

ディスプレイ管理スレッドの作成と動作確認を行います。

パーツ一覧

機材名 数量 備考
ESP32評価ボード 1 ESP32-WROVER 開発ボード/ESP32-WROOM 開発ボードのどちらでも可
128x64ドット - SSD1306有機ELディスプレイ 1 I2Cで接続

回路図

事前準備

ディスプレイのドライバ取得

ディスプレイ(SSD1306有機ELディスプレイ)のドライバをmicropythonのレポジトリから取得します。
必要なファイルは、下記のssd1306.pyとなります

https://github.com/micropython/micropython/blob/master/drivers/display/ssd1306.py

コード(ディスプレイ管理スレッド)

dsp.py、ssd1306.py、util.pyの3つのファイルで構成されています。

dsp.py
import _thread
import machine
import time

import ssd1306

from util import *

# Defines
PIN_SCL = 22
PIN_SDA = 21

TIMER_ID_DSP = 0
oledctl_inited = False


class OledCtl:
    _instance = None

    def __init__(self):
        print("OledCtl - init")
        global oledctl_inited
        if False == oledctl_inited:
            i2c = machine.I2C(scl=machine.Pin(PIN_SCL), sda=machine.Pin(PIN_SDA))
            self._oled = ssd1306.SSD1306_I2C(128, 64, i2c)
            oledctl_inited = True
        return

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)

        return cls._instance


    def dsp_clear(self):
        self._oled.fill(0)
        self._oled.show()
        return

    def dsp_power(self, mode):
        print("dsp:_dsp_power - run")
        if mode == "client":
            self._oled.fill(0)
            self._oled.text('Booting...', 0, 0)
            self._oled.text('Client Mode', 0, 10)
            self._oled.show()
        elif mode == "done":
            print("dsp:dsp_pwr_done - run")
            self._oled.fill(0)
            self._oled.show()
        else:
            print("dsp:dsp_pwr - error:", mode)
        return


    def dsp_ifttt(self, evt_id, sts):
        print("dsp:dsp_ifttt - run")
        self._oled.fill(0)
        self._oled.text('<IFTTT Request>', 0, 0)
        self._oled.text(('->' +  evt_id), 0, 10)
        self._oled.text(sts, 0, 20)
        self._oled.text('[MicroPython]', 0, 30)
        self._oled.show()
        return


class DspProc():
    
    def __init__(self, lock=None, snd_que=None, rcv_que=None):

        # init
        print("dsp:__init__ - run")
        self._oled_ctl = OledCtl()
        self._tmr = machine.Timer(TIMER_ID_DSP)
        self._lock = lock
        self._snd_que = snd_que
        self._rcv_que = rcv_que
        self._clear_flg = False
        return

    def _proc_dsp(self):
        print("dsp:_proc_dsp - run")

        def start_timer(type, peri):
            print("dsp:start_timer - run", peri)
            if type == "clear":
                self._tmr.init(period=int(peri), mode=machine.Timer.ONE_SHOT, callback=set_event_clear)
            return

        def stop_timer():
            print("dsp:stop_timer - run")
            self._tmr.deinit()
            self._clear_flg = False
            return

        def set_event_clear(t):
            print("dsp:_set_event_clear - run")
            self._clear_flg = True
            return

        def act_dsp(msg):
            print('dsp:act_dsp - run')
            d = conv_msg2dict(msg)
            if False == ('cmd' in d) or False == ('type' in d):
                return False

            # {cmd:dsp,type:ifttt,how:"evt_id",tmr:3000}
            # {cmd:dsp,type:power,how:server,tmr:3000}
            # {cmd:dsp,type:power,how:client,tmr:3000}
            # {cmd:dsp,type:power,how:done,tmr:3000}
            # {cmd:dsp,type:clear,how: ,tmr:0}
            typ = d['type'] if ('type' in d) else "dummy"
            how = d['how'] if ('how' in d) else "dummy"
            tm_count = int(d['tmr']) if ('tmr' in d) else -1

            if "power" == typ:
                self._oled_ctl.dsp_power(how)
            elif "clear" == typ:
                self._oled_ctl.dsp_clear()
            elif "ifttt" == typ:
                sts = d['sts'] if ('sts' in d) else "dummy"
                self._oled_ctl.dsp_ifttt(how, sts)
            else:
                return False

            # timer
            if tm_count > 0:
                start_timer(type="clear", peri=tm_count)


            print('dsp:act_dsp - over')
            return True

        while True:
            # recvive_que
            msg = recv_que(self._lock, self._rcv_que)
            if msg is None:
                # print("IndexError")
                time.sleep_ms(50)
                # clear
                if self._clear_flg:
                    self._oled_ctl.dsp_clear()
                    self._clear_flg = False
                continue

            # stop : timer
            stop_timer()
            print("proc_dsp:msg - ", msg)
            act_dsp(msg)

        return


    def run(self):
        _thread.start_new_thread(self._proc_dsp, ())



def main():

    lock = _thread.allocate_lock()

    que_dsp2pre = []
    que_pre2dsp = []
    dsp_proc = DspProc(lock, snd_que=que_dsp2pre, rcv_que=que_pre2dsp)
    dsp_proc.run()
    
    send_que(lock, que_pre2dsp, ("dst:dsp,src:pre,cmd:dsp,type:power,how:server,tmr:3000"))
    time.sleep_ms(1_000)
    send_que(lock, que_pre2dsp, ("dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id,sts:sending...,tmr:3000"))
    time.sleep_ms(1_000)
    send_que(lock, que_pre2dsp, ("dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id0,sts:sended.,tmr:3000"))
    time.sleep_ms(10_000)
    return

if __name__ == "__main__":
    main()

実行方法と結果

>>> dsp.main()
dsp:__init__ - run
OledCtl - init
Warning: I2C(-1, ...) is deprecated, use SoftI2C(...) instead
dsp:_proc_dsp - run
dsp:stop_timer - run
proc_dsp:msg -  dst:dsp,src:pre,cmd:dsp,type:power,how:server,tmr:3000
dsp:act_dsp - run
dsp:_dsp_power - run
dsp:dsp_pwr - error: server
dsp:start_timer - run 3000
dsp:act_dsp - over
dsp:_set_event_clear - run
dsp:stop_timer - run
proc_dsp:msg -  dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id,sts:sending...,tmr:3000
dsp:act_dsp - run
dsp:dsp_ifttt - run
dsp:start_timer - run 3000
dsp:act_dsp - over
dsp:stop_timer - run
proc_dsp:msg -  dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id0,sts:sended.,tmr:3000
dsp:act_dsp - run
dsp:dsp_ifttt - run
dsp:start_timer - run 3000
dsp:act_dsp - over
dsp:_set_event_clear - run
>>> 

ポイント

ディスプレイの制御

  • ディスプレイは、I2C制御で行います
    • ESP32では、SCL:22 / SDA:21がI2C用のハードピン
  • ドライバから提供されるAPIは以下を使用している
    • self._oled.fill() : 塗りつぶし(バッファ上のみ)
    • self._oled.text() : 座標を指定し、文字列の描画(バッファ上のみ)
    • self._oled.show() : バッファ上のデータを液晶に適用

ディスプレイの表示クリア処理(タイマーの利用)

  • 表示クリアを自動的に実施するために、タイマーを利用している
    • 通知の際、tmr > 0の場合、start_timer()で作成している
      • self._tmr.init(period=int(peri), mode=machine.Timer.ONE_SHOT, callback=set_event_clear)
        • periで指定した時間が経過したら、1回だけset_event_clear()をコール
    • set_event_clear()では、clear用のフラグのみを操作
      • 本関数内でdsp_clear()をコールすると、タイミングによってタイマー処理とスレッド処理がぶつかるため

参考URL

  • MicroPython公式ドキュメント

https://micropython-docs-ja.readthedocs.io/ja/latest/

  • MicroPython - ESP32 用クイックリファレンス

https://micropython-docs-ja.readthedocs.io/ja/latest/esp32/quickref.html

  • MicroPython - クラス Timer -- ハードウェアタイマーの制御

https://micropython-docs-ja.readthedocs.io/ja/latest/library/machine.Timer.html

さいごに

次が最後の予定で、中継スレッドの制御を記載する予定です。

GitHubで編集を提案

Discussion