Closed11

時系列データ処理についての調査(TimescaleDB)

horie-thorie-t

需要の予測にも使えるってのは考えてなかったなあ。

horie-thorie-t

AWS EC2でTimescaleDBを稼働させる。

基本的に、以下に従う。
https://docs.timescale.com/install/latest/installation-cloud-image/#install-self-hosted-timescaledb-from-a-pre-built-cloud-image

AMIは「TimescaleDB 2.6.0 (PostgreSQL 13) - Ubuntu 20.04 (EBS Backed)」を使用。

起動後はSSHでログインして、コンフィグレーション。

sudo timescaledb-tune
# 問い合わせは、全部y

設定変更後に、サービスを再起動

sudo systemctl restart postgresql.service

PostgreSQLに接続

sudo -u postgres psql

tsdbというDatabaeを作成して、TimescaleDB拡張を追加する。

CREATE database tsdb;
\c tsdb
CREATE EXTENSION IF NOT EXISTS timescaledb;

追加結果の確認

tsdb=# \dx
                                      List of installed extensions
    Name     | Version |   Schema   |                            Description                            
-------------+---------+------------+-------------------------------------------------------------------
 plpgsql     | 1.0     | pg_catalog | PL/pgSQL procedural language
 timescaledb | 2.6.0   | public     | Enables scalable inserts and complex queries for time-series data
(2 rows)

Databaseアクセス用のユーザ(例: iot)を作成。

ALTER USER iot WITH PASSWORD 'パスワード';

権限をiotに設定。

GRANT ALL PRIVILEGES ON DATABASE tsdb TO iot;
GRANT ALL PRIVILEGES ON conditions TO iot;
horie-thorie-t

TimescaleDBへのINSERTとクエリ

TimescaleDBにテーブルを作成して、データをINSERTして、検索してみる。

CREATE TABLE conditions (
  time        TIMESTAMPTZ       NOT NULL,
  device_id   INTEGER           NOT NULL,
  co2 	      DOUBLE PRECISION  NULL,
  temperature DOUBLE PRECISION  NULL,
  humidity    DOUBLE PRECISION  NULL
);

TimescaleDBのハイパーテーブルに変更。

SELECT create_hypertable('conditions', 'time');

データを挿入

INSERT INTO conditions VALUES('2022-04-04 12:00:00.000+00', 1, 500, 27, 50),
       ('2022-04-04 12:01:00.000+00', 1, 700, 28, 51),
       ('2022-04-04 12:02:00.000+00', 1, 700, 29, 52),
       ('2022-04-04 12:03:00.000+00', 1, 700, 30, 53),
       ('2022-04-04 12:04:00.000+00', 1, 800, 28, 54),
       ('2022-04-04 12:05:00.000+00', 1, 750, 27, 55),
       ('2022-04-04 12:06:00.000+00', 1, 700, 26, 53),
       ('2022-04-04 12:07:00.000+00', 1, 650, 25, 52),
       ('2022-04-04 12:08:00.000+00', 1, 600, 24, 51),
       ('2022-04-04 12:09:00.000+00', 1, 500, 23, 50),
       ('2022-04-04 12:10:00.000+00', 1, 550, 25, 49),
       ('2022-04-04 12:11:00.000+00', 1, 600, 26, 50),
       ('2022-04-04 12:12:00.000+00', 1, 650, 27, 53),
       ('2022-04-04 12:13:00.000+00', 1, 700, 29, 55);

5分ごとの平均を求める

tsdb=# SELECT time_bucket('5 minutes', time) AS five_min,
tsdb-#        avg(co2), avg(temperature), avg(humidity)
tsdb-# FROM conditions
tsdb-# GROUP BY five_min ORDER BY five_min;
        five_min        | avg |  avg  |  avg  
------------------------+-----+-------+-------
 2022-04-04 12:00:00+00 | 680 |  28.4 |    52
 2022-04-04 12:05:00+00 | 640 |    25 |  52.2
 2022-04-04 12:10:00+00 | 625 | 26.75 | 51.75
(3 rows)
horie-thorie-t

Grafanaを使って可視化

以下を参考にしながら
https://nozomi1773.hatenablog.com/entry/2019/04/16/010519

PostgreSQL側でDBにアクセスするためのuserを作成しておく。

CREATE USER ユーザ名 WITH PASSWORD 'パスワード';
GRANT ALL PRIVILEGES ON DATABASE tsdb TO ユーザ名;

Grafanaのインストール

ここに従う
https://grafana.com/grafana/download?pg=get&plcmt=selfmanaged-box1-cta1

sudo apt-get update
sudo apt-get install -y adduser libfontconfig1
wget https://dl.grafana.com/oss/release/grafana_8.4.5_amd64.deb
sudo dpkg -i grafana_8.4.5_amd64.deb

サービスの起動

sudo systemctl start grafana-server

SSHポートフォワードでトンネリング

ssh -i .ssh/xxx-key-pair.pem -L 3000:localhost:3000 ubuntu@xxx.xxx.xxx.xxx

以下のURLにアクセス
http://localhost:3000/

デフォルトのusername、passwordは admin で、ログイン後にパスワードの変更を求められる。

ConfigurationDatasource をクリック。

PostgreSQLを選択する。

Host、Database、User、Passwordを入力、TLS/SSL Modeはdisableに、PostgreSQL detailのVersionはTimescaleDBのベースになっているVersionを指定し、Save & Testをクリックする。

CreateDashboard を選択。

Add a new panel を選択。

下部の Data sourcePostgreSQL を選択し Column を選択して、右上の Apply をクリック。

上部の Save のアイコンをクリックして、ダイアログで Dashboard name を入力して、Saveをクリック

horie-thorie-t

HTTP経由でTimescaleDBにINSERT

HTTPサーバは、node.jsでサーバを立てる。

npm init -y

package.jsonを以下のように編集。

{
  "name": "dbproxy",
  "version": "1.0.0",
  "description": "",
  "main": "src/main.js",
  "scripts": {
    "start": "sudo node ./src/main.js",
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "HORIE Tetsuya",
  "license": "ISC",
  "dependencies": {
    "pg": "^8.7.3"
  }
}

src/main.js を以下の通り作成。

const { Client } = require("pg");

var http = require('http');
var server = http.createServer(async (req, res) => {
    const buffers = [];
    for await (const chunk of req) {
	buffers.push(chunk);
    }
    const data = JSON.parse(Buffer.concat(buffers).toString());
    await console.log(data);

    const client = new Client({
	user: "ユーザ名",
	host: "127.0.0.1",
	database: "tsdb",
	password: "パスワード",   
	port: 5432,
    });
    const sql = "INSERT INTO conditions (time, device_id, co2, temperature, humidity) VALUES($1, $2, $3, $4, $5)"
    client.connect();
    client.query(sql,
		 [new Date().toISOString(), 1, data.co2, data.temperature, data.humidity],
		 (err, result) => {
		     if (err) {
			 console.log(err);
		     }
		     client.end();
		     res.end();
		 });
}).listen(80);

以下の通り、サーバを起動。

nohup npm start &
horie-thorie-t

IoTデバイスからTimescaleDBに測定値を記録

IoT端末として、Wio Terminal を、センサにSDC30を使って、室内環境をTimescaleDBに保存。

Aruduinoで以下のコードを作成して、書き込み。

#include <SCD30.h>
#include <rpcWiFi.h>
#include <HTTPClient.h>
#include <TFT_eSPI.h>

const char* ssid = "SSIDを設定";
const char* password =  "WiFIのパスワード";
const char* postUrl =  "サーバのURL";

TFT_eSPI tft;

void setup() {
  // SDC30の初期化
  Wire.begin();
  scd30.initialize();

  // シリアル出力の設定
  Serial.begin(115200);

  /*
   * WiFiのセットアップ
   */
  WiFi.begin(ssid, password);
 
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.println("Connecting..");
  }
  Serial.print("Connected to the WiFi network with IP: ");
  Serial.println(WiFi.localIP());
  
  /*
   * LCDディスプレイの固定表示部分のセットアップ
   */
  tft.begin();
  tft.setRotation(3);
 
  tft.fillScreen(TFT_DARKCYAN);

  tft.setTextColor(TFT_WHITE);
  tft.setTextSize(2);
  tft.drawString("CO2:",      40,  90);
  tft.drawString("Temp.:",    40, 115);
  tft.drawString("Humidity:", 40, 140);
  tft.drawRoundRect(15, 55, 320 - (15 * 2), 240 - (55 * 2), 15, TFT_WHITE);
}
 
void loop() {
  float result[3] = {0};

  bool scd30available = scd30.isAvailable();
  if (scd30available) {
    // SDC30の値を読み込み、LCDの表示
    scd30.getCarbonDioxideConcentration(result);
    tft.fillRect(160, 90, 140, 80, TFT_DARKCYAN);
    tft.drawString(String(result[0], 0) + " ppm",   160,  90);
    tft.drawString(String(result[1], 1) + " deg C", 160, 115);
    tft.drawString(String(result[2], 1) + " %",     160, 140);
  }

  if(scd30available) {
    /*
     * サーバへ値を送信
     */
    HTTPClient http;
 
    http.begin(postUrl);
    http.addHeader("Content-type", "application/json");

    String body = String("{\"co2\": " + String(result[0]) + ", \"temperature\":  " + String(result[1], 1) +", \"humidity\": " + String(result[2], 1) + "}");
 
    int httpResponseCode = http.POST(body);
    if (httpResponseCode > 0){
      Serial.print("HTTP Response Code: ");
    } else {
      Serial.print("Error on sending request: ");
    }
    Serial.println(httpResponseCode);
 
    http.end();  //Free resources
  } else {
    Serial.println("Unable to create client");
  }
  
  Serial.println();
  Serial.println("Waiting 1s before the next round...");
  delay(5000);
}
horie-thorie-t

データ集計する

SQL

1日の平均値、最大値、最小値を求める事ができる。

SELECT time_bucket('1 day', time) AS a_day,
     avg(co2), max(co2), min(co2)
FROM conditions
GROUP BY a_day
ORDER BY a_day;

実行結果は以下の通り。

         a_day          |        avg         |   max   |  min   
------------------------+--------------------+---------+--------
 2022-04-04 00:00:00+00 |                650 |     800 |    500
 2022-04-19 00:00:00+00 | 1256.9683669574706 | 2636.34 |    500
 2022-04-20 00:00:00+00 | 1211.2500919181427 | 2163.81 | 906.27
 2022-04-21 00:00:00+00 | 1123.5553359946773 | 1824.32 | 706.44
 2022-04-22 00:00:00+00 | 1236.5902098489098 | 2923.44 | 580.34
 2022-04-23 00:00:00+00 |   627.415416666667 |  951.75 | 561.21
(6 rows)

PostgreSQL自体にもこのような関数はある。

SELECT date_trunc('day', time) AS a_day,
     avg(co2), max(co2), min(co2)
FROM conditions
GROUP BY a_day
ORDER BY a_day;

Grafana

以下のようなクエリを設定する。

SELECT
  $__timeGroup("time",'1d'),
  avg(co2) AS "avg", max(co2) AS "max", min(co2) AS "min"
FROM conditions
WHERE
  $__timeFilter("time")
GROUP BY 1
ORDER BY 1

このスクラップは2022/10/15にクローズされました