💡

[ライフハック]ラズパイピコで照明のオンオフをする

2024/10/27に公開

ラズパイピコを使って、↓を作りました。

https://twitter.com/tw_kotatu/status/1850501610784608300

😟困ったこと

スイッチでしか制御できない照明があり、手元で操作できないかと。

🤔検討

前にSwitchBotなど既製品で対応しましたが、以下の点で断念しました。

  • スイッチ側のカバーが外れてしまう(家のスイッチの問題)
  • スイッチが複数あるため、台数を買うにはなかなか出費が多い

ラズパイピコとSG90が大量に余っていたため、それを組み合わせて実現することにしました。

💡やったこと

  1. 利用者は、スマホから照明制御のリクエストを送信
  2. ラズパイピコは、リクエストを受けてSG90を制御
  3. 照明がつく(消える)

🔧パーツ一覧

no 部品名 個数 備考
1 ラズベリーパイピコW 1 無線今回は、4Bを使用
2 SG90 2 秋月電子
3 スイッチ 2 秋月電子 お好きな色
4 ジャンパー線 適量 -
5 スイッチ用ケース 1 3Dプリンタで自作(後述)
6 モバイルバッテリー 1 IoT対応のもの

接続図

💻環境

開発環境

  • ラズベリーピコ
    • ファームウェア : v1.24.0 (2024-10-25) .uf2 / [Release notes] (latest)
  • エディタ(IDE)
    • Thonny 4.16

ラズベリーパイピコのセットアップ

  • セットアップは下記のページを参照してください↓

https://www.raspberrypi.com/documentation/microcontrollers/micropython.html

  • ラズパイピコW用のファームウェアは↓となります

https://micropython.org/download/RPI_PICO_W/

📝手順

以下を記載します。

  • 3Dプリンタによるスイッチの作成
  • 仕様検討
  • コード

筐体の作成(3Dモデリング)

  • ベースになる寸法は以下となります
  • ↓のような感じです、ネジ穴で固定することができます
  • 最終的には下記のようにしました
    • モデリングはFreeCADを使用しています。
  • サーボモータが0度の状態は以下となります

仕様検討

  • スマホから手軽に操作したい
    • Webサーバを搭載し、GET/POSTメソッドの対応を行う
  • 手動でオンオフもしたい
    • タクトスイッチを搭載する

ファイル構成

ファイル名 説明
main.py メインスレッド、Webサーバ
tact_switch.py スイッチ監視
servo.py サーボモータ制御
index.html 照明制御ページ
NotFound.html NotFoundページ

webサーバ側

  • 画面
    • トップページのみとし、オン/オフのみ
  • API リクエスト一覧
メソッド エンドポイント 説明 リクエスト内容 レスポンス内容
GET /status 照明状態の取得 なし {"light": "<状態>"}onまたはoff
POST /light 照明の制御 {"light": "<状態>"}onまたはoff 成功: {"status": "success", "light": "<状態>"}
失敗: {"status": "error", "message": "Invalid action"}
POST /reset デバイスをリセット {"reset": "on"} 成功: {"status": "success", "reset": "on"}
失敗: {"status": "error", "message": "Invalid action"}

コード

各制御のコードを記載します

サーボ制御

  • 取り付けが逆のため、オンとオフで角度の方向が異なります
  • light_statusで照明の状態を保持しています
servo.py
from machine import Pin
from machine import PWM
import time


# サーボの動作範囲
ANGLE_RANGE = 180
# PWMの時間の範囲
TIME_RANGE = 1.9
# サーボに指定できる最小の時間
MIN_TIME = 0.5
# サーボのPWMの周期時間
CYCLE_TIME = 20.0

pwms = {
    "on": {"pin": PWM(Pin(20, Pin.OUT)), "angle": 45},  # 3
    "off": {"pin": PWM(Pin(19, Pin.OUT)), "angle": -45},  # 4
}


class Servos:
    """
    サーボ管理
    """

    def __init__(self):
        self.light_status = False  # 初期値を False に設定
        for pwm in pwms.values():
            pwm["pin"].freq(50)
        return

    def moveServo(self, pwm, angle):

        # 入力角度のチェック
        if angle < -90 or angle > 90:
            return False

        # 0~180換算
        angle = angle + 90
        percent = angle / ANGLE_RANGE
        addTime = TIME_RANGE * percent
        time = MIN_TIME + addTime
        ratio = time / CYCLE_TIME
        ratio = (int)(65535 * ratio)
        pwm.duty_u16(ratio)
        return True

    def shake(self, key):
        # 0度
        self.moveServo(pwms[key]["pin"], 0)
        time.sleep(0.1)
        # 指定角度
        self.moveServo(pwms[key]["pin"], pwms[key]["angle"])
        time.sleep(0.1)
        # 0度
        self.moveServo(pwms[key]["pin"], 0)
        time.sleep(0.1)
        # 照明状態の更新
        self.light_status = True if key == "on" else False

        return


def main():
    servos = Servos()

    for _ in range(3):
        servos.shake("on")
        servos.shake("off")
    return


if __name__ == "__main__":
    main()

スイッチ制御

  • ポーリングでスイッチを監視します
  • 3回連続押し続けた場合に判定します
tact_switch.py
import uasyncio as asyncio
from machine import Pin

from servo import Servos

sws = {
    "on": {"pin": Pin(14, Pin.IN, Pin.PULL_UP), "count": 0},
    "off": {"pin": Pin(15, Pin.IN, Pin.PULL_UP), "count": 0},
}


async def proc_sw(servos=None):
    print("proc_sw:run")

    # polling
    while True:
        await asyncio.sleep_ms(50)

        for key, sw in sws.items():
            # swカウント
            # print(key, sw["pin"].value())
            if sw["pin"].value() == 0:
                sw["count"] += 1
                print(f"{key} pressed {sw['count']} times")
            else:
                sw["count"] = 0

            # swカウントが3以上ならモータ制御
            if sw["count"] > 2:
                sw["count"] = 0
                servos.shake(key)
    return


async def async_main():
    servos = Servos()
    asyncio.create_task(proc_sw(servos))
    await asyncio.sleep(30)


def main():
    asyncio.run(async_main())


if __name__ == "__main__":
    main()

メインスレッド(Webサーバ)

  • SSID, PASSWORDは自身の環境に合わせてください
main.py
import network
import asyncio
from time import sleep
import machine
import ubinascii
import _thread

from servo import Servos
from tact_switch import proc_sw

# サーボのインスタンスを生成
servos = Servos()

# Wi-Fi ルーターのSSIDとパスワード
SSID = "XXXXXXXXX"
PASSWORD = "YYYYYYYYYY"
PORT = 80
# ルーティングテーブル
ROOTING_TABLE = {
    "/": "index.html",
    "/index": "index.html",
}
# コンテントタイプ
CONTENT_TYPE = {
    "html": "text/html",
    "jpg": "image/jpg",
    "png": "image/png",
    "ico": "image/x-icon",
}

# GETアクションテーブル
GET_ACTION_TABLE = {
    "/status": lambda: get_device_status(),
}


def get_device_status():
    light_value = "on" if servos.light_status else "off"
    return f'{{"light": {light_value}}}'


def json_response(url):
    # JSON形式のデータを作成(ここでは例としてデバイスのステータスを返す)
    if url in GET_ACTION_TABLE.keys():
        response = GET_ACTION_TABLE[url]()  # JSON形式のレスポンス
        return ("HTTP/1.0 200 OK\r\nContent-Type: application/json\r\n\r\n", response)
    return None


# POSTアクションテーブル
# POSTのときの処理をここに登録
POST_ACTION_TABLE = {
    "/light": lambda posted_data: post_light_action(posted_data),
    "/reset": lambda posted_data: post_reset_action(posted_data),
}


# lightの状態を管理する関数
def post_light_action(posted_data):
    if posted_data.get("light") == "on":
        servos.shake("on")
        return '{"status": "success", "light": "on"}'
    elif posted_data.get("light") == "off":
        servos.shake("off")
        return '{"status": "success", "light": "off"}'
    return '{"status": "error", "message": "Invalid action"}'


# リセット
def thread_reset():
    sleep(1)
    machine.reset()


def post_reset_action(posted_data):
    if posted_data.get("reset") == "on":
        _thread.start_new_thread(thread_reset, ())
        return '{"status": "success", "reset": "on"}'
    return '{"status": "error", "message": "Invalid action"}'


# postページを加工する関数をここに登録
def post_name(page_data, posted_data):
    edited_page = str(page_data).format(posted_data["name"])
    return edited_page


# Webページを取得する関数
def get_page(file_name):
    open_mode = "r"
    if get_file_type(file_name) != "html":
        open_mode = "rb"
    data = ""
    try:
        with open(file_name, open_mode) as f:
            data = f.read()
    except Exception as e:
        print(f"Error opening file: {e}")
        with open("NotFound.html", "r") as f:
            data = f.read()
    return data


# WEBページをルーティングする関数
def rooting_from_url(rooting):
    # ファイル名指定だったら、ファイル名を返す
    if len(rooting.split(".")) >= 2:
        return rooting.split("/")[-1]
    # ルーティング表を参照して、ファイル名を返す
    if rooting in ROOTING_TABLE.keys():
        return ROOTING_TABLE[rooting]
    # 各表になかったら、NotFound.htmlを返す
    return "NotFound.html"


# urlからファイルタイプを取得する関数
def get_file_type(url):
    url_split = url.split(".")
    if len(url_split) == 1:
        mime_type = "html"
    else:
        mime_type = url_split[-1]
    return mime_type


# ファイルタイプからContent-Typeを取得する関数
def get_content_type(url):
    mime_type = get_file_type(url)
    content_type = ""
    if mime_type in CONTENT_TYPE.keys():
        content_type = (
            f"HTTP/1.0 200 OK\r\nContent-type: {CONTENT_TYPE[mime_type]}\r\n\r\n"
        )
    return content_type


# Wi-Fiに接続する関数
def connect_and_return_ip():
    # Connect to WLAN
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(SSID, PASSWORD)
    while wlan.isconnected() == False:
        print("Waiting for connection...")
        sleep(1)
    ip = wlan.ifconfig()[0]
    print(f"Connected on {ip}")
    return ip


# GETメソッドの時の処理
def get_method(url):
    result = json_response(url)
    if result is not None:
        return result
    filename = rooting_from_url(url)

    # Raspberry Pi PICO Wに対するアクション実行
    page = get_page(filename)
    return (get_content_type(url), page)


# POSTメソッドの時の処理
def post_method(url, posted_data):
    if url in POST_ACTION_TABLE.keys():
        response = POST_ACTION_TABLE[url](posted_data)  # JSON形式のレスポンス
        return ("HTTP/1.0 200 OK\r\nContent-Type: application/json\r\n\r\n", response)
    return (
        "HTTP/1.0 404 Not Found\r\nContent-Type: application/json\r\n\r\n",
        '{"status": "error", "message": "Not found"}',
    )


# URLデコードを行う関数
# micropythonにはurllib.parseモジュールはないため自前で実装
# '+' を ' ' に変換し、%xx 形式の文字列を対応するASCII文字に変換する
def url_decode(s):
    if "%" not in s:
        return s
    s_replaced = s.replace("+", " ")
    s_decoded = ubinascii.unhexlify(s_replaced.replace("%", "").encode()).decode()
    return s_decoded


def get_posted_data(request):
    request_lines = request.decode("utf8").split("\r\n")
    posted_data = {
        key: url_decode(value)
        for key, value in [line.split("=") for line in request_lines[-1].split("&")]
    }
    return posted_data


# リクエストに対して、コンテントタイプとWebページを返す関数
def get_content_and_page(request):
    request_line = request.split(b"\r\n")[0].decode("utf-8")
    method = request_line.split()[0]
    url = request_line.split()[1]
    if method == "GET":
        return get_method(url)
    elif method == "POST":
        return post_method(url, get_posted_data(request))
    raise ValueError("method is not GET or POST")


# クライアント(ブラウザ)からの接続に対応する関数
async def async_server(reader, writer):
    request = await reader.read(1024)
    if len(request) == 0:
        return
    content_type, page = get_content_and_page(request)
    writer.write(content_type)
    writer.write(page)
    await writer.drain()
    await writer.wait_closed()


async def async_sw(servos):
    await proc_sw(servos)


# メイン処理部分
def main():

    global servos

    # メイン処理部分
    # Wi-Fiに接続し、IPアドレスを取得します
    ip = connect_and_return_ip()
    loop = asyncio.new_event_loop()

    # スイッチタスクの作成と実行
    loop.create_task(async_sw(servos))

    # 非同期でWEBサーバを起動
    coroutine = asyncio.start_server(async_server, ip, PORT)
    server = loop.run_until_complete(coroutine)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()

    return


if __name__ == "__main__":
    main()

WEBサーバ側のコードは、下記のページを参考にさせていただきました。

https://www.youtube.com/watch?v=XgYMclaMwvA&t=1s&ab_channel=たねちゃんねる【テクノロジー】

さいごに

無事に作りたかったものが作れました。
ただ、たまにオンされなかったり(反応しても押すまでいかない)があったりと、
安定して動いているとはいいがたいです。

スイッチの押す部分の機構や例外処理時のリセットなど、安定して動くよう調整していきたいと思います。
また、制御用のAPIを利用して、Webアプリと接続してみたいと思います。

GitHubで編集を提案

Discussion