時系列データ処理についての調査(TimescaleDB)
需要の予測にも使えるってのは考えてなかったなあ。
Timescale にパフォーマンスの測定結果。
AWS Timestreamは多数のIoTデバイスからのINSERTに強いが、少数のサーバからの多量のStreamingデータの書き込みには弱そう。
AWS EC2でTimescaleDBを稼働させる。
基本的に、以下に従う。
AMIは「TimescaleDB 2.6.0 (PostgreSQL 13) - Ubuntu 20.04 (EBS Backed)」を使用。
起動後はSSHでログインして、コンフィグレーション。
sudo timescaledb-tune
# 問い合わせは、全部y
設定変更後に、サービスを再起動
sudo systemctl restart postgresql.service
PostgreSQLに接続
sudo -u postgres psql
tsdbというDatabaeを作成して、TimescaleDB拡張を追加する。
CREATE database tsdb;
\c tsdb
CREATE EXTENSION IF NOT EXISTS timescaledb;
追加結果の確認
tsdb=# \dx
List of installed extensions
Name | Version | Schema | Description
-------------+---------+------------+-------------------------------------------------------------------
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
timescaledb | 2.6.0 | public | Enables scalable inserts and complex queries for time-series data
(2 rows)
Databaseアクセス用のユーザ(例: iot)を作成。
ALTER USER iot WITH PASSWORD 'パスワード';
権限をiotに設定。
GRANT ALL PRIVILEGES ON DATABASE tsdb TO iot;
GRANT ALL PRIVILEGES ON conditions TO iot;
TimescaleDBへのINSERTとクエリ
TimescaleDBにテーブルを作成して、データをINSERTして、検索してみる。
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
device_id INTEGER NOT NULL,
co2 DOUBLE PRECISION NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
TimescaleDBのハイパーテーブルに変更。
SELECT create_hypertable('conditions', 'time');
データを挿入
INSERT INTO conditions VALUES('2022-04-04 12:00:00.000+00', 1, 500, 27, 50),
('2022-04-04 12:01:00.000+00', 1, 700, 28, 51),
('2022-04-04 12:02:00.000+00', 1, 700, 29, 52),
('2022-04-04 12:03:00.000+00', 1, 700, 30, 53),
('2022-04-04 12:04:00.000+00', 1, 800, 28, 54),
('2022-04-04 12:05:00.000+00', 1, 750, 27, 55),
('2022-04-04 12:06:00.000+00', 1, 700, 26, 53),
('2022-04-04 12:07:00.000+00', 1, 650, 25, 52),
('2022-04-04 12:08:00.000+00', 1, 600, 24, 51),
('2022-04-04 12:09:00.000+00', 1, 500, 23, 50),
('2022-04-04 12:10:00.000+00', 1, 550, 25, 49),
('2022-04-04 12:11:00.000+00', 1, 600, 26, 50),
('2022-04-04 12:12:00.000+00', 1, 650, 27, 53),
('2022-04-04 12:13:00.000+00', 1, 700, 29, 55);
5分ごとの平均を求める
tsdb=# SELECT time_bucket('5 minutes', time) AS five_min,
tsdb-# avg(co2), avg(temperature), avg(humidity)
tsdb-# FROM conditions
tsdb-# GROUP BY five_min ORDER BY five_min;
five_min | avg | avg | avg
------------------------+-----+-------+-------
2022-04-04 12:00:00+00 | 680 | 28.4 | 52
2022-04-04 12:05:00+00 | 640 | 25 | 52.2
2022-04-04 12:10:00+00 | 625 | 26.75 | 51.75
(3 rows)
Grafanaを使って可視化
以下を参考にしながら
PostgreSQL側でDBにアクセスするためのuserを作成しておく。
CREATE USER ユーザ名 WITH PASSWORD 'パスワード';
GRANT ALL PRIVILEGES ON DATABASE tsdb TO ユーザ名;
Grafanaのインストール
ここに従う
sudo apt-get update
sudo apt-get install -y adduser libfontconfig1
wget https://dl.grafana.com/oss/release/grafana_8.4.5_amd64.deb
sudo dpkg -i grafana_8.4.5_amd64.deb
サービスの起動
sudo systemctl start grafana-server
SSHポートフォワードでトンネリング
ssh -i .ssh/xxx-key-pair.pem -L 3000:localhost:3000 ubuntu@xxx.xxx.xxx.xxx
以下のURLにアクセス
http://localhost:3000/
デフォルトのusername、passwordは admin
で、ログイン後にパスワードの変更を求められる。
Configuration の Datasource をクリック。
PostgreSQLを選択する。
Host、Database、User、Passwordを入力、TLS/SSL Modeはdisableに、PostgreSQL detailのVersionはTimescaleDBのベースになっているVersionを指定し、Save & Test
をクリックする。
Create の Dashboard を選択。
Add a new panel を選択。
下部の Data source で PostgreSQL を選択し Column を選択して、右上の Apply をクリック。
上部の Save のアイコンをクリックして、ダイアログで Dashboard name を入力して、Saveをクリック
HTTP経由でTimescaleDBにINSERT
HTTPサーバは、node.jsでサーバを立てる。
npm init -y
package.jsonを以下のように編集。
{
"name": "dbproxy",
"version": "1.0.0",
"description": "",
"main": "src/main.js",
"scripts": {
"start": "sudo node ./src/main.js",
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "HORIE Tetsuya",
"license": "ISC",
"dependencies": {
"pg": "^8.7.3"
}
}
src/main.js を以下の通り作成。
const { Client } = require("pg");
var http = require('http');
var server = http.createServer(async (req, res) => {
const buffers = [];
for await (const chunk of req) {
buffers.push(chunk);
}
const data = JSON.parse(Buffer.concat(buffers).toString());
await console.log(data);
const client = new Client({
user: "ユーザ名",
host: "127.0.0.1",
database: "tsdb",
password: "パスワード",
port: 5432,
});
const sql = "INSERT INTO conditions (time, device_id, co2, temperature, humidity) VALUES($1, $2, $3, $4, $5)"
client.connect();
client.query(sql,
[new Date().toISOString(), 1, data.co2, data.temperature, data.humidity],
(err, result) => {
if (err) {
console.log(err);
}
client.end();
res.end();
});
}).listen(80);
以下の通り、サーバを起動。
nohup npm start &
IoTデバイスからTimescaleDBに測定値を記録
IoT端末として、Wio Terminal を、センサにSDC30を使って、室内環境をTimescaleDBに保存。
Aruduinoで以下のコードを作成して、書き込み。
#include <SCD30.h>
#include <rpcWiFi.h>
#include <HTTPClient.h>
#include <TFT_eSPI.h>
const char* ssid = "SSIDを設定";
const char* password = "WiFIのパスワード";
const char* postUrl = "サーバのURL";
TFT_eSPI tft;
void setup() {
// SDC30の初期化
Wire.begin();
scd30.initialize();
// シリアル出力の設定
Serial.begin(115200);
/*
* WiFiのセットアップ
*/
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.println("Connecting..");
}
Serial.print("Connected to the WiFi network with IP: ");
Serial.println(WiFi.localIP());
/*
* LCDディスプレイの固定表示部分のセットアップ
*/
tft.begin();
tft.setRotation(3);
tft.fillScreen(TFT_DARKCYAN);
tft.setTextColor(TFT_WHITE);
tft.setTextSize(2);
tft.drawString("CO2:", 40, 90);
tft.drawString("Temp.:", 40, 115);
tft.drawString("Humidity:", 40, 140);
tft.drawRoundRect(15, 55, 320 - (15 * 2), 240 - (55 * 2), 15, TFT_WHITE);
}
void loop() {
float result[3] = {0};
bool scd30available = scd30.isAvailable();
if (scd30available) {
// SDC30の値を読み込み、LCDの表示
scd30.getCarbonDioxideConcentration(result);
tft.fillRect(160, 90, 140, 80, TFT_DARKCYAN);
tft.drawString(String(result[0], 0) + " ppm", 160, 90);
tft.drawString(String(result[1], 1) + " deg C", 160, 115);
tft.drawString(String(result[2], 1) + " %", 160, 140);
}
if(scd30available) {
/*
* サーバへ値を送信
*/
HTTPClient http;
http.begin(postUrl);
http.addHeader("Content-type", "application/json");
String body = String("{\"co2\": " + String(result[0]) + ", \"temperature\": " + String(result[1], 1) +", \"humidity\": " + String(result[2], 1) + "}");
int httpResponseCode = http.POST(body);
if (httpResponseCode > 0){
Serial.print("HTTP Response Code: ");
} else {
Serial.print("Error on sending request: ");
}
Serial.println(httpResponseCode);
http.end(); //Free resources
} else {
Serial.println("Unable to create client");
}
Serial.println();
Serial.println("Waiting 1s before the next round...");
delay(5000);
}
データ集計する
SQL
1日の平均値、最大値、最小値を求める事ができる。
SELECT time_bucket('1 day', time) AS a_day,
avg(co2), max(co2), min(co2)
FROM conditions
GROUP BY a_day
ORDER BY a_day;
実行結果は以下の通り。
a_day | avg | max | min
------------------------+--------------------+---------+--------
2022-04-04 00:00:00+00 | 650 | 800 | 500
2022-04-19 00:00:00+00 | 1256.9683669574706 | 2636.34 | 500
2022-04-20 00:00:00+00 | 1211.2500919181427 | 2163.81 | 906.27
2022-04-21 00:00:00+00 | 1123.5553359946773 | 1824.32 | 706.44
2022-04-22 00:00:00+00 | 1236.5902098489098 | 2923.44 | 580.34
2022-04-23 00:00:00+00 | 627.415416666667 | 951.75 | 561.21
(6 rows)
PostgreSQL自体にもこのような関数はある。
SELECT date_trunc('day', time) AS a_day,
avg(co2), max(co2), min(co2)
FROM conditions
GROUP BY a_day
ORDER BY a_day;
Grafana
以下のようなクエリを設定する。
SELECT
$__timeGroup("time",'1d'),
avg(co2) AS "avg", max(co2) AS "max", min(co2) AS "min"
FROM conditions
WHERE
$__timeFilter("time")
GROUP BY 1
ORDER BY 1