💡
[ライフハック]ラズパイピコで照明のオンオフをする
ラズパイピコを使って、↓を作りました。
😟困ったこと
スイッチでしか制御できない照明があり、手元で操作できないかと。
🤔検討
前にSwitchBotなど既製品で対応しましたが、以下の点で断念しました。
- スイッチ側のカバーが外れてしまう(家のスイッチの問題)
- スイッチが複数あるため、台数を買うにはなかなか出費が多い
ラズパイピコとSG90が大量に余っていたため、それを組み合わせて実現することにしました。
💡やったこと
- 利用者は、スマホから照明制御のリクエストを送信
- ラズパイピコは、リクエストを受けてSG90を制御
- 照明がつく(消える)
🔧パーツ一覧
no | 部品名 | 個数 | 備考 |
---|---|---|---|
1 | ラズベリーパイピコW | 1 | 無線今回は、4Bを使用 |
2 | SG90 | 2 | 秋月電子 |
3 | スイッチ | 2 | 秋月電子 お好きな色 |
4 | ジャンパー線 | 適量 | - |
5 | スイッチ用ケース | 1 | 3Dプリンタで自作(後述) |
6 | モバイルバッテリー | 1 | IoT対応のもの |
接続図
💻環境
開発環境
- ラズベリーピコ
- ファームウェア : v1.24.0 (2024-10-25) .uf2 / [Release notes] (latest)
- エディタ(IDE)
- Thonny 4.16
ラズベリーパイピコのセットアップ
- セットアップは下記のページを参照してください↓
- ラズパイピコW用のファームウェアは↓となります
📝手順
以下を記載します。
- 3Dプリンタによるスイッチの作成
- 仕様検討
- コード
筐体の作成(3Dモデリング)
- ベースになる寸法は以下となります
- ↓のような感じです、ネジ穴で固定することができます
- 最終的には下記のようにしました
- モデリングはFreeCADを使用しています。
- サーボモータが0度の状態は以下となります
仕様検討
- スマホから手軽に操作したい
- Webサーバを搭載し、GET/POSTメソッドの対応を行う
- 手動でオンオフもしたい
- タクトスイッチを搭載する
ファイル構成
ファイル名 | 説明 |
---|---|
main.py | メインスレッド、Webサーバ |
tact_switch.py | スイッチ監視 |
servo.py | サーボモータ制御 |
index.html | 照明制御ページ |
NotFound.html | NotFoundページ |
webサーバ側
- 画面
- トップページのみとし、オン/オフのみ
- API リクエスト一覧
メソッド | エンドポイント | 説明 | リクエスト内容 | レスポンス内容 |
---|---|---|---|---|
GET | /status |
照明状態の取得 | なし |
{"light": "<状態>"} (on またはoff ) |
POST | /light |
照明の制御 |
{"light": "<状態>"} (on またはoff ) |
成功: {"status": "success", "light": "<状態>"} 失敗: {"status": "error", "message": "Invalid action"}
|
POST | /reset |
デバイスをリセット | {"reset": "on"} |
成功: {"status": "success", "reset": "on"} 失敗: {"status": "error", "message": "Invalid action"}
|
コード
各制御のコードを記載します
サーボ制御
- 取り付けが逆のため、オンとオフで角度の方向が異なります
- light_statusで照明の状態を保持しています
servo.py
from machine import Pin
from machine import PWM
import time
# サーボの動作範囲
ANGLE_RANGE = 180
# PWMの時間の範囲
TIME_RANGE = 1.9
# サーボに指定できる最小の時間
MIN_TIME = 0.5
# サーボのPWMの周期時間
CYCLE_TIME = 20.0
pwms = {
"on": {"pin": PWM(Pin(20, Pin.OUT)), "angle": 45}, # 3
"off": {"pin": PWM(Pin(19, Pin.OUT)), "angle": -45}, # 4
}
class Servos:
"""
サーボ管理
"""
def __init__(self):
self.light_status = False # 初期値を False に設定
for pwm in pwms.values():
pwm["pin"].freq(50)
return
def moveServo(self, pwm, angle):
# 入力角度のチェック
if angle < -90 or angle > 90:
return False
# 0~180換算
angle = angle + 90
percent = angle / ANGLE_RANGE
addTime = TIME_RANGE * percent
time = MIN_TIME + addTime
ratio = time / CYCLE_TIME
ratio = (int)(65535 * ratio)
pwm.duty_u16(ratio)
return True
def shake(self, key):
# 0度
self.moveServo(pwms[key]["pin"], 0)
time.sleep(0.1)
# 指定角度
self.moveServo(pwms[key]["pin"], pwms[key]["angle"])
time.sleep(0.1)
# 0度
self.moveServo(pwms[key]["pin"], 0)
time.sleep(0.1)
# 照明状態の更新
self.light_status = True if key == "on" else False
return
def main():
servos = Servos()
for _ in range(3):
servos.shake("on")
servos.shake("off")
return
if __name__ == "__main__":
main()
スイッチ制御
- ポーリングでスイッチを監視します
- 3回連続押し続けた場合に判定します
tact_switch.py
import uasyncio as asyncio
from machine import Pin
from servo import Servos
sws = {
"on": {"pin": Pin(14, Pin.IN, Pin.PULL_UP), "count": 0},
"off": {"pin": Pin(15, Pin.IN, Pin.PULL_UP), "count": 0},
}
async def proc_sw(servos=None):
print("proc_sw:run")
# polling
while True:
await asyncio.sleep_ms(50)
for key, sw in sws.items():
# swカウント
# print(key, sw["pin"].value())
if sw["pin"].value() == 0:
sw["count"] += 1
print(f"{key} pressed {sw['count']} times")
else:
sw["count"] = 0
# swカウントが3以上ならモータ制御
if sw["count"] > 2:
sw["count"] = 0
servos.shake(key)
return
async def async_main():
servos = Servos()
asyncio.create_task(proc_sw(servos))
await asyncio.sleep(30)
def main():
asyncio.run(async_main())
if __name__ == "__main__":
main()
メインスレッド(Webサーバ)
- SSID, PASSWORDは自身の環境に合わせてください
main.py
import network
import asyncio
from time import sleep
import machine
import ubinascii
import _thread
from servo import Servos
from tact_switch import proc_sw
# サーボのインスタンスを生成
servos = Servos()
# Wi-Fi ルーターのSSIDとパスワード
SSID = "XXXXXXXXX"
PASSWORD = "YYYYYYYYYY"
PORT = 80
# ルーティングテーブル
ROOTING_TABLE = {
"/": "index.html",
"/index": "index.html",
}
# コンテントタイプ
CONTENT_TYPE = {
"html": "text/html",
"jpg": "image/jpg",
"png": "image/png",
"ico": "image/x-icon",
}
# GETアクションテーブル
GET_ACTION_TABLE = {
"/status": lambda: get_device_status(),
}
def get_device_status():
light_value = "on" if servos.light_status else "off"
return f'{{"light": {light_value}}}'
def json_response(url):
# JSON形式のデータを作成(ここでは例としてデバイスのステータスを返す)
if url in GET_ACTION_TABLE.keys():
response = GET_ACTION_TABLE[url]() # JSON形式のレスポンス
return ("HTTP/1.0 200 OK\r\nContent-Type: application/json\r\n\r\n", response)
return None
# POSTアクションテーブル
# POSTのときの処理をここに登録
POST_ACTION_TABLE = {
"/light": lambda posted_data: post_light_action(posted_data),
"/reset": lambda posted_data: post_reset_action(posted_data),
}
# lightの状態を管理する関数
def post_light_action(posted_data):
if posted_data.get("light") == "on":
servos.shake("on")
return '{"status": "success", "light": "on"}'
elif posted_data.get("light") == "off":
servos.shake("off")
return '{"status": "success", "light": "off"}'
return '{"status": "error", "message": "Invalid action"}'
# リセット
def thread_reset():
sleep(1)
machine.reset()
def post_reset_action(posted_data):
if posted_data.get("reset") == "on":
_thread.start_new_thread(thread_reset, ())
return '{"status": "success", "reset": "on"}'
return '{"status": "error", "message": "Invalid action"}'
# postページを加工する関数をここに登録
def post_name(page_data, posted_data):
edited_page = str(page_data).format(posted_data["name"])
return edited_page
# Webページを取得する関数
def get_page(file_name):
open_mode = "r"
if get_file_type(file_name) != "html":
open_mode = "rb"
data = ""
try:
with open(file_name, open_mode) as f:
data = f.read()
except Exception as e:
print(f"Error opening file: {e}")
with open("NotFound.html", "r") as f:
data = f.read()
return data
# WEBページをルーティングする関数
def rooting_from_url(rooting):
# ファイル名指定だったら、ファイル名を返す
if len(rooting.split(".")) >= 2:
return rooting.split("/")[-1]
# ルーティング表を参照して、ファイル名を返す
if rooting in ROOTING_TABLE.keys():
return ROOTING_TABLE[rooting]
# 各表になかったら、NotFound.htmlを返す
return "NotFound.html"
# urlからファイルタイプを取得する関数
def get_file_type(url):
url_split = url.split(".")
if len(url_split) == 1:
mime_type = "html"
else:
mime_type = url_split[-1]
return mime_type
# ファイルタイプからContent-Typeを取得する関数
def get_content_type(url):
mime_type = get_file_type(url)
content_type = ""
if mime_type in CONTENT_TYPE.keys():
content_type = (
f"HTTP/1.0 200 OK\r\nContent-type: {CONTENT_TYPE[mime_type]}\r\n\r\n"
)
return content_type
# Wi-Fiに接続する関数
def connect_and_return_ip():
# Connect to WLAN
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
while wlan.isconnected() == False:
print("Waiting for connection...")
sleep(1)
ip = wlan.ifconfig()[0]
print(f"Connected on {ip}")
return ip
# GETメソッドの時の処理
def get_method(url):
result = json_response(url)
if result is not None:
return result
filename = rooting_from_url(url)
# Raspberry Pi PICO Wに対するアクション実行
page = get_page(filename)
return (get_content_type(url), page)
# POSTメソッドの時の処理
def post_method(url, posted_data):
if url in POST_ACTION_TABLE.keys():
response = POST_ACTION_TABLE[url](posted_data) # JSON形式のレスポンス
return ("HTTP/1.0 200 OK\r\nContent-Type: application/json\r\n\r\n", response)
return (
"HTTP/1.0 404 Not Found\r\nContent-Type: application/json\r\n\r\n",
'{"status": "error", "message": "Not found"}',
)
# URLデコードを行う関数
# micropythonにはurllib.parseモジュールはないため自前で実装
# '+' を ' ' に変換し、%xx 形式の文字列を対応するASCII文字に変換する
def url_decode(s):
if "%" not in s:
return s
s_replaced = s.replace("+", " ")
s_decoded = ubinascii.unhexlify(s_replaced.replace("%", "").encode()).decode()
return s_decoded
def get_posted_data(request):
request_lines = request.decode("utf8").split("\r\n")
posted_data = {
key: url_decode(value)
for key, value in [line.split("=") for line in request_lines[-1].split("&")]
}
return posted_data
# リクエストに対して、コンテントタイプとWebページを返す関数
def get_content_and_page(request):
request_line = request.split(b"\r\n")[0].decode("utf-8")
method = request_line.split()[0]
url = request_line.split()[1]
if method == "GET":
return get_method(url)
elif method == "POST":
return post_method(url, get_posted_data(request))
raise ValueError("method is not GET or POST")
# クライアント(ブラウザ)からの接続に対応する関数
async def async_server(reader, writer):
request = await reader.read(1024)
if len(request) == 0:
return
content_type, page = get_content_and_page(request)
writer.write(content_type)
writer.write(page)
await writer.drain()
await writer.wait_closed()
async def async_sw(servos):
await proc_sw(servos)
# メイン処理部分
def main():
global servos
# メイン処理部分
# Wi-Fiに接続し、IPアドレスを取得します
ip = connect_and_return_ip()
loop = asyncio.new_event_loop()
# スイッチタスクの作成と実行
loop.create_task(async_sw(servos))
# 非同期でWEBサーバを起動
coroutine = asyncio.start_server(async_server, ip, PORT)
server = loop.run_until_complete(coroutine)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
return
if __name__ == "__main__":
main()
WEBサーバ側のコードは、下記のページを参考にさせていただきました。
さいごに
無事に作りたかったものが作れました。
ただ、たまにオンされなかったり(反応しても押すまでいかない)があったりと、
安定して動いているとはいいがたいです。
スイッチの押す部分の機構や例外処理時のリセットなど、安定して動くよう調整していきたいと思います。
また、制御用のAPIを利用して、Webアプリと接続してみたいと思います。
Discussion