🐍

Raspberry Pi Pico Wのゴミ箱センサー

2023/04/28に公開

はじめに

株式会社DELTA エンジニアの長田(おさだ)です。

前回はゴミ箱センサーの開発スタイルについて書かせていただきましたが、今回はゴミ箱センサーの技術的な部分を備忘録として記事にしたいと思います。

Raspberry Pi Pico Wの工事設計認証(いわゆる技適)の取得および表示手順が完了したため2023年3月27日に日本でも発売されました。
前回の記事ではRaspberry Pi Picoを使用していましたが、Raspberry Pi Pico WにはWi-Fi機能が追加されており、せっかくなのでゴミ箱センサーにもWi-Fiを使用した機能を追加しています。

開発環境

【Raspberry Pi Pico W】無線LAN機能の使い方完全ガイド等を参考に開発をしており、開発環境は以下の通りです。
・Apple M2(macOS Ventura 13.2.1)
・MicroPython[1]
・Thonny

ゴミ箱センサーの概要

前回の記事にも書かせていただきましたが、ゴミ箱センサーの機能は『超音波センサーで距離を計測、計測結果によってLED点灯』となっています。
以下、少し詳細に説明させていただきます。

超音波センサー

ゴミ箱内部の天板に超音波センサーを設置して、ゴミ箱内のゴミの高さ(ゴミと天板との距離)を計測しています。

計測した値によって、ゴミ箱が空なのか八分目なのか満杯なのか判定をしてLEDの発光色や発光パターンを変更しています。

配線

Raspberry Pi Pico Wには超音波センサーから信号を受け取る配線郡とLED発光の制御をしている配線郡が必要となっています。


出典:ラズベリーパイ財団

Raspberry Pi Pico W 配線色 超音波センサー(HC-SR04)
40 Vcc
13 GND(途中に抵抗入れてます)
19 Trig
20 Echo(途中に抵抗入れてます)
Raspberry Pi Pico W 配線色 LED(NeoPixel)
39 +5V
38 GND
1 Din

Slack通知

またRaspberry Pi Pico Wに変更した際に、Slack通知の機能も追加しました。
通知条件はゴミ箱が満杯になってしばらくゴミ袋の交換がなかった場合と満杯が解消されたタイミングになります。

プログラムの説明

ファイル構成は以下の通りで、main.pyから各モジュールを呼び出しています。
処理についてはコメントを参照していただければと思いますが、いくつか別途説明させていただきます。

main.pyのソースはこちらです
main.py
from machine import Pin, Timer
import utime
import hcsr04
import neoPixel
import slack
import iqr

# HC-SR04センサーの設定
sensor = hcsr04.HC_SR04(trigger_pin=14, echo_pin=15)

# NeoPixel(LED)の設定
neopixel_controller = neoPixel.NeoPixelController(num_leds=5, led_pin=0)

# IQRの設定
# 外れ値ではない範囲を狭くするためにパラメータを0.25に設定
iqr_controller = iqr.OutlierRemover(0.25)

# Slack通知用の設定
# Wi-FiのSSIDとパスワード
ssid = 'Wi-FiのSSID'
password = 'Wi-FiのPASSWORD'
notifier = slack.SlackNotifier(ssid, password)

# SlackのWebhook URL
url = 'SlackのWebhook URL'
message_text_full = "ゴミ箱が満杯です。"
message_text_empty = "ゴミ袋の交換ありがとうございましtz。"

# LEDの点灯を判定(外れ値対策)するための配列
# 0.2秒間隔の要素15なので3秒間分の距離をキューイングします
history = [150 for x in range(0, 15)]

# Slack通知までの時間を計測する配列
# color_flashの点灯時間(LED個数で変動)も考慮が必要
# 例)LED5個でcolor_flashの場合、距離の計測間隔は0.5(LED)+0.2(defaultの計測間隔)秒なので配列が500の場合は350秒で通知が飛びます
long_history = [150 for x in range(0, 500)]

# 繰り返しのSlack通知を防ぐためのフラグ
isFull = False

while True:
    cm = sensor.read_distance()
    if cm > 2 and cm < 400:  # 距離が2~400cmの場合のみ対応
        # LED点灯用の処理
        history.append(int(cm))
        history.pop(0)
        print("distance=", int(cm), "cm")  # 距離をint型で表示
        filtered_history = iqr_controller.remove_outliers(history)

	if all(elem <= 35 for elem in filtered_history):
            print("35以下なのでcolor_flash")
            neopixel_controller.color_flash()

	elif all(elem <= 55 for elem in filtered_history):
            print("55以下なのでset_led_colorで黄色")
            neopixel_controller.set_led_color(30, 30, 0)

	elif all(elem <= 100 for elem in filtered_history):
            print("100以下なのでset_led_colorで青色")
            neopixel_controller.set_led_color(0, 0, 20)

        else:
            neopixel_controller.set_led_color(0, 0, 0)  # LEDを消灯

        # Slack通知用の処理
        long_history.append(int(cm))
        long_history.pop(0)
        filtered_long_history = iqr_controller.remove_outliers(long_history)

        # 350秒間の全ての値が35以下だったらSlack通知
	if all(elem <= 35 for elem in filtered_long_history) and not isFull:
            print('send slack full')
            notifier.send_slack(url, message_text_full)
            isFull = True

        # 35以下が解消されたときのみSlack通知
	if all(elem > 35 for elem in filtered_long_history) and isFull:
            print('send slack empty')
            notifier.send_slack(url, message_text_empty)
            isFull = False

        utime.sleep(0.2)  # 0.2秒間隔で計測

    else:
        neopixel_controller.set_led_color(0, 0, 0)  # LEDを消灯
main.py以外のソースはこちらです

超音波センサー(HC-SR04)の処理

距離の計測にはHC-SR04を使用しています。
triggerやechoの値をコンストラクターで受け取っています。(ハンダ付けしてしまえば変更する値ではないんですが、ソースコード流用を考慮して)
こちらを参考にしました。

hcsr04.py
from machine import Pin
import utime

class HC_SR04:
    def __init__(self, trigger_pin, echo_pin):
        self.trigger = Pin(trigger_pin, Pin.OUT)
        self.echo = Pin(echo_pin, Pin.IN)

    # HC-SR04で距離を測定する
    def read_distance(self):
        self.trigger.low()
        utime.sleep_us(2)
        self.trigger.high()
        utime.sleep_us(10)
        self.trigger.low()

        while self.echo.value() == 0:
            signal_off = utime.ticks_us()
        while self.echo.value() == 1:
            signal_on = utime.ticks_us()

        time_passed = signal_on - signal_off
        distance = (time_passed * 0.0343) / 2
        return distance

外れ値の除外処理

IQR (四分位範囲)を使った外れ値の除去

iqr.py
class OutlierRemover:
    def __init__(self, k=1.5):
        self.k = k

    def find_iqr(self, data):
        sorted_data = sorted(data)
        q1_index = len(sorted_data) // 4
        q3_index = (3 * len(sorted_data)) // 4

        q1 = sorted_data[q1_index]
        q3 = sorted_data[q3_index]

        iqr = q3 - q1
        return iqr, q1, q3

    def remove_outliers(self, data):
        iqr, q1, q3 = self.find_iqr(data)

        lower_bound = q1 - self.k * iqr
        upper_bound = q3 + self.k * iqr

        filtered_data = [x for x in data if lower_bound <= x <= upper_bound]
        return filtered_data

NeoPixcel(LED)の処理

コンストラクターにはLEDの個数と制御のGPIOを受け取っています。
試作段階では短いNeoPixcelを使用していたため、LED個数は何度も変更する必要があった値です。

neoPixcel.py
import array
from machine import Pin
import rp2
import utime


class NeoPixelController:
    def __init__(self, num_leds, led_pin):
        self.NUM_LEDS = num_leds
        self.LED_PIN = led_pin

        # StateMachineで正確なパルスを出力するための設定
        self.sm = rp2.StateMachine(
            0, self.ws2812, freq=8_000_000, sideset_base=Pin(self.LED_PIN))
        self.sm.active(1)
        # LEDのRGB値を指定する24bit(8bit x 3)の配列をNeoPixelの数で準備
        self.ar = array.array("I", [0 for _ in range(self.NUM_LEDS)])

    # NeoPixel(WS2812)LEDに正確な制御信号を送信するためのカスタムアセンブリコードを定義
    @rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW, out_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
    def ws2812():
        T2 = 5
        T1 = 2
        T3 = 3
        wrap_target()
        label("bitloop")
        out(x, 1) .side(0)[T3 - 1]
        jmp(not_x, "do_zero") .side(1)[T1 - 1]
        jmp("bitloop") .side(1)[T2 - 1]
        label("do_zero")
        nop() .side(0)[T2 - 1]
        wrap()

    def _drawpix(self, num, r, g, b):
        self.ar[num] = g << 16 | r << 8 | b

    # 流れるように点灯
    def color_walk(self):
        for i in range(0, self.NUM_LEDS * 3):
            for j in range(0, self.NUM_LEDS):
                if j < 3:
                    self._drawpix((i+j) % self.NUM_LEDS, 5, 5, 5)
                else:
                    self._drawpix((i+j) % self.NUM_LEDS, 255, 0, 0)
            self.sm.put(self.ar, 10)
            utime.sleep(0.05)

    # ゆっくり点滅
    def color_flash(self):
        for i in range(0, self.NUM_LEDS * 2):
            for j in range(0, self.NUM_LEDS):
                k = i % 10
                self._drawpix(j % self.NUM_LEDS, k ** 2, 0, 0)
            self.sm.put(self.ar, 10)
            utime.sleep(0.05)

    # 常時点灯
    def set_led_color(self, red, green, blue):
        for i in range(0, self.NUM_LEDS):
            self._drawpix(i, red, green, blue)
        self.sm.put(self.ar, 10)

Slackの送信処理

Raspberry Pi Pico Wになったため、追加したモジュールになります。
本体起動時(インスタンス化時)にWi-Fiの接続をしますが、起動から送信までにWi-Fiの接続が切れる可能性もあるため、Slack送信前に再接続の処理を挟んでいます。

slack.py
import time
import network
import machine
from machine import Pin
import urequests
import ujson

class SlackNotifier:
    def __init__(self, ssid, password):
        self.ssid = ssid
        self.password = password
        self.connected = self._connect_to_wifi(ssid, password)

    def _connect_to_wifi(self, ssid, password):
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)

        if not wlan.isconnected():
            print('Connecting to network...')
            wlan.connect(ssid, password)

            # 接続が完了するまで待機
            while not wlan.isconnected():
                time.sleep(1)

        print('Network config:', wlan.ifconfig())
        return wlan.isconnected()

    def _send_request_with_retry(self, url, message, headers, max_retries=3, timeout=10):
        retries = 0
        while retries < max_retries:
            try:
                res = urequests.post(
                    url,
                    data=ujson.dumps(message).encode("utf-8"),
                    headers=headers,
                    timeout=timeout
                )
                return res
            except OSError as e:
                print("Error:", e)
                print("Retrying... (attempt {}/{})".format(retries + 1, max_retries))
                retries += 1
                time.sleep(1)
        return None

    def send_slack(self, url, message_text):
        if not self.connected:
            # 途中で切断した場合は再接続を試みる
            print("Trying to reconnect...")
            self.connected = self._connect_to_wifi(self.ssid, self.password)
            if not self.connected:
                print("Failed to reconnect to Wi-Fi, cannot send Slack message.")
                return

        # jsonデータで送信するという事を明示的に宣言
        header = {'Content-Type': 'application/json'}

        # 送信するメッセージ
        message = {"text": message_text}

        # HTTPリクエストをPOSTとして送信
        res = self._send_request_with_retry(url, message, header)
        if res is not None:
            print("Request successful:", res.status_code)
            res.close()
        else:
            print("Request failed after retries.")

実装ポイント① : 計測した距離のキューイング(history)

計測した距離をそのまま判定してしまうと、測定のたびにLEDの発光が変わる可能性があります。(例えば、ゴミを捨てた一瞬の値を拾って点灯してしまう)
そのため、3秒間の値をキューイングして全ての値が基準を満たす場合にLED点灯をしています。

実装ポイント② : 外れ値対応

Raspberry Pi Picoでは通常のPythonが使用できないため、NumPyのようなライブラリが利用できません。
そのため、外れ値を取り除くために四分位範囲を用いた処理を書きました。
外れ値ではない範囲を狭くするためにパラメータを0.25に設定するようmain.pyから値を渡しています。

実装ポイント③ : Slack通知の重複送信対応(long_history)

ゴミ箱が満杯になってLED点灯するタイミングとSlack送信のタイミングを変えています。ゴミを捨てるために腕を入れたりゴミ袋交換時に満杯と判断することがあるためです。
約6分間、満杯と判定され続けた場合にSlack通知するようにしています。(まずは6分間としていますが、これが正しいタイミングか否かはSlack通知の頻度で修正予定です)

また、満杯のみの条件では満杯である限り送信が続いてしまうためisFullのフラグでも送信タイミングの制御をしています。

電子工作の説明

工作ポイント① : Raspberry Pi Pico W

ポイントと言うほどではないんですが、ハンダ付け後に熱収縮チューブでカバーしてゴミ箱に設置しています。
デバッグ用にBOOTSELボタンのみ露出させています。

工作ポイント② : LED点灯不具合の対応

ゴミ箱に設置してみると、LEDの点灯が期待通りに点灯しないケースがありました。
HC-SR04センサーの固定が不安定になっているようでしたので、センサーに台座を取り付けて固定を安定させました。台座取り付け前はこちらです。

まとめ

本記事ではソースコードの紹介と改良点について説明させていただきました。
1つ1つの機能は沢山の方たちが記事にしていたので、真新しい機能はないかもしれませんが参考になれば嬉しいです。
ラズパイを触りはじめて数週間ですが、どっぷりと沼に足を踏み入れた感じがしています。センサーから取得した値の処理(いわゆるプログラミング)だけでなく、物理的な固定方法などの工夫も必要でプロダクトデザインにも興味が湧いてきています。
今後もゴミ箱センサーの改良を加えながら、開発を続けていきたいと思います。

We're Hiring!

最後までお読みくださり、ありがとうございます。
現在 株式会社DELTA では様々な形で一緒に働く仲間を募集中です。

そんな方はぜひこちらから、お気軽にご連絡ください!
https://docs.google.com/forms/d/e/1FAIpQLSfQuWNU1il5lq2rVdICM0tSK_jTsjqwc52LYEwUxBq7_ImtrQ/viewform

脚注
  1. Raspberry Pi PicoとRaspberry Pi Pico Wではファームウェアが違うため、私は30分ほど溶かしてしまいました。記事にはちゃんと書いてあったので読んでいなかっただけです。。。 ↩︎

DELTAテックブログ

Discussion