Raspberry Pi Pico Wのゴミ箱センサー
はじめに
株式会社DELTA エンジニアの長田(おさだ)です。
前回はゴミ箱センサーの開発スタイルについて書かせていただきましたが、今回はゴミ箱センサーの技術的な部分を備忘録として記事にしたいと思います。
Raspberry Pi Pico Wの工事設計認証(いわゆる技適)の取得および表示手順が完了したため2023年3月27日に日本でも発売されました。
前回の記事ではRaspberry Pi Picoを使用していましたが、Raspberry Pi Pico WにはWi-Fi機能が追加されており、せっかくなのでゴミ箱センサーにもWi-Fiを使用した機能を追加しています。
開発環境
【Raspberry Pi Pico W】無線LAN機能の使い方完全ガイド等を参考に開発をしており、開発環境は以下の通りです。
・Apple M2(macOS Ventura 13.2.1)
・MicroPython[1]
・Thonny
ゴミ箱センサーの概要
前回の記事にも書かせていただきましたが、ゴミ箱センサーの機能は『超音波センサーで距離を計測、計測結果によってLED点灯』となっています。
以下、少し詳細に説明させていただきます。
超音波センサー
ゴミ箱内部の天板に超音波センサーを設置して、ゴミ箱内のゴミの高さ(ゴミと天板との距離)を計測しています。
計測した値によって、ゴミ箱が空なのか八分目なのか満杯なのか判定をしてLEDの発光色や発光パターンを変更しています。
配線
Raspberry Pi Pico Wには超音波センサーから信号を受け取る配線郡とLED発光の制御をしている配線郡が必要となっています。
出典:ラズベリーパイ財団
Raspberry Pi Pico W | 配線色 | 超音波センサー(HC-SR04) |
---|---|---|
40 | 赤 | Vcc |
13 | 黒 | GND(途中に抵抗入れてます) |
19 | 青 | Trig |
20 | 緑 | Echo(途中に抵抗入れてます) |
Raspberry Pi Pico W | 配線色 | LED(NeoPixel) |
---|---|---|
39 | 赤 | +5V |
38 | 黒 | GND |
1 | 白 | Din |
Slack通知
またRaspberry Pi Pico Wに変更した際に、Slack通知の機能も追加しました。
通知条件はゴミ箱が満杯になってしばらくゴミ袋の交換がなかった場合と満杯が解消されたタイミングになります。
プログラムの説明
ファイル構成は以下の通りで、main.pyから各モジュールを呼び出しています。
処理についてはコメントを参照していただければと思いますが、いくつか別途説明させていただきます。
main.pyのソースはこちらです
from machine import Pin, Timer
import utime
import hcsr04
import neoPixel
import slack
import iqr
# HC-SR04センサーの設定
sensor = hcsr04.HC_SR04(trigger_pin=14, echo_pin=15)
# NeoPixel(LED)の設定
neopixel_controller = neoPixel.NeoPixelController(num_leds=5, led_pin=0)
# IQRの設定
# 外れ値ではない範囲を狭くするためにパラメータを0.25に設定
iqr_controller = iqr.OutlierRemover(0.25)
# Slack通知用の設定
# Wi-FiのSSIDとパスワード
ssid = 'Wi-FiのSSID'
password = 'Wi-FiのPASSWORD'
notifier = slack.SlackNotifier(ssid, password)
# SlackのWebhook URL
url = 'SlackのWebhook URL'
message_text_full = "ゴミ箱が満杯です。"
message_text_empty = "ゴミ袋の交換ありがとうございましtz。"
# LEDの点灯を判定(外れ値対策)するための配列
# 0.2秒間隔の要素15なので3秒間分の距離をキューイングします
history = [150 for x in range(0, 15)]
# Slack通知までの時間を計測する配列
# color_flashの点灯時間(LED個数で変動)も考慮が必要
# 例)LED5個でcolor_flashの場合、距離の計測間隔は0.5(LED)+0.2(defaultの計測間隔)秒なので配列が500の場合は350秒で通知が飛びます
long_history = [150 for x in range(0, 500)]
# 繰り返しのSlack通知を防ぐためのフラグ
isFull = False
while True:
cm = sensor.read_distance()
if cm > 2 and cm < 400: # 距離が2~400cmの場合のみ対応
# LED点灯用の処理
history.append(int(cm))
history.pop(0)
print("distance=", int(cm), "cm") # 距離をint型で表示
filtered_history = iqr_controller.remove_outliers(history)
if all(elem <= 35 for elem in filtered_history):
print("35以下なのでcolor_flash")
neopixel_controller.color_flash()
elif all(elem <= 55 for elem in filtered_history):
print("55以下なのでset_led_colorで黄色")
neopixel_controller.set_led_color(30, 30, 0)
elif all(elem <= 100 for elem in filtered_history):
print("100以下なのでset_led_colorで青色")
neopixel_controller.set_led_color(0, 0, 20)
else:
neopixel_controller.set_led_color(0, 0, 0) # LEDを消灯
# Slack通知用の処理
long_history.append(int(cm))
long_history.pop(0)
filtered_long_history = iqr_controller.remove_outliers(long_history)
# 350秒間の全ての値が35以下だったらSlack通知
if all(elem <= 35 for elem in filtered_long_history) and not isFull:
print('send slack full')
notifier.send_slack(url, message_text_full)
isFull = True
# 35以下が解消されたときのみSlack通知
if all(elem > 35 for elem in filtered_long_history) and isFull:
print('send slack empty')
notifier.send_slack(url, message_text_empty)
isFull = False
utime.sleep(0.2) # 0.2秒間隔で計測
else:
neopixel_controller.set_led_color(0, 0, 0) # LEDを消灯
main.py以外のソースはこちらです
超音波センサー(HC-SR04)の処理
距離の計測にはHC-SR04を使用しています。
triggerやechoの値をコンストラクターで受け取っています。(ハンダ付けしてしまえば変更する値ではないんですが、ソースコード流用を考慮して)
こちらを参考にしました。
from machine import Pin
import utime
class HC_SR04:
def __init__(self, trigger_pin, echo_pin):
self.trigger = Pin(trigger_pin, Pin.OUT)
self.echo = Pin(echo_pin, Pin.IN)
# HC-SR04で距離を測定する
def read_distance(self):
self.trigger.low()
utime.sleep_us(2)
self.trigger.high()
utime.sleep_us(10)
self.trigger.low()
while self.echo.value() == 0:
signal_off = utime.ticks_us()
while self.echo.value() == 1:
signal_on = utime.ticks_us()
time_passed = signal_on - signal_off
distance = (time_passed * 0.0343) / 2
return distance
外れ値の除外処理
IQR (四分位範囲)を使った外れ値の除去
class OutlierRemover:
def __init__(self, k=1.5):
self.k = k
def find_iqr(self, data):
sorted_data = sorted(data)
q1_index = len(sorted_data) // 4
q3_index = (3 * len(sorted_data)) // 4
q1 = sorted_data[q1_index]
q3 = sorted_data[q3_index]
iqr = q3 - q1
return iqr, q1, q3
def remove_outliers(self, data):
iqr, q1, q3 = self.find_iqr(data)
lower_bound = q1 - self.k * iqr
upper_bound = q3 + self.k * iqr
filtered_data = [x for x in data if lower_bound <= x <= upper_bound]
return filtered_data
NeoPixcel(LED)の処理
コンストラクターにはLEDの個数と制御のGPIOを受け取っています。
試作段階では短いNeoPixcelを使用していたため、LED個数は何度も変更する必要があった値です。
import array
from machine import Pin
import rp2
import utime
class NeoPixelController:
def __init__(self, num_leds, led_pin):
self.NUM_LEDS = num_leds
self.LED_PIN = led_pin
# StateMachineで正確なパルスを出力するための設定
self.sm = rp2.StateMachine(
0, self.ws2812, freq=8_000_000, sideset_base=Pin(self.LED_PIN))
self.sm.active(1)
# LEDのRGB値を指定する24bit(8bit x 3)の配列をNeoPixelの数で準備
self.ar = array.array("I", [0 for _ in range(self.NUM_LEDS)])
# NeoPixel(WS2812)LEDに正確な制御信号を送信するためのカスタムアセンブリコードを定義
@rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW, out_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
def ws2812():
T2 = 5
T1 = 2
T3 = 3
wrap_target()
label("bitloop")
out(x, 1) .side(0)[T3 - 1]
jmp(not_x, "do_zero") .side(1)[T1 - 1]
jmp("bitloop") .side(1)[T2 - 1]
label("do_zero")
nop() .side(0)[T2 - 1]
wrap()
def _drawpix(self, num, r, g, b):
self.ar[num] = g << 16 | r << 8 | b
# 流れるように点灯
def color_walk(self):
for i in range(0, self.NUM_LEDS * 3):
for j in range(0, self.NUM_LEDS):
if j < 3:
self._drawpix((i+j) % self.NUM_LEDS, 5, 5, 5)
else:
self._drawpix((i+j) % self.NUM_LEDS, 255, 0, 0)
self.sm.put(self.ar, 10)
utime.sleep(0.05)
# ゆっくり点滅
def color_flash(self):
for i in range(0, self.NUM_LEDS * 2):
for j in range(0, self.NUM_LEDS):
k = i % 10
self._drawpix(j % self.NUM_LEDS, k ** 2, 0, 0)
self.sm.put(self.ar, 10)
utime.sleep(0.05)
# 常時点灯
def set_led_color(self, red, green, blue):
for i in range(0, self.NUM_LEDS):
self._drawpix(i, red, green, blue)
self.sm.put(self.ar, 10)
Slackの送信処理
Raspberry Pi Pico Wになったため、追加したモジュールになります。
本体起動時(インスタンス化時)にWi-Fiの接続をしますが、起動から送信までにWi-Fiの接続が切れる可能性もあるため、Slack送信前に再接続の処理を挟んでいます。
import time
import network
import machine
from machine import Pin
import urequests
import ujson
class SlackNotifier:
def __init__(self, ssid, password):
self.ssid = ssid
self.password = password
self.connected = self._connect_to_wifi(ssid, password)
def _connect_to_wifi(self, ssid, password):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('Connecting to network...')
wlan.connect(ssid, password)
# 接続が完了するまで待機
while not wlan.isconnected():
time.sleep(1)
print('Network config:', wlan.ifconfig())
return wlan.isconnected()
def _send_request_with_retry(self, url, message, headers, max_retries=3, timeout=10):
retries = 0
while retries < max_retries:
try:
res = urequests.post(
url,
data=ujson.dumps(message).encode("utf-8"),
headers=headers,
timeout=timeout
)
return res
except OSError as e:
print("Error:", e)
print("Retrying... (attempt {}/{})".format(retries + 1, max_retries))
retries += 1
time.sleep(1)
return None
def send_slack(self, url, message_text):
if not self.connected:
# 途中で切断した場合は再接続を試みる
print("Trying to reconnect...")
self.connected = self._connect_to_wifi(self.ssid, self.password)
if not self.connected:
print("Failed to reconnect to Wi-Fi, cannot send Slack message.")
return
# jsonデータで送信するという事を明示的に宣言
header = {'Content-Type': 'application/json'}
# 送信するメッセージ
message = {"text": message_text}
# HTTPリクエストをPOSTとして送信
res = self._send_request_with_retry(url, message, header)
if res is not None:
print("Request successful:", res.status_code)
res.close()
else:
print("Request failed after retries.")
実装ポイント① : 計測した距離のキューイング(history)
計測した距離をそのまま判定してしまうと、測定のたびにLEDの発光が変わる可能性があります。(例えば、ゴミを捨てた一瞬の値を拾って点灯してしまう)
そのため、3秒間の値をキューイングして全ての値が基準を満たす場合にLED点灯をしています。
実装ポイント② : 外れ値対応
Raspberry Pi Picoでは通常のPythonが使用できないため、NumPyのようなライブラリが利用できません。
そのため、外れ値を取り除くために四分位範囲を用いた処理を書きました。
外れ値ではない範囲を狭くするためにパラメータを0.25に設定するようmain.pyから値を渡しています。
実装ポイント③ : Slack通知の重複送信対応(long_history)
ゴミ箱が満杯になってLED点灯するタイミングとSlack送信のタイミングを変えています。ゴミを捨てるために腕を入れたりゴミ袋交換時に満杯と判断することがあるためです。
約6分間、満杯と判定され続けた場合にSlack通知するようにしています。(まずは6分間としていますが、これが正しいタイミングか否かはSlack通知の頻度で修正予定です)
また、満杯のみの条件では満杯である限り送信が続いてしまうためisFull
のフラグでも送信タイミングの制御をしています。
電子工作の説明
工作ポイント① : Raspberry Pi Pico W
ポイントと言うほどではないんですが、ハンダ付け後に熱収縮チューブでカバーしてゴミ箱に設置しています。
デバッグ用にBOOTSELボタンのみ露出させています。
工作ポイント② : LED点灯不具合の対応
ゴミ箱に設置してみると、LEDの点灯が期待通りに点灯しないケースがありました。
HC-SR04センサーの固定が不安定になっているようでしたので、センサーに台座を取り付けて固定を安定させました。台座取り付け前はこちらです。
まとめ
本記事ではソースコードの紹介と改良点について説明させていただきました。
1つ1つの機能は沢山の方たちが記事にしていたので、真新しい機能はないかもしれませんが参考になれば嬉しいです。
ラズパイを触りはじめて数週間ですが、どっぷりと沼に足を踏み入れた感じがしています。センサーから取得した値の処理(いわゆるプログラミング)だけでなく、物理的な固定方法などの工夫も必要でプロダクトデザインにも興味が湧いてきています。
今後もゴミ箱センサーの改良を加えながら、開発を続けていきたいと思います。
We're Hiring!
最後までお読みくださり、ありがとうございます。
現在 株式会社DELTA では様々な形で一緒に働く仲間を募集中です。
そんな方はぜひこちらから、お気軽にご連絡ください!
-
Raspberry Pi PicoとRaspberry Pi Pico Wではファームウェアが違うため、私は30分ほど溶かしてしまいました。記事にはちゃんと書いてあったので読んでいなかっただけです。。。 ↩︎
Discussion