💡

ArduinoとRaspberry Piで心拍数と湿度に応じた照明制御を実現した

2024/12/13に公開

群馬イノベーションアワードのファイナル審査で使用するために、Arduino Uno R3とRaspberry Piを組み合わせ、心拍数と湿度のデータに基づいてPhilips Hue Goの照明色を制御するプロジェクト「KokoroColor」を開発しました。本記事では、その構成と設定方法について詳しく解説します。

プロジェクトの概要

KokoroColorは、Arduino Uno R3で取得した心拍数と湿度のデータをRaspberry Piに送信し、Raspberry Piがそのデータを解析してBLE(Bluetooth Low Energy)を介してPhilips Hue Goの色を制御するシステムです。

構成要素

  • arduino.ino: Arduino Uno R3用のコードで、センサーデータを取得し、シリアル通信でRaspberry Piに送信します。
  • main.py: Raspberry Pi上で動作するPythonスクリプトで、Arduinoから受信したデータを解析し、Philips Hue Goの色を制御します。

以下に、各コードの詳細を示します。

arduino.ino

#include <PulseSensorPlayground.h>
#include <DHT.h>
#include <DHT_U.h>
#define DHTPIN 2
#define DHTTYPE DHT22

DHT dht(DHTPIN, DHTTYPE);


int PulseSensorPurplePin = 0;  // Pulse Sensor PURPLE WIRE connected to ANALOG PIN 0
int Signal;                    // holds the incoming raw data. Signal value can range from 0-1024
int Threshold = 570;           // Determine which Signal to "count as a beat", and which to ignore.
int beforeaverage = 0;   
int sum = 0;                  // Initialize sum to 0
int fix2 = 0;                 // Initialize fix2 to 0
int sa = 0;                   // Initialize sa to 0


void setup() {
  Serial.begin(9600);  
  dht.begin();
}

void loop() {
  int ave, fix;

  int h = dht.readHumidity();

  sum++; // Increment sum each time loop is executed

  Signal = analogRead(PulseSensorPurplePin);  
  fix = Signal / 4;  // Scale down the signal

  Serial.print(fix);
  Serial.print(",");
  Serial.println(h);
  Serial.flush();
  delay(2000); // Small delay between readings
}

main.py

import serial
import threading
import time
import sys
import asyncio
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError

SERIAL_PORT = "/dev/ttyACM0"  # USBシリアルポート
BAUD_RATE = 9600

DEVICE_NAME = "Hue Go"  # 実際のデバイス名に置き換えてください

SERVICE_UUID = "932c32bd-0000-47a2-835a-a8d455b859dd"
CHARACTERISTIC_ON_OFF = "932c32bd-0002-47a2-835a-a8d455b859dd"
CHARACTERISTIC_COLOR = "932c32bd-0005-47a2-835a-a8d455b859dd"

RECONNECT_DELAY = 5.0  # 再接続前に待つ時間(秒)


def map_color(heart_rate: int, humidity: int) -> tuple:
    # 心拍数と湿度から対応するRGB色を返す
    if heart_rate >= 130:
        if humidity >= 90:
            return (255, 0, 0)    # 赤
        elif humidity >= 80:
            return (255, 69, 0)   # 赤橙
        elif humidity >= 70:
            return (255, 165, 0)  # 橙
        elif humidity >= 60:
            return (255, 215, 0)  # 黄橙
        else:
            return (0, 255, 0)
    elif heart_rate >= 100:
        if humidity >= 90:
            return (199, 21, 133) # 赤紫
        elif humidity >= 80:
            return (255, 105, 180)# パステル赤
        elif humidity >= 70:
            return (255, 182, 193)# パステル橙
        elif humidity >= 60:
            return (255, 255, 0)  # 黄
        else:
            return (0, 255, 0)
    elif heart_rate >= 70:
        if humidity >= 90:
            return (128, 0, 128)   # 紫
        elif humidity >= 80:
            return (173, 216, 230) # パステル青
        elif humidity >= 70:
            return (144, 238, 144) # パステル緑
        elif humidity >= 60:
            return (0, 255, 127)   # きみどり
        else:
            return (0, 255, 0)
    elif heart_rate >= 40:
        if humidity >= 90:
            return (138, 43, 226)  # 青紫
        elif humidity >= 80:
            return (0, 0, 255)     # 青
        elif humidity >= 70:
            return (0, 255, 255)   # 青緑
        elif humidity >= 60:
            return (0, 255, 0)     # 緑
        else:
            return (0, 255, 0)
    else:
        return (0, 255, 0)


async def scan_for_device(name: str) -> str:
    #特定の名前を持つBLEデバイスをスキャンし、アドレスを返す
    print(f"Scanning for BLE devices named '{name}'...")
    devices = await BleakScanner.discover(timeout=10.0)
    for device in devices:
        print(f"Found device: {device.name}, Address: {device.address}")
        if device.name == name:
            print(f"Device '{name}' found with address: {device.address}")
            return device.address
    print(f"Device '{name}' not found.")
    return None


def read_serial(loop: asyncio.AbstractEventLoop, data_queue: asyncio.Queue):
    #シリアルポートからデータを読み取り、キューに格納するスレッド関数
    try:
        ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
        print("Serial port connected.")
        while True:
            if ser.in_waiting > 0:
                line = ser.readline().decode('utf-8').strip()
                if not line:
                    continue
                parts = line.split(',')
                if len(parts) < 2:
                    print(f"Invalid data format: {line}")
                    continue
                try:
                    heart_rate = int(parts[0])
                    humidity = int(parts[1])
                    asyncio.run_coroutine_threadsafe(
                        data_queue.put((heart_rate, humidity)),
                        loop
                    )
                except ValueError:
                    print(f"Invalid data values: '{line}'")
            time.sleep(0.1)
    except serial.SerialException as e:
        print(f"Serial error: {e}")
    except KeyboardInterrupt:
        if 'ser' in locals() and ser.is_open:
            ser.close()
        sys.exit()


class HueGoController:
    def __init__(self, address: str, data_queue: asyncio.Queue):
        #Hue Goデバイス制御用クラス
        self.address = address
        self.data_queue = data_queue
        self.last_color = (0, 255, 0)  # 初期色(例)
        self._disconnected_event = None

    async def fade_to_color(self, client: BleakClient, old_color: tuple, new_color: tuple, steps: int = 30, interval: float = 0.05):
        #旧色から新色へ徐々にフェードする
        old_r, old_g, old_b = old_color
        new_r, new_g, new_b = new_color

        for i in range(1, steps + 1):
            r = int(old_r + (new_r - old_r) * (i / steps))
            g = int(old_g + (new_g - old_g) * (i / steps))
            b = int(old_b + (new_b - old_b) * (i / steps))

            color_command = bytearray([1, r, g, b])
            try:
                await client.write_gatt_char(CHARACTERISTIC_COLOR, color_command, response=False)
            except BleakError as e:
                print(f"Failed to update color in fade: {e}")
                break
            await asyncio.sleep(interval)

    def disconnected_callback(self, _client: BleakClient):
        #デバイス切断時に呼ばれるコールバック
        print("Device disconnected!")
        if self._disconnected_event:
            self._disconnected_event.set()

    async def connect_and_run(self):
        #デバイスに接続し、シリアルデータに応じて色をフェードで変更する  
        #切断時は自動再接続を試みる
        while True:
            try:
                async with BleakClient(self.address) as client:
                    connected = await client.is_connected()
                    if not connected:
                        print("Failed to connect to the device.")
                        await asyncio.sleep(RECONNECT_DELAY)
                        continue

                    print(f"Connected to {DEVICE_NAME}")

                    self._disconnected_event = asyncio.Event()
                    client.set_disconnected_callback(self.disconnected_callback)

                    while not self._disconnected_event.is_set():
                        heart_rate, humidity = await self.data_queue.get()
                        print(f"Received data - Heart Rate: {heart_rate}, Humidity: {humidity}")
                        new_color = map_color(heart_rate, humidity)
                        await self.fade_to_color(client, self.last_color, new_color, steps=50, interval=0.05)
                        self.last_color = new_color

                # withを抜けた時点で接続がなくなったため再接続試行
                print("Connection lost, attempting to reconnect...")

            except BleakError as e:
                print(f"BLE connection error: {e}")

            await asyncio.sleep(RECONNECT_DELAY)


async def main():
    data_queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    # シリアル読み取りスレッドの起動
    serial_thread = threading.Thread(
        target=read_serial,
        args=(loop, data_queue),
        daemon=True
    )
    serial_thread.start()

    # デバイススキャン
    address = await scan_for_device(DEVICE_NAME)
    if not address:
        print("Device not found. Exiting.")
        return

    # HueGoコントローラ生成・実行
    controller = HueGoController(address, data_queue)
    await controller.connect_and_run()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exiting.")

Raspberry Piでの自動起動設定

Raspberry Piの起動時にmain.pyを自動的に実行するには、systemdを使用します。以下の手順で設定を行います。

  1. サービスファイルの作成:

    sudo vim /etc/systemd/system/kokorocolor.service
    

    以下の内容を入力します。

    [Unit]
    Description=KokoroColor Service
    
    [Service]
    ExecStart=/usr/bin/python3.11 /home/pi/path/to/main.py
    
    [Install]
    WantedBy=multi-user.target
    

    ExecStartのパスはmain.pyの実際のパスに置き換えてください。

  2. サービスの有効化と起動:

    sudo systemctl enable kokorocolor.service
    sudo systemctl start kokorocolor.service
    

    これで、Raspberry Piの起動時にmain.pyが自動的に実行されるようになります。

詳細は、以下の記事を参考にしてください。
Zenn記事: systemdを使ったRaspberry Piの自動起動設定

Philips Hue GoとのBLE接続

Philips Hue GoとBLE接続する際、接続が切断される問題が報告されています。この問題を回避するため、以下の手順でデバイスを設定します。pairは他のデバイスで過去に接続したことがある場合、一度hueアプリからリセットをかけた後に実行する必要があります。

  1. デバイスのスキャン:

    bluetoothctl
    scan on
    

    目的のデバイスのMACアドレスを確認します。

  2. デバイスのペアリング:

    pair <MACアドレス>
    
  3. デバイスへの接続:

    connect <MACアドレス>
    
  4. デバイスの信頼設定:

    trust <MACアドレス>
    

これらの手順により、接続の安定性が向上します。詳細は、以下のStack Overflowの投稿を参照してください。
Can't send command to BLE device (Philips Hue bulb): Connection drops


まとめ

KokoroColorは、心拍数と湿度のデータに基づいて照明の色を動的に変化させることで、環境や感情に合った照明演出を可能にします。ArduinoとRaspberry Piを組み合わせたIoTプロジェクトとして、個人プロジェクトや教育用途に適しているだけでなく、プレゼンテーションや展示会での効果的なデモにも使用できます。

ぜひ、このプロジェクトを参考に、独自のアイデアを実現してみてください!ご質問や改善案があれば、気軽にコメントをお寄せください。(この記事はgpt-4oで作成されました。)


リンクと参考資料

Discussion