📚

ラズパイzeroによる3Dプリンタ監視システム

2021/09/16に公開

3Dプリンタを稼働させっぱなしで寝ることがあったけど、なんか不安だな~と思ってたので、プリント完了したら電源を自動で切ってくれる監視システムを作ることにしました。

1. システム構成


OS:Raspberry Pi OS Lite
python 3.7.3

ラズパイの初期設定のやり方は以下の記事などを参考にしてください。
https://qiita.com/shi78ge/items/cb3887096e8f5a2a1c3a
今回はOSにLiteを選んだため、CUIによる操作しかできません。
なのでTera Term経由でラズパイを操作しています。
OSはLiteじゃない普通のやつでもいいですが、私の感覚的にはラズパイzeroをGUIで運用するのはスペック的に厳しい印象です。

2. 動作の様子

https://youtu.be/wxJkGfRjW0c

3. 各種要素の入手先

本システムで使用している各種技術要素の入手先をざっと紹介します。
このあとで実際のコードを掲載するので、そこで各要素をどのように使っているか、コメントで記載します。

3.1 LINE Notify登録

こちらの記事を参考にして登録しました。
https://qiita.com/moriita/items/5b199ac6b14ceaa4f7c9
この方がGithubで公開しているコードもそのまま使わせてもらっています。

3.2 Tapo P105 遠隔操作方法

こちらの記事に、ライブラリのダウンロード先や、サンプルコードが載っています。
https://guminote.sakura.ne.jp/archives/799
※pipで入手するように記載されているライブラリについては、私の環境ではapt-getで入手したほうがうまくいったものがあります。それについてはあとで書きます。

Tapo P105のIPアドレスは自動取得のため、電源入れる度に変わってしまいます。
遠隔操作する上では固定にしてるほうが都合がいいです。
我が家ではTP-Link製のルーターを使っていますが、それだと以下のようにルーター側の設定でIPアドレスの固定化ができました。

①詳細設定→②ネットワーク→③DHCPサーバー
 →④クライアントリストでTapo P105のMACアドレスを確認
 →⑤アドレス予約欄に④で確認したMACアドレスを入力し、予約済みIPアドレス欄に固定化したいアドレスの値を入力する

3.3 USBマイクによる騒音計測

こちらの記事のコードをほぼそのまま使わせて頂きました。
https://hotsmmrblog.com/noise-level-measurement_with_raspberry_pi_and_usb_mic/
ただし、プリント中の騒音を計測したら結構ばらつきがあったので、移動平均処理を入れ、平均化したほうの値で制御しました。
移動平均の実装には、こちらの記事の最後に出てくるアルゴリズムを使わせてもらいました。
https://ehbtj.com/electronics/average-online-algorithm/

3.4 WEBカメラ映像のストリーミング

mjpg-streamerを使いました。これはググれば色々出てきますが、こちらのサイトなどを参考にしました。
https://www.raspberrypirulo.net/entry/mjpg-streamer

5項で掲載するプログラムでは、Pythonからmjpg-streamerを起動するスクリプト(start.sh)を実行しています。
この時にバックグラウンド実行のオプションをつけておかないと、プログラムがそれ以降に進んでくれません。
オプションの付け方は、start.shがあるディレクトリまで移動し、以下のコマンドでnanoエディタでstart.shを開きます。

$ sudo nano start.sh

すると以下のような画面が出てきます。

頭に#がついた文はコメントで、一箇所#がついてない白字の./mjpg_streamerで始まる文があります。これが実際に実行されるコマンドです。
これの末尾に「-b」を追記すればバックグラウンド実行になります。

このような感じ。

ちなみにひとつ前の画像で「cd dirname $0」という文もあり、これも自分で追記したものです。これは、どのディレクトリからでもスクリプトの実行に失敗しなくなる、みたいな効果があるみたいですが、もしかしたらなくてもいいかもしれません。うまくいかない場合に追記してみてください。詳しくは以下の記事参照。
https://qiita.com/bami3/items/3f56ca29ce2df512eb8e

スクリプトファイルに実行権限もつけなきゃいけないかも。ちょっと忘れましたが、実行エラーになる場合は先にstart.shのディレクトリで先に以下のコマンドを打ってみてください。

$ chmod 777 start.sh

3.5 ライブラリの入れ方

ここまで紹介してきたことを実践するには以下のライブラリをインストールする必要があります。
①requests …LINE NotifyとP105操作
②pycryptodome …P105操作
③pkcs7 …P105操作
④pyaudio …USBマイク
⑤numpy …USBマイク

このうち②はpipで入れましたが、①③④⑤はapt-getで入れました。
最初はpipで入れたのですが、importが通らないということがありました。
それぞれ以下のコマンドで入れました。

①sudo apt-get install python3-requests
②pip3 install pycryptodome
③sudo apt-get install python3-pkcs7
④sudo apt-get install python3-pyaudio
⑤sudo apt-get install python3-numpy

mjpg-streamerはリンク先通りの入れ方で問題ないかと思います。

4. ディレクトリ構成

5. ソースコード

3Dprinter_kanshi.py
import pyaudio
import numpy as np
import sys
import time
from line_notify_bot import LINENotifyBot
from PyP100 import PyP100
import logging
import os

#mjpeg-streamerの起動
os.system("/home/pi/mjpg-streamer/mjpg-streamer/mjpg-streamer-experimental/start.sh")

#↓騒音計測データのログを取りたい場合にコメントアウトを解除してください
#logging.basicConfig(filename="app.log",level=logging.DEBUG)

#LINE Notify設定。your_tokenのところに取得したアクセストークンをコピペしてください。
bot = LINENotifyBot(access_token='your_token')

#Tapo P105設定。P100()の中にP105のIPアドレス、TP-Linkのアカウント、パスワードの順番で入力してください。
p105 = PyP100.P100('IP_address', 'account', 'pass') # Creating a P105 plug object
p105.handshake() # Creates the cookies required for further methods
p105.login() # Sends credentials to the plug and creates AES Key and IV for further methods
p105.getMacAddress() # Get MacAddress and Set terminalUUID

#ここから騒音計測のコード
p = pyaudio.PyAudio()

# set prams
#INPUT_DEVICE_INDEX = int(sys.argv[1])
INPUT_DEVICE_INDEX = 0
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = int(p.get_device_info_by_index(INPUT_DEVICE_INDEX)["maxInputChannels"])
SAMPLING_RATE = int(p.get_device_info_by_index(INPUT_DEVICE_INDEX)["defaultSampleRate"])
RECORD_SECONDS = 1

# amp to db
def to_db(x, base=1):
    y=20*np.log10(x/base)
    return y

def sampling():
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=SAMPLING_RATE,
                    input=True,
                    frames_per_buffer=CHUNK,
                    input_device_index=INPUT_DEVICE_INDEX
                    )

    # get specified range of data. size of data equals (CHUNK * (SAMPLING_RATE / CHUNK) * RECORD_SECONDS)
    data = np.empty(0)
    for i in range(0, int(SAMPLING_RATE / CHUNK * RECORD_SECONDS)):
        elm = stream.read(CHUNK, exception_on_overflow=False)
        elm = np.frombuffer(elm, dtype="int16") / float((np.power(2, 16) / 2) - 1)
        data = np.hstack([data, elm])
    # calc RMS
    rms = np.sqrt(np.mean([elm * elm for elm in data]))
    # RMS to db
    db = to_db(rms, 20e-6)
    stream.close()
    return db
#騒音計測コード終わり


# main loop
def main():

    
    N = 10  #移動平均タップ数
    cnt = 0 #カウント変数
    cnt_2 = 0 #Nがたまるまで使うカウント変数
    sum = 0 #合計用
    data = np.zeros([N]) #データ格納用リスト

    sound_th = 50 #印刷中判定 音量閾値
    watch = 0 #監視ステータスフラグ(0:監視前、1:監視開始、2:プリンタシャットダウン待ち、3:ラズパイシャットダウン待ち)
    timer_stop = 0 #印刷終了条件判定タイマ
    timer_shutdown = 0 #シャットダウン判定タイマ
    shutdown = 60 #シャットダウン判定時間
    timer_Rshutdown = 0  # ラズパイシャットダウン判定タイマ


    while True:

        #ここから移動平均の演算
        if cnt == N :
            cnt = 0

        sum -= data[cnt]
        data[cnt] = sampling()
        raw_data = data[cnt]
        sum += data[cnt]
        cnt +=1
        cnt_2 +=1

        if cnt_2 > N:
            cnt_2 =N

        if cnt_2 < N :
            ave_data = sum / cnt_2
        else :
            ave_data = sum / N
        #移動平均演算終わり

        #デバッグ用。ave_dataが移動平均あり。raw_dataが移動平均なしの生値
        print("cnt:{}, ave_data:{:.3f}[db], raw_data:{:.3f}[db]".format(cnt, ave_data, raw_data))

        #ログを取りたい場合は以下のコメントアウトを解除
        #logging.debug("cnt:{}, ave_data:{:.3f}[db], raw_data:{:.3f}[db]".format(cnt, ave_data, raw_data))


        #ここから条件分岐
        if ave_data >= sound_th :
            if  watch == 0:
                watch = 1 #監視がスタートしていない場合、監視スタートする
                print("watch status =", watch)
                bot.send(message='監視スタートします。')

            else :
                timer_stop = 0 #タイマリセット

        if ave_data < sound_th :
            if watch == 1 :
                timer_stop += 1 #監視有効状態で、音量が閾値を下回っていたらカウントアップ
                print("timer_stop :", timer_stop)

        if timer_stop >= 60 :
            watch = 2 #閾値未満が約1分間継続したら監視ストップ → シャットダウン待機モード
            print("watch status =", watch)
            bot.send(message='印刷が終了しました。1分後にシャットダウンします。')
            timer_stop = 0

        if watch == 2 :
            timer_shutdown += 1
            print("timer_shutdown :", timer_shutdown)

        if timer_shutdown >= shutdown :
            bot.send(message='プリンタをシャットダウンします。')
            p105.turnOff() #スマートプラグ電源OFF
            watch = 3 #ラズパイシャットダウン待機モード
            bot.send(message='1分後にラズパイもシャットダウンします。')
            timer_shutdown = 0

        if watch == 3 :
            timer_Rshutdown += 1
            print("timer_Rshutdown :", timer_Rshutdown)

        if timer_Rshutdown > 60 :
            os.system("sudo shutdown -h now")

try:
    main()
except KeyboardInterrupt:
    pass
finally:
    p.terminate()

6. 自動起動の設定

あとはラズパイ起動時に監視プログラムが自動起動してくれたら便利です。
こちらの記事を参考に、crontab @rebootの方法を使いました。
https://qiita.com/karaage0703/items/ed18f318a1775b28eab4

@reboot                  python3 /home/pi/3Dprinter_kanshi.py

7. まとめ

今回初めて、実用に耐えうるIoT的なものができた気がします。
もし真似して頂ける方がいれば、フィードバック頂けるとありがたいです。

Discussion