Raspberry Piで測定したCO2濃度をAWS IoT Core経由でTimestreamに保存しGrafana Cloudで可視化
前回、Raspberry Pi Zero WHにCO2センサとLCDディスプレイを繋いで、室内のCO2濃度、温度、湿度を測定して表示しました。
今回は測定データを
- AWS IoT Coreに送信し
- AWS IoT Coreがデータ受信イベント発生時にAWS Lambdaを呼び出し
- AWS LambdaがAmazon Timestreamにデータを保存する
ようにします。そして、
- Grafana Cloud上で、Timestreamの時系列データを可視化する
ようにします。
では、やっていきましょう。
AWS IoT Coreでモノを作成
「モノ」ってなんやねんw まあ単純に「Things」の訳なのでしょうが。「ブツ」って訳すとヤバげですしね。物理的なデバイスだけではなくて論理的なモノも含めるからモノって表現とするしかないのでしょう。ただ、AWSコンソール上ではモノとデバイスとの表現がごちゃまぜですが…
ここではRasbrerry Piの物理デバイスのデータの送信先としてモノを、以下の手順で作成します。
- AWSコンソールのAWS IoT Coreの画面を表示します。
- 画面左のナビゲーション・ペインから
管理-すべてのデバイス-モノを選択します。 右側のモノを作成ボタンをクリックします。
-
モノを作成画面で1 つのモノを作成を選択し次へをクリックします。
-
モノのプロパティを指定画面のモノの名前にraspberrypi-room-conditionと入力し次へをクリックします。
-
デバイス証明書を設定画面で新しい証明書を自動生成を選択し次へをクリックします。
-
証明書にポリシーをアタッチ画面でポリシーを作成をクリックします。
- 別タブで
ポリシーを作成画面が表示されます。ポリシー名に``raspberrypi-room-condition`と入力します。
-
ポリシードキュメントでJSONをクリックし、ポリシードキュメントに以下を入力次へをクリックします。(XXXXXXXXXXXXはAWSアカウントIDです。リージョンは適宜変更します){ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iot:Connect", "Resource": "arn:aws:iot:us-west-2:XXXXXXXXXXXX:client/raspberrypi-room-condition" }, { "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:us-west-2:XXXXXXXXXXXX:topic/room_condition" } ] } -
証明書にポリシーをアタッチ画面に戻り、ポリシーを作成ボタンの左の更新ボタンをクリックして作成したポリシーを選択しモノを作成ボタンをクリックします。 - 画面左で
設定を選択しデバイスデータエンドポイントの値(XXXXXXXXXXXXXX-XXX.iot.us-west-2.amazonaws.com)を控えておきます。
これでAWS IoT Core側の設定は完了です。
Raspberry Piから測定データをAWS IoT Coreへ送信
受信側(AWS IoT Core)の準備が出来たので、送信側(Raspberry Pi)からデータを送信する部分を実装していきます。
パッケージのインストール
Raspberry Piで以下のコマンドを実行して、AWS IoT SDK for Python v2 をインストールします。
sudo pip install awsiotsdk
証明書の格納
Raspberry Piで以下のコマンドを実行し、証明書等を格納するディレクトリを作成します。
mkdir certificates
証明書等をダンロードしたPCから、必要なファイルをRaspberry Piにコピーします。(XXXXXXXXXXXXXXXXXXXXの部分はダウンロードしたファイルに合わせます)
scp XXXXXXXXXXXXXXXXXXXX-certificate.pem.crt pi@raspberrypi.local:~/certificates
scp XXXXXXXXXXXXXXXXXXXX-private.pem.key pi@raspberrypi.local:certificates
scp AmazonRootCA1.pem pi@raspberrypi.local:certificates
データ送信を実装
前回作成したファイルから以下の部分を追加します。XXXXの部分は適宜修正します。(全体のコードはこちら)
#!/usr/bin/env python3
import time
import json
import sys
import digitalio
import board
# (中略)
from scd30_i2c import SCD30
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
# Define ENDPOINT, CLIENT_ID, PATH_TO_CERTIFICATE, PATH_TO_PRIVATE_KEY, PATH_TO_AMAZON_ROOT_CA_1, MESSAGE, TOPIC, and RANGE
ENDPOINT = "XXXXXXXXXXXXXX-XXX.iot.us-west-2.amazonaws.com"
CLIENT_ID = "raspberrypi-room-condition"
PATH_TO_CERTIFICATE = "certificates/XXXXXXXXXXXXXXXXXXXX-certificate.pem.crt"
PATH_TO_PRIVATE_KEY = "certificates/XXXXXXXXXXXXXXXXXXXX-private.pem.key"
PATH_TO_AMAZON_ROOT_CA_1 = "certificates/AmazonRootCA1.pem"
TOPIC = "room_condition"
def init_display():
"""Initialize display, and return display object"""
# Configuration for CS and DC pins (these are PiTFT defaults):
# (中略)
text_color1 = "#D9E5FF"
# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
cert_filepath=PATH_TO_CERTIFICATE,
pri_key_filepath=PATH_TO_PRIVATE_KEY,
client_bootstrap=client_bootstrap,
ca_filepath=PATH_TO_AMAZON_ROOT_CA_1,
client_id=CLIENT_ID,
clean_session=False,
keep_alive_secs=6
)
print("Connecting to {} with client ID '{}'...".format(ENDPOINT, CLIENT_ID), file=sys.stderr)
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!", file=sys.stderr)
# Setup SCD30
scd30 = SCD30()
measurment_interval_sec = 10
# (中略)
disp.image(image)
message = {"temperature" : temp, "co2": co2, "humidity": humi}
mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
print("Published: '" + json.dumps(message) + "' to the topic: '" + TOPIC + "'", file=sys.stderr)
time.sleep(measurment_interval_sec)
前回作成した自動起動スクリプトを以下のように書き換えます。
#! /bin/sh
### BEGIN INIT INFO
# Provides: scd30d
# Required-Start: $remote_fs $syslog $network $named
# Required-Stop: $remote_fs $syslog
# Default-Start: 2 3 4 5
# Default-Stop:
# Short-Description: Measure room condition
### END INIT INFO
(cd /home/pi && ./measure_room_condition.py)
AWS側で受信できているか、以下のように確認します。
- AWSコンソール画面の左側で、
テスト-MQTT テストクライアントを選択します。 -
MQTT テストクライアント画面のトピックのフィルターにroom_conditionを入力しサブスクライブボタンをクリックします。
- 画面下部の
サブスクリプションで、10秒おきに受信データが表示されます。
IoT Coreでデータ受信時のAWS Lambda呼び出し
IoT Coreでデータ受信できたので、このデータを引数にAWS Lambdaを呼び出してみます。
ここでは、AWS LambdaのコードはServerless Frameworkを使ってデプロイします。
Serverless Frameworkのインストール
Serverless Frameworkのインストール方法はインターネット上に多数あるので、こちら等を参考にインストールしてください。
Serverless Frameworkのプロジェクト作成
以下のコマンドを実行して、プロジェクト用のディレクトリを作成します。
mkdir aws_lambda
cd aws_lambda/
以下のコマンドを使って、Serverless Frameworkのプロジェクトを作成します。
serverless create --template aws-python3
以下の2つのファイルが作成されます。
$ ls
handler.py serverless.yml
データ受信処理の実装
ファイルを編集して、AWS IoT Coreからのデータを受け取れるようにします。
import json
def hello(event, context):
print(event)
return {
"message": "Go Serverless v1.0! Your function executed successfully!",
"event": event
}
service: aws-lambda
provider:
name: aws
runtime: python3.8
region: us-west-2
functions:
hello:
handler: handler.hello
events:
- iot:
sql: "SELECT * FROM 'room_condition'"
デプロイ
以下のコマンドを実行してデプロイします。
$ serverless deploy
Deploying aws-lambda to stage dev (us-west-2)
✔ Service deployed to stack aws-lambda-dev (49s)
functions:
hello: aws-lambda-dev-hello (270 B)
CloudWatchのログに、温度、CO2濃度、湿度が表示されます。

Amazon Timestreamのデータベース、テーブルの作成
AWS Lambdaがデータを受け取れるようになったので、Lambdaのデータ送信先のTimestreamをセットアップします。
データベース、テーブルを以下の手順で作成します。
- AWSコンソールのTimestreamの画面を開きます。
- ナビゲーション・ペインで
Databaseを選択します。 -
Create databaseをクリックします。 -
create databaseページで、次のように操作します。-
Choose a configurationで、Standard databaseを選択します。 -
Nameでデータベース名を入力(例: RoomCondition)します。 -
Create databaseをクリックします。
-
- ナビゲーション・ペインで
Tablesを選択します。 -
Create tableをクリックします。 -
Create tableページで、以下のように操作します。-
Database nameで、作成したデータベースを選択します。 -
Table nameで、テーブル名を入力(例: conditions)します。 -
Data retentionで保存期間について、Memory store retention、Magnetic store retentionを設定します。 -
create tableをクリックします。
-
AWS LambdaからTimestreamに書き込み
Timestream側でテーブルの作成ができたので、Lambdaからデータを書き込みます。
IAMロールの作成
Timestreamのデータベースに書き込むための権限が必要なので、IAMロールを作成します。
- AWSコンソールのIAM画面を開きます。
- 画面の左のペインで
ロールを選択し、ロール画面でロールを作成ボタンをクリックします。
-
信頼されたエンティティを選択画面で信頼されたエンティティタイプはAWS のサービスを選択し、ユースケースでLambdaを選択し、次へボタンをクリックします。
-
許可を追加画面の許可ポリシーの検索ボックスでAmazonTimestreamFullAccessを入力し検索結果のポリシーにチェックをし次へボタンをクリックします。
-
名前、確認、および作成画面でロール名にlambda-timestream-roleを入力しロールを作成ボタンをクリックします。
コードの修正
Lambdaのファイルを修正して、Timestreamにデータを転送するようにします。
import boto3
import time
DatabaseName = 'RoomCondition'
TableName = 'conditions'
def current_milli_time():
return round(time.time() * 1000)
client = boto3.client('timestream-write', region_name='us-west-2')
def write_record(event, context):
print(event)
temperature = event['temperature']
co2 = event['co2']
humidity = event['humidity']
current_time = str(current_milli_time())
dimensions = [
{'Name': 'deviceId', 'Value': '1'},
]
co2_record = {
'Dimensions': dimensions,
'MeasureName': 'co2',
'MeasureValue': str(co2),
'MeasureValueType': 'DOUBLE',
'Time': current_time
}
temperature_record = {
'Dimensions': dimensions,
'MeasureName': 'temperature',
'MeasureValue': str(temperature),
'MeasureValueType': 'DOUBLE',
'Time': current_time
}
humidity_record = {
'Dimensions': dimensions,
'MeasureName': 'humidity',
'MeasureValue': str(humidity),
'MeasureValueType': 'DOUBLE',
'Time': current_time
}
records = [co2_record, temperature_record, humidity_record]
result = client.write_records(DatabaseName=DatabaseName, TableName=TableName, Records=records, CommonAttributes={})
return result
YAMLファイルも修正します。
service: aws-lambda
frameworkVersion: '3'
provider:
name: aws
runtime: python3.8
region: us-west-2
iam:
role: arn:aws:iam::XXXXXXXXXXXX:role/lambda-timestream-role
functions:
room_condition:
handler: handler.write_record
events:
- iot:
sql: "SELECT * FROM 'room_condition'"
XXXXXXXXXXXXはAWSのアカウントIDを設定します。
デプロイ
再度デプロイします。
serverless deploy
AWSコンソールのTimesteamの画面で、以下のようにテーブルにデータが書き込まれている事を確認します。
- AWSコンソールのTimestreamの画面を開きます。
- ナビゲーション・ペインで
管理ツール-クエリエディタを選択します。 -
クエリエディタ画面でデータベースでRoomConditionを選択し、Query 1に以下のクエリを入力し、実行ボタンをクリックします。SELECT BIN(time, 1m) as binned_time, avg(measure_value::double) as avg_co2, max(measure_value::double) as max_co2, min(measure_value::double) as min_co2 FROM "RoomCondition"."conditions" WHERE measure_name = 'co2' GROUP BY BIN(time, 1m) ORDER BY binned_time DESC
Grafana Cloudで可視化
Amazon Timestreamにデータを保存できたので、Grafana Cloudで測定データを可視化します。
アカウントの作成
Grafanaのサイトでfree acountでアカウントを作成します。
Timestreamへの接続先の調査
以下のコマンドを実行して、Timestreamの接続エンドポイントを調べます。リージョンは、適宜変更します。
aws timestream-query describe-endpoints --region us-west-2 | jq -r '.Endpoints[].Address'
Timestream用プラグインのインストール
Timestream用プラグインを、以下の手順にてインストールします。
- ダッシュボード画面を表示します。
- 画面の左下の、
Configration-Pluginsを選択します。
-
Configuration画面の検索ボックスでamazon timestreamを入力し、検索結果のAmazon Timestreamをクリックします。
-
Install via grafana.comをクリックします。
- 別タブで
Install pluginをクリックします。
Timestreamへの接続
Timestreamのプラグインが使えるようになったので、以下の手順にてTimestreamに接続する設定をします。
- 元の画面に戻って、画面の左下の、
Configration-Data sourcesを選択します。
-
Configuration画面でAdd data sourceをクリックします。
-
Add data source画面の検索ボックスでamazon timestreamを入力し、検索結果のAmazon Timestreamをクリックします。
-
Data Sources画面で必要事項を入力します。

Authentication Provider は、Access & secret key を選択し、以下のように設定し、Save & Testをクリックします。項目 値 Authentication Provider Access & secret key Access Key ID IAMユーザーのアクセスキーID Secret Access Key IAMユーザーのシークレットアクセスキー Assume Role ARN 空欄のまま External ID 空欄のまま Endpoint AWS CLIで調べたqueryエンドポイント Default Region us-west-2
ダッシュボードの作成
Timesteamにアクセスできるようになったので、以下の手順にて、ダッシュボードを作成して測定結果を表示するパネルを追加します。
-
Dashboards-+ New dashboardをクリックします。
-
Add a new panelをクリックします。
-
Edit Panel画面の下部でDatabaseはRoomConditionを選択し、Tableはconditionsを選択し、Measureはco2を選択します。

クエリ入力部に以下を入力し、右上のApplyをクリックします。SELECT time, measure_name, measure_value::double as co2 FROM "RoomCondition"."conditions" WHERE measure_name = 'co2' ORDER BY time - 同様に,、温度、湿度もパネルに追加すると以下のようになります。
Discussion