🔲
ESP32 - MicroPythonで家電を制御しよう-その3(LED, Display)
ESP32を使って、IFTTTサービスにリクエストを送り、家電を制御します。その仕組みとコードの部分について説明します。
全体像は、その1(コンセプト、スイッチ)を参照してください。
本記事は、LED(LED管理スレッド)/文字表示(ディスプレイ管理スレッド)についての説明です。
関連記事
LED管理スレッドの作成
LED管理スレッドの作成と動作確認を行います。
開発環境の構築
開発環境の構築は、下記の記事を参考にしてください。
パーツ一覧
機材名 | 数量 | 備考 |
---|---|---|
ESP32評価ボード | 1 | ESP32-WROVER 開発ボード/ESP32-WROOM 開発ボードのどちらでも可 |
LED | 3 | 3つで説明 |
抵抗 | 3 | LEDのデータシートに合わせ使用すること(200Ω ~ 680Ωで確認) |
回路図
コード(LED管理スレッド)
led.pyとutil.pyの2つのファイルで構成されています。
led.py
import _thread
import time
from machine import Pin
from util import *
LED_DEFs = {
"red": (27, ),
"green": (26, ),
"blue": (25, ),
}
ledctl_inited = False
class LedCtl:
_instance = None
def __init__(self):
print("LedCtl - init")
global ledctl_inited
if False == ledctl_inited:
self._leds = {}
for key, val in LED_DEFs.items():
self._leds[key] = {"pin":Pin(val[0], Pin.OUT), "blink":False, "cont":False, "count":0}
ledctl_inited = True
return
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def off_all(self):
for key in self._leds:
self._leds[key]["blink"] = False
self._leds[key]["pin"].value(False)
return
def on_all(self):
for key in self._leds:
self._leds[key]["blink"] = False
self._leds[key]["pin"].value(True)
return
def blink_led(self, led_name, blink=True, cont=False, count=10):
if led_name in self._leds:
self._leds[led_name]["blink"] = blink
self._leds[led_name]["pin"].value(False)
self._leds[led_name]["cont"] = cont
self._leds[led_name]["count"] = count
elif "all" == led_name:
for key in self._leds:
self._leds[key]["blink"] = blink
self._leds[key]["pin"].value(False)
self._leds[key]["cont"] = cont
self._leds[key]["count"] = count
return
def countdown_blink(self, blk_sts):
for key in self._leds:
if self._leds[key]["blink"] and (self._leds[key]["cont"] or self._leds[key]["count"] > 0):
self._leds[key]["pin"].value(blk_sts)
self._leds[key]["count"] -= 1
if self._leds[key]["count"] == 0:
self._leds[key]["pin"].value(False)
return
class LedProc():
def __init__(self, lock=None, snd_que=None, rcv_que=None):
print("led:init")
self._led_ctl = LedCtl()
self._lock = lock
self._snd_que = snd_que
self._rcv_que = rcv_que
return
def _proc_poll(self):
print("_proc_poll - run")
def act_led(msg):
print("act_led_led:run")
d = conv_msg2dict(msg)
print(d)
if "off_all" in d['type']:
self._led_ctl.off_all()
elif "on_all" in d['type']:
self._led_ctl.on_all()
elif "blink" in d['type']:
# name, blink, cont, count
name = 'all'
blink = True
cont = False
count = 10
if 'name' in d:
name = d['name']
if 'blink' in d:
blink = str2bool(d['blink'])
if 'cont' in d:
cont = str2bool(d['cont'])
if 'count' in d:
count = int(d['count'])
self._led_ctl.blink_led(name, blink, cont, count)
print('led:_act_cmd - over')
return
blink_status = False
while True:
time.sleep_ms(200)
msg = recv_que(self._lock, self._rcv_que)
if msg is None:
# print("IndexError")
# LED Blink
blink_status = False if blink_status else True
self._led_ctl.countdown_blink(blink_status)
continue
print("proc_led:msg - ", msg)
act_led(msg)
return
def run(self):
_thread.start_new_thread(self._proc_poll, ())
def main():
lock = _thread.allocate_lock()
que_pre2led = []
led_proc = LedProc(lock, snd_que=None, rcv_que=que_pre2led)
led_proc.run()
time.sleep_ms(1000)
send_que(lock, que_pre2led, ("dst:led,src:pre,cmd:led,type:on_all"))
time.sleep_ms(1000)
send_que(lock, que_pre2led, ("dst:led,src:pre,cmd:led,type:off_all"))
time.sleep_ms(1000)
send_que(lock, que_pre2led, ("dst:led,src:pre,cmd:led,type:blink"))
time.sleep_ms(10000)
if __name__ == "__main__":
main()
util.py
# 通知とIFTTT-Webhookイベントの紐づけ
# - cmd:sw,type:no1,how:released" => sw1をイベントIDとしてIFTTTにリクエスト
KEY_TO_EVENTID = {
"no1" : {
"pressed" : None,
"released" : "sw1",
"long" : None,
},
}
def conv_msg2dict(msg):
# input : "a:b,c:d"
# output: {"a":"b","c":"d"}
d = {}
for item in msg.split(','):
key, val = item.split(':')
d[key] = val
return d
def conv_typ2eventid(typ, how):
if typ in KEY_TO_EVENTID:
return KEY_TO_EVENTID[typ][how]
return None
def send_que(lock, que, value):
if que is None:
print("Error:que is None.")
return
# lock
lock_p = lock.acquire(1, -1) #wait forever
if not lock_p:
print("Error:task can not get the lock.")
else:
que.append(value)
lock.release()
# unlock
return
def recv_que(lock, que):
if que is None:
print("Error:que is None.")
return
val = None
# lock
lock_p = lock.acquire(1, -1) #wait forever
if not lock_p:
print("Error:task can not get the lock.")
else:
if len(que) > 0:
val = que[0]
del que[0]
lock.release()
# unlock
return val
実行方法と結果
>>> import led
>>> led.main()
led:init
LedCtl - init
_proc_poll - run
proc_led:msg - dst:led,src:pre,cmd:led,type:on_all
act_led_led:run
{'cmd': 'led', 'type': 'on_all', 'dst': 'led', 'src': 'pre'} <=全点灯
led:_act_cmd - over
proc_led:msg - dst:led,src:pre,cmd:led,type:off_all
act_led_led:run
{'cmd': 'led', 'type': 'off_all', 'dst': 'led', 'src': 'pre'} <=全消灯
led:_act_cmd - over
proc_led:msg - dst:led,src:pre,cmd:led,type:blink
act_led_led:run
{'cmd': 'led', 'type': 'blink', 'dst': 'led', 'src': 'pre'} <=全点滅
led:_act_cmd - over
>>>
ポイント
LEDの制御
- LEDの制御は、GPIOをアウトプットを指定します
- self._leds[key]["pin"].value()
- False : 消灯
- True : 点灯
- self._leds[key]["pin"].value()
LED制御の通知イベント
- LED制御は3つの指定ができる
- on_all : 全点灯, off_all:全消灯, blink:点滅
ディスプレイ管理スレッドの作成
ディスプレイ管理スレッドの作成と動作確認を行います。
パーツ一覧
機材名 | 数量 | 備考 |
---|---|---|
ESP32評価ボード | 1 | ESP32-WROVER 開発ボード/ESP32-WROOM 開発ボードのどちらでも可 |
128x64ドット - SSD1306有機ELディスプレイ | 1 | I2Cで接続 |
回路図
事前準備
ディスプレイのドライバ取得
ディスプレイ(SSD1306有機ELディスプレイ)のドライバをmicropythonのレポジトリから取得します。
必要なファイルは、下記のssd1306.pyとなります
コード(ディスプレイ管理スレッド)
dsp.py、ssd1306.py、util.pyの3つのファイルで構成されています。
dsp.py
import _thread
import machine
import time
import ssd1306
from util import *
# Defines
PIN_SCL = 22
PIN_SDA = 21
TIMER_ID_DSP = 0
oledctl_inited = False
class OledCtl:
_instance = None
def __init__(self):
print("OledCtl - init")
global oledctl_inited
if False == oledctl_inited:
i2c = machine.I2C(scl=machine.Pin(PIN_SCL), sda=machine.Pin(PIN_SDA))
self._oled = ssd1306.SSD1306_I2C(128, 64, i2c)
oledctl_inited = True
return
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def dsp_clear(self):
self._oled.fill(0)
self._oled.show()
return
def dsp_power(self, mode):
print("dsp:_dsp_power - run")
if mode == "client":
self._oled.fill(0)
self._oled.text('Booting...', 0, 0)
self._oled.text('Client Mode', 0, 10)
self._oled.show()
elif mode == "done":
print("dsp:dsp_pwr_done - run")
self._oled.fill(0)
self._oled.show()
else:
print("dsp:dsp_pwr - error:", mode)
return
def dsp_ifttt(self, evt_id, sts):
print("dsp:dsp_ifttt - run")
self._oled.fill(0)
self._oled.text('<IFTTT Request>', 0, 0)
self._oled.text(('->' + evt_id), 0, 10)
self._oled.text(sts, 0, 20)
self._oled.text('[MicroPython]', 0, 30)
self._oled.show()
return
class DspProc():
def __init__(self, lock=None, snd_que=None, rcv_que=None):
# init
print("dsp:__init__ - run")
self._oled_ctl = OledCtl()
self._tmr = machine.Timer(TIMER_ID_DSP)
self._lock = lock
self._snd_que = snd_que
self._rcv_que = rcv_que
self._clear_flg = False
return
def _proc_dsp(self):
print("dsp:_proc_dsp - run")
def start_timer(type, peri):
print("dsp:start_timer - run", peri)
if type == "clear":
self._tmr.init(period=int(peri), mode=machine.Timer.ONE_SHOT, callback=set_event_clear)
return
def stop_timer():
print("dsp:stop_timer - run")
self._tmr.deinit()
self._clear_flg = False
return
def set_event_clear(t):
print("dsp:_set_event_clear - run")
self._clear_flg = True
return
def act_dsp(msg):
print('dsp:act_dsp - run')
d = conv_msg2dict(msg)
if False == ('cmd' in d) or False == ('type' in d):
return False
# {cmd:dsp,type:ifttt,how:"evt_id",tmr:3000}
# {cmd:dsp,type:power,how:server,tmr:3000}
# {cmd:dsp,type:power,how:client,tmr:3000}
# {cmd:dsp,type:power,how:done,tmr:3000}
# {cmd:dsp,type:clear,how: ,tmr:0}
typ = d['type'] if ('type' in d) else "dummy"
how = d['how'] if ('how' in d) else "dummy"
tm_count = int(d['tmr']) if ('tmr' in d) else -1
if "power" == typ:
self._oled_ctl.dsp_power(how)
elif "clear" == typ:
self._oled_ctl.dsp_clear()
elif "ifttt" == typ:
sts = d['sts'] if ('sts' in d) else "dummy"
self._oled_ctl.dsp_ifttt(how, sts)
else:
return False
# timer
if tm_count > 0:
start_timer(type="clear", peri=tm_count)
print('dsp:act_dsp - over')
return True
while True:
# recvive_que
msg = recv_que(self._lock, self._rcv_que)
if msg is None:
# print("IndexError")
time.sleep_ms(50)
# clear
if self._clear_flg:
self._oled_ctl.dsp_clear()
self._clear_flg = False
continue
# stop : timer
stop_timer()
print("proc_dsp:msg - ", msg)
act_dsp(msg)
return
def run(self):
_thread.start_new_thread(self._proc_dsp, ())
def main():
lock = _thread.allocate_lock()
que_dsp2pre = []
que_pre2dsp = []
dsp_proc = DspProc(lock, snd_que=que_dsp2pre, rcv_que=que_pre2dsp)
dsp_proc.run()
send_que(lock, que_pre2dsp, ("dst:dsp,src:pre,cmd:dsp,type:power,how:server,tmr:3000"))
time.sleep_ms(1_000)
send_que(lock, que_pre2dsp, ("dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id,sts:sending...,tmr:3000"))
time.sleep_ms(1_000)
send_que(lock, que_pre2dsp, ("dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id0,sts:sended.,tmr:3000"))
time.sleep_ms(10_000)
return
if __name__ == "__main__":
main()
実行方法と結果
>>> dsp.main()
dsp:__init__ - run
OledCtl - init
Warning: I2C(-1, ...) is deprecated, use SoftI2C(...) instead
dsp:_proc_dsp - run
dsp:stop_timer - run
proc_dsp:msg - dst:dsp,src:pre,cmd:dsp,type:power,how:server,tmr:3000
dsp:act_dsp - run
dsp:_dsp_power - run
dsp:dsp_pwr - error: server
dsp:start_timer - run 3000
dsp:act_dsp - over
dsp:_set_event_clear - run
dsp:stop_timer - run
proc_dsp:msg - dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id,sts:sending...,tmr:3000
dsp:act_dsp - run
dsp:dsp_ifttt - run
dsp:start_timer - run 3000
dsp:act_dsp - over
dsp:stop_timer - run
proc_dsp:msg - dst:dsp,src:pre,cmd:dsp,type:ifttt,how:evt_id0,sts:sended.,tmr:3000
dsp:act_dsp - run
dsp:dsp_ifttt - run
dsp:start_timer - run 3000
dsp:act_dsp - over
dsp:_set_event_clear - run
>>>
ポイント
ディスプレイの制御
- ディスプレイは、I2C制御で行います
- ESP32では、SCL:22 / SDA:21がI2C用のハードピン
- ドライバから提供されるAPIは以下を使用している
- self._oled.fill() : 塗りつぶし(バッファ上のみ)
- self._oled.text() : 座標を指定し、文字列の描画(バッファ上のみ)
- self._oled.show() : バッファ上のデータを液晶に適用
ディスプレイの表示クリア処理(タイマーの利用)
- 表示クリアを自動的に実施するために、タイマーを利用している
- 通知の際、tmr > 0の場合、start_timer()で作成している
- self._tmr.init(period=int(peri), mode=machine.Timer.ONE_SHOT, callback=set_event_clear)
- periで指定した時間が経過したら、1回だけset_event_clear()をコール
- self._tmr.init(period=int(peri), mode=machine.Timer.ONE_SHOT, callback=set_event_clear)
- set_event_clear()では、clear用のフラグのみを操作
- 本関数内でdsp_clear()をコールすると、タイミングによってタイマー処理とスレッド処理がぶつかるため
- 通知の際、tmr > 0の場合、start_timer()で作成している
参考URL
- MicroPython公式ドキュメント
- MicroPython - ESP32 用クイックリファレンス
- MicroPython - クラス Timer -- ハードウェアタイマーの制御
さいごに
次が最後の予定で、中継スレッドの制御を記載する予定です。
Discussion