スマートメーターBルートで得たデーターをRust+Polars+Plottersでグラフでみえる化する。
グラフをかいて、みえる化してみる
前々回, 前回とスマートメーターのBルートから現在の消費電力を確認した。
まあ関西民にはサービスブレーカーが落ちる経験がないので現在の電力とか電流を気にする必要もない(もともと無いもんが落ちるわけ無いもんな)。
それはとにかく今回はこのM5StickC + WiSUN HATで1分毎にDynamoDBに記録しているテレメトリをグラフにしてみます。
python + numpy + pandas で同じことをしているけど、今回はRustで高速化を目指すことにする。
実行するとこんなのが作られる。
ピークパワーは2828W 18時16分とわかる。
エアコンは運転開始時に最大の消費電力になるわけだから煖房運転を始めた時間が見た目でわかるし、だいたい30分で定常運転になったことがわかるな。
GitHubリポジトリ
plot/ディレクトリの内容がこの記事の物。
test/ディレクトリの内容が前の記事の物。
Rustで書き換えるついでに2つの実行ファイルに分離しました。
- DynamoDBにクエリーを発行してCSVファイルを作るretrieve_plotdata
- CSVファイルからグラフを作るplotcharts
credentialsファイルを事前に準備する
[default]
aws_access_key_id=YOUR-ACCESS-KEY
aws_secret_access_key=YOUR-SECRET-KEY
region=YOUR-DEFAULT-REGION
リンク先の通り接続情報を~/.aws/credentialsとかに用意してください。
DynamoDBにクエリーを発行してCSVファイルを作るretrieve_plotdata
このコマンドはDynamoDBからテレメトリデーターをCSVファイルでダウンロードするまでを行う。
Amazon DynamoDBを扱うのにaws_sdk_dynamodbを使います。
データーフレームを扱うのにPolarsを使います。
データーベースの最初と最後のアイテムを得る
def take_first_and_last_items(table):
params = {
'KeyConditionExpression': Key('device_id').eq(DEVICE_ID),
'FilterExpression': Key('data.sensor_id').eq(SENSOR_ID),
'Limit': 1,
'ScanIndexForward': True
}
params['ScanIndexForward'] = True
response = table.query(**params)
first = response['Items'][0]
params['ScanIndexForward'] = False
response = table.query(**params)
last = response['Items'][0]
return (first['data'], last['data'])
ここでやっていることは
パーティションキーのdevice_idとプログラムのDEVICE_IDが一致して
data.sensor_idとプログラムのSENSOR_IDとが一致するレコードを1件取り出す。
timestampというAWSが追加したUTC時間がソートキーであるために
'ScanIndexForward'を True / False に切り替えることで先頭と最後をそれぞれ得られるしかけ。
これをRustで書き換える。
// DynamoDBより最初と最後のレコードを得る
async fn get_first_and_last_item(
client: &Client,
) -> anyhow::Result<(AttributeValue, AttributeValue)> {
let query = client
.query()
.table_name(TABLE_NAME)
.key_condition_expression("device_id = :device_id")
.filter_expression("#data.sensor_id = :sensor_id")
.expression_attribute_names("#data", "data")
.expression_attribute_values(":device_id", AttributeValue::S(DEVICE_ID.to_owned()))
.expression_attribute_values(":sensor_id", AttributeValue::S(SENSOR_ID.to_owned()))
.limit(1);
let responce_first_item = query.clone().scan_index_forward(true).send().await?;
let responce_last_item = query.scan_index_forward(false).send().await?;
let opt_first_item = responce_first_item.items().and_then(|a| a.first());
let opt_last_item = responce_last_item.items().and_then(|a| a.first());
if let (Some(first), Some(last)) = (opt_first_item, opt_last_item) {
let result = (first["data"].to_owned(), last["data"].to_owned());
Ok(result)
} else {
Err(anyhow!("error: take_first_and_last_items()"))
}
}
ざっくり同じ。
dataは予約語なので#dataとかにしないとエラーとなる。
.expression_attribute_names("#data", "data")
データーベースから一日分のデーターを得る
def take_items_from_table(table, begin, end, tz):
begin_ = math.floor(begin.astimezone(timezone('UTC')).timestamp())
end_ = math.ceil(end.astimezone(timezone('UTC')).timestamp())
print("{} -> {}".format(begin, end))
params = {
'KeyConditionExpression': Key('device_id').eq(DEVICE_ID) & Key('timestamp').between(begin_, end_),
'FilterExpression': Key('data.sensor_id').eq(SENSOR_ID),
}
response = table.query(**params)
item_list = response['Items']
while 'LastEvaluatedKey' in response:
params['ExclusiveStartKey'] = response['LastEvaluatedKey']
response = table.query(**params)
item_list += response['Items']
return time_sequential_data_frame([x["data"] for x in item_list], tz)
timestampはUTC時間なのでbegin,endをUTC時間に変換したbegin_, end_の範囲をBETWEENで指定してクエリーを発行する。
クエリーによっては分割して得られるためにずらしながらwhileループで取得している。
これをRustで書き換える。
// timestamp指定でDBの"data"レコードを得る
async fn get_items_from_table(
client: &Client,
timestamp: Range<i64>,
) -> anyhow::Result<Vec<HashMap<String, AttributeValue>>> {
let query = client
.query()
.table_name(TABLE_NAME)
.key_condition_expression("device_id = :device_id AND #timestamp BETWEEN :tstart AND :tend")
.filter_expression("#data.sensor_id = :sensor_id")
.expression_attribute_names("#data", "data")
.expression_attribute_names("#timestamp", "timestamp")
.expression_attribute_values(":device_id", AttributeValue::S(DEVICE_ID.to_owned()))
.expression_attribute_values(":sensor_id", AttributeValue::S(SENSOR_ID.to_owned()))
.expression_attribute_values(":tstart", AttributeValue::N(timestamp.start.to_string()))
.expression_attribute_values(":tend", AttributeValue::N(timestamp.end.to_string()));
let results = query.send().await?;
if let Some(items) = results.items {
let data = items
.iter()
.map(|item| {
item["data"]
.as_m()
.map(|hm| hm.to_owned())
.or(Err(anyhow!("datetime conversion error.")))
})
.collect::<anyhow::Result<Vec<HashMap<String, AttributeValue>>>>()?;
Ok(data)
} else {
Ok(vec![])
}
}
ざっくり同じ。
気を付けるのはAttributeValueを適切なものにするくらいかな。
AttributeValueはDynamoDBでテーブルを作るときに指定したもの。
パーティションキー | device_id |
パーティションキーのタイプ | STRING |
これであるのでAttributeValue::Sと指定する。
.expression_attribute_values(":device_id", AttributeValue::S(DEVICE_ID.to_owned()))
データーフレームを作る
// dataframe
fn time_sequential_dataframe(
items: Vec<HashMap<String, AttributeValue>>,
) -> anyhow::Result<polars::prelude::DataFrame> {
// データ型の指定
const PAIR: [(&'static str, DataType); 7] = [
("measured_at", DataType::Utf8),
("sensor_id", DataType::Utf8),
("message_id", DataType::UInt32),
("cumlative_kwh", DataType::Float64),
("instant_watt", DataType::Float64),
("instant_ampere_R", DataType::Float64),
("instant_ampere_T", DataType::Float64),
];
// 並び順
let ording = PAIR.iter().map(|p| p.0).collect::<Vec<&str>>();
//
let series = PAIR
.iter()
.map(|pair| series_from_items(&items, pair))
.collect::<anyhow::Result<Vec<Series>>>()?;
// 列を作る
let df = DataFrame::new(series)?;
// 列を並び変える
let df = df.select(ording)?;
// measured_at列をiso8601形式にする
let datetimes = df["measured_at"]
.utf8()?
.into_iter()
.map(|opt| {
opt.and_then(|s| {
// パースに失敗したらNone(NaN)値にする
match parse_iso8601_to_jst(s) {
Ok(jst) => Some(jst),
Err(e) => {
eprintln!("WARNING: \"{:?}\" discarded.", e);
None
}
}
})
})
.collect::<Vec<Option<DateTime<FixedOffset>>>>();
// 秒の情報は不要なので
let datetimes = datetimes
.iter()
.map(|opt| {
opt.map(|v| {
let naivetime = v.naive_local();
let sec = Duration::seconds(naivetime.second() as i64);
let nsec = Duration::nanoseconds(naivetime.nanosecond() as i64);
let dt: DateTime<FixedOffset> = v - sec - nsec;
dt
})
})
.collect::<Vec<Option<DateTime<FixedOffset>>>>();
let datetimes_str = datetimes
.iter()
.map(|opt_dt| opt_dt.map(|dt| dt.to_rfc3339()))
.collect::<Vec<Option<String>>>();
let datetime_series = Series::new("measured_at", datetimes_str).cast(&DataType::Utf8)?;
//
let mut df2 = df.to_owned();
let df = df2.with_column(datetime_series).map(|a| a.to_owned())?;
Ok(df)
}
"measured_at"にDataType::Utf8を指定しているのはそのままCSVファイルにするため。
データーフレームからCSVファイルを作る。
このために polars::prelude::CsvWriter が用意されているのでそれを使う。
let mut file = std::fs::File::create(outfilepath)?;
CsvWriter::new(&mut file).finish(&mut df)?;
CSVファイルからグラフを作るplotcharts
ここから別のコマンド。
CSVファイルからデーターフレームを作る。
上のコマンドで作ったCSVファイルはRFC3339形式にしていて問題ないことを確認している。
.map(|opt_dt| opt_dt.map(|dt| dt.to_rfc3339()))
読み込んだCSVファイルのmeasured_at列はIS08601形式?RFC3339形式?どっちかわからないけど、datetime型に変換するところで失敗するとパーになるのでよろしく。
// CSVファイルを読み込んでデータフレームを作る
fn read_csv<P: AsRef<Path>>(path: P) -> Result<LazyFrame, PolarsError> {
let ldf = LazyCsvReader::new(path).has_header(true).finish()?;
// measured_at列をstr型からdatetime型に変換する
let expr: Expr = col(colname::MEASURED_AT)
.str()
.strptime(
DataType::Datetime(TimeUnit::Milliseconds, None),
StrptimeOptions {
format: Some("%Y-%m-%dT%H:%M:%S%z".into()),
strict: false,
..Default::default()
},
)
.alias(colname::MEASURED_AT); // 変換済みの列で上書きする
Ok(ldf.with_column(expr))
}
プロットする
データーフレームをプロットするのにはplottersを使います。
じつは例を参考にしただけであまりわかってないので、何をやっているのか説明するだけにする。
// csvファイルからグラフを作る
fn run<P: AsRef<Path>>(
infilepath: P,
overwrite: bool,
plotareasize: (u32, u32),
chart_file_type: ChartFileType,
) -> anyhow::Result<String> {
// 出力するファイル名は入力ファイルの.csvを.png/.svgに変えたもの
let infilepath_string = format!("{:?}", infilepath.as_ref().as_os_str());
let mut outfilepath: PathBuf = PathBuf::from(infilepath.as_ref());
outfilepath.set_extension(match chart_file_type {
ChartFileType::Png => "png",
ChartFileType::Svg => "svg",
});
// 出力するファイルの存在確認
if outfilepath.is_file() && !overwrite {
let outfilepath_string = format!("{:?}", outfilepath.as_os_str());
Err(anyhow!("{} file is already exist!", outfilepath_string))?;
}
// CSVファイルからデーターフレームを作る
let df: DataFrame = read_csv(infilepath)?
.sort(colname::MEASURED_AT, SortOptions::default())
.collect()?;
//
match chart_file_type {
ChartFileType::Png => {
let root_area = BitMapBackend::new(&outfilepath, plotareasize).into_drawing_area();
plot(root_area, df.clone())?;
}
ChartFileType::Svg => {
let root_area = SVGBackend::new(&outfilepath, plotareasize).into_drawing_area();
plot(root_area, df.clone())?;
}
};
// 結果を返す
Ok(format!("inputfile -> {}\n{:?}", infilepath_string, df))
}
グラフエリアを縦に3分割する。
積算電力量(30分値)をグラフ化する
// 積算電力量グラフを作る
fn plot_cumlative_kilo_watt_hour<DB: DrawingBackend>(
area: &DrawingArea<DB, plotters::coord::Shift>,
df: &DataFrame,
line_style: ShapeStyle,
box_style: ShapeStyle,
) -> anyhow::Result<()>
where
DB::ErrorType: 'static,
{
// X軸の日付時間
let (datetimes, range_datetime) = as_datetime_vector(&df[colname::MEASURED_AT], Tokyo)?;
// Y軸の測定値
let (values, range_value) = as_values_vector(&df[colname::CUMLATIVE_KWH])?;
// XYの値
let dataset: Vec<(&NaiveDateTime, &f64)> = datetimes.iter().zip(values.iter()).collect();
//
let mut chart = ChartBuilder::on(area)
.caption("積算電力量測定値(30分値)", ("sans-serif", 16).into_font())
.margin(10)
.x_label_area_size(40)
.y_label_area_size(60)
.build_cartesian_2d(range_datetime.clone(), range_value)?;
// 軸ラベルとか
chart
.configure_mesh()
.x_labels(24)
.x_label_formatter(&|t: &NaiveDateTime| t.format("%H").to_string())
.x_desc(range_datetime.range().start.format("%F %A").to_string())
.y_desc("積算電力量30分値(kWh)")
.draw()?;
// 積算電力量を高さと30分の横幅の四角で表現する
let rectangles = dataset.iter().copied().map(|(datetime, value)| {
let start_x = *datetime;
let end_x = start_x.checked_add_signed(Duration::minutes(30)).unwrap();
let mut bar = Rectangle::new([(start_x, 0.0), (end_x, *value)], box_style);
bar.set_margin(0, 0, 0, 0);
bar
});
chart.draw_series(rectangles)?;
// 積算電力量を折れ線で表現する
chart.draw_series(LineSeries::new(
dataset
.iter()
.copied()
.map(|(datetime, value)| (*datetime, *value)),
line_style,
))?;
// 積算電力量を点で表現する
chart.draw_series(
dataset
.iter()
.copied()
.map(|(x, y)| Circle::new((*x, *y), 2, line_style)),
)?;
Ok(())
}
横軸が時間、縦軸が積算電力量瞬時電力(30分値)の棒グラフと折れ線グラフを作っている。
瞬時電力(1分値)をグラフ化する
// 瞬時電力グラフを作る
fn plot_instant_watt<DB: DrawingBackend>(
area: &DrawingArea<DB, plotters::coord::Shift>,
df: &DataFrame,
box_style: ShapeStyle,
) -> anyhow::Result<()>
where
DB::ErrorType: 'static,
{
// X軸の日付時間
let (datetimes, range_datetime) = as_datetime_vector(&df[colname::MEASURED_AT], Tokyo)?;
// Y軸の測定値
let (values, range_value) = as_values_vector(&df[colname::INSTANT_WATT])?;
// XYの値
let dataset: Vec<(&NaiveDateTime, &f64)> = datetimes.iter().zip(values.iter()).collect();
// 最大値
let peak: (&NaiveDateTime, &f64) = datetimes
.iter()
.zip(values.iter())
.max_by(|a, b| (a.1).total_cmp(b.1))
.ok_or(anyhow!("fail to compare!"))?;
//
let mut chart = ChartBuilder::on(area)
.caption("瞬時電力測定値(1分値)", ("sans-serif", 16).into_font())
.margin(10)
.x_label_area_size(40)
.y_label_area_size(60)
.build_cartesian_2d(
range_datetime.clone(),
range_value.start.min(0.0)..(500.0 * ((range_value.end / 500.0).ceil())),
)?;
// 軸ラベルとか
chart
.configure_mesh()
.x_labels(24)
.x_label_formatter(&|t: &NaiveDateTime| t.format("%H").to_string())
.x_desc(range_datetime.range().start.format("%F %A").to_string())
.y_desc("瞬時電力1分値(W)")
.draw()?;
// 瞬時電力量を高さと1分の横幅の四角で表現する
chart
.draw_series(dataset.iter().copied().map(|(datetime, value)| {
let start_x = *datetime;
let end_x = start_x.checked_add_signed(Duration::minutes(1)).unwrap();
let mut bar = Rectangle::new([(start_x, 0.0), (end_x, *value)], box_style);
bar.set_margin(0, 0, 0, 0);
bar
}))?
// ピークパワーの凡例
.label(format!("Peak {}W({})", peak.1, peak.0.format("%H:%M")))
.legend(|coord| EmptyElement::at(coord) + Circle::new((0, 0), 3, RED.filled()));
//
chart.draw_series(PointSeries::of_element(
vec![(*peak.0, *peak.1)],
3,
ShapeStyle::from(&RED),
&|coord, size, style| {
let text_style = ("sans-serif", 16)
.into_font()
.color(&RED)
.pos(Pos::new(HPos::Center, VPos::Bottom));
EmptyElement::at(coord)
+ Circle::new((0, 0), size, style)
+ Text::new(format!("{}W", coord.1), (0, -1), text_style)
},
))?;
// 凡例
chart
.configure_series_labels()
.background_style(WHITE.mix(0.5))
.border_style(BLACK)
.draw()?;
Ok(())
}
横軸が時間、縦軸が瞬時電力(1分値)の棒グラフを作っている。
瞬時電流(1分値)をグラフ化する
// 瞬時電流グラフを作る
fn plot_instant_ampere<DB: DrawingBackend>(
area: &DrawingArea<DB, plotters::coord::Shift>,
df: &DataFrame,
r_box_style: ShapeStyle,
t_box_style: ShapeStyle,
) -> anyhow::Result<()>
where
DB::ErrorType: 'static,
{
// X軸の日付時間
let (datetimes, range_datetime) = as_datetime_vector(&df[colname::MEASURED_AT], Tokyo)?;
// Y軸のR相電流測定値
let (values_r, _) = as_values_vector(&df[colname::INSTANT_AMPERE_R])?;
// Y軸のT相電流測定値
let (values_t, _) = as_values_vector(&df[colname::INSTANT_AMPERE_T])?;
// R相電流とT相電流を加算する
let accumlated = values_r
.iter()
.zip(values_t.iter())
.map(|(a, b)| a + b)
.collect::<Vec<f64>>();
// R相電流とT相電流を加算した値の最大値
let max_value = accumlated
.iter()
.max_by(|a, b| a.total_cmp(b))
.ok_or(NoData("datetime".into()))?;
// R相電流とT相電流を加算した値の最小値
let min_value = accumlated
.iter()
.min_by(|a, b| a.total_cmp(b))
.ok_or(NoData("datetime".into()))?;
// R相電流とT相電流を加算した値の範囲
let range_value = (*min_value)..(*max_value);
// (X, R相Y, T相Y)の値
let dataset: Vec<(&NaiveDateTime, &f64, &f64)> =
itertools::izip!(&datetimes, &values_r, &values_t).collect();
//
let mut chart = ChartBuilder::on(area)
.caption("瞬時電流測定値(1分値)", ("sans-serif", 16).into_font())
.margin(10)
.x_label_area_size(40)
.y_label_area_size(60)
.build_cartesian_2d(
range_datetime.clone(),
range_value.start.min(0.0)..range_value.end,
)?;
// 軸ラベルとか
chart
.configure_mesh()
.x_labels(24)
.x_label_formatter(&|t: &NaiveDateTime| t.format("%H").to_string())
.x_desc(range_datetime.range().start.format("%F %A").to_string())
.y_desc("瞬時電流1分値(A)")
.draw()?;
// R相電流を高さと1分の横幅の四角で表現する
chart
.draw_series(dataset.iter().copied().map(|(datetime, value_r, _)| {
let start_x = *datetime;
let end_x = start_x.checked_add_signed(Duration::minutes(1)).unwrap();
let start_y = 0.0; // R相電流の高さは0から始める
let end_y = start_y + *value_r;
let mut bar = Rectangle::new([(start_x, start_y), (end_x, end_y)], r_box_style);
bar.set_margin(0, 0, 0, 0); // 隙間なく並べる
bar
}))?
// R相電流の凡例
.label("R相電流")
.legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], r_box_style));
// T相電流を高さと1分の横幅の四角で表現する
// T相電流はR相電流の上に積む
chart
.draw_series(dataset.iter().copied().map(|(datetime, value_r, value_t)| {
let start_x = *datetime;
let end_x = start_x.checked_add_signed(Duration::minutes(1)).unwrap();
let start_y = *value_r; // T相電流の高さはR相電流の高さの分加算する
let end_y = start_y + *value_t;
let mut bar = Rectangle::new([(start_x, start_y), (end_x, end_y)], t_box_style);
bar.set_margin(0, 0, 0, 0); // 隙間なく並べる
bar
}))?
// T相電流の凡例
.label("T相電流")
.legend(|(x, y)| PathElement::new(vec![(x, y), (x + 20, y)], t_box_style));
// 凡例
chart
.configure_series_labels()
.background_style(WHITE.mix(0.5))
.border_style(BLACK)
.draw()?;
Ok(())
}
横軸が時間、縦軸がR相とT相の瞬時電流(1分値)の積み上げ棒グラフを作っている。
Rustのソースファイルでコメント多めにしておいたので、ソースファイルも見てください。
感想
6月19日のグラフを例にするとNA値(欠損)が目立つ。
ピークパワーが浮いているのは横幅が狭いので省略されているだけなのでplotcharts -x1200とかで出力すると出てくる。
生データがこれ。
2023-06-19T15:27:00+09:00,smartmeter,65744,,860.0,,
2023-06-19T15:27:00+09:00,smartmeter,65745,,,7.3,1.8
2023-06-19T15:28:00+09:00,smartmeter,65746,,860.0,,
2023-06-19T15:28:00+09:00,smartmeter,65747,,,7.3,1.8
2023-06-19T15:32:00+09:00,smartmeter,65750,,848.0,,
2023-06-19T15:32:00+09:00,smartmeter,65751,,,7.2,1.7
2023-06-19T15:30:00+09:00,smartmeter,65752,6439.82,,,
2023-06-19T15:34:00+09:00,smartmeter,65753,,848.0,,
2023-06-19T15:34:00+09:00,smartmeter,65754,,,7.3,1.7
2023-06-19T15:36:00+09:00,smartmeter,65755,,936.0,,
2023-06-19T15:36:00+09:00,smartmeter,65756,,,7.4,2.5
2023-06-19T15:38:00+09:00,smartmeter,65757,,1696.0,,
2023-06-19T15:38:00+09:00,smartmeter,65758,,,12.1,5.8
2023-06-19T15:38:00+09:00,smartmeter,65759,,1688.0,,
2023-06-19T15:38:00+09:00,smartmeter,65760,,,16.5,1.9
2023-06-19T15:39:00+09:00,smartmeter,65761,,1688.0,,
2023-06-19T15:39:00+09:00,smartmeter,65762,,,16.5,1.9
2023-06-19T15:40:00+09:00,smartmeter,65763,,1716.0,,
2023-06-19T15:41:00+09:00,smartmeter,65765,,1728.0,,
2023-06-19T15:41:00+09:00,smartmeter,65766,,,16.5,2.2
2023-06-19T15:28:00+09:00,smartmeter,65747,,,7.3,1.8
65748 欠損。
65749 欠損
2023-06-19T15:32:00+09:00,smartmeter,65750,,848.0,,
本来はこうであるはず。
2023-06-19T15:29:00+09:00,smartmeter,65748,,瞬時電力,,
2023-06-19T15:29:00+09:00,smartmeter,65749,,,R相電流値,T相電流値
再起動があるとmessege_idが0に戻るので再起動はしていないので、
スマートメーターに出した要求が破棄されているか1分以上遅延しているようで。
ECHONET TIDもDBに記録していたらわかるかもだけど。
それか
通信パケットが破棄されているとか。
MQTT Quality of Serviceが0つまり再送なしになっているのか?
QoSは1に指定しているんだが?。
AWS IoT CoreはQoS 0または1をサポートしているわけで。
ちょっとよくわからない。
欠損値が気になるならスマート分電盤に交換したほうがいいようだ。
plottersのことがよくわかっていないので、わかったら追記するつもりです。
Discussion