TimescaleDB Continuous aggregates メモ
Continuous aggregates
Timescale Documentation | Continuous aggregates
- TimescaleDB の hypertable を利用したマテリアライズドビュー
- ここではこれをマテビューと呼ぶ
- 通常のマテビューは使う予定ないので、調べない
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT device,
time_bucket(INTERVAL '1 day', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY device, bucket;
with の後に (timescaledb.continuous) を指定することで、リフレッシュを勝手にやってくれる。
元のテーブルにレコードが追加されたり更新されたりしたら反映される。
ハイパーテーブルは更新がメインというより追記がメインなので、あまり更新は考慮しなくて良さそう。
Hierarchical continuous aggregates
- Timescale Documentation | Continuous aggregates on continuous aggregates
- An Incremental Materialized View on Steroids: How We Made Continuous Aggregates Even Better
TimescaleDB 2.9 からマテビューを参照してマテビューを作れるようになる。つまり 1 時間単位でのマテビューを作ったら、それを使って1日単位でのマテビューが作れる。
つまり、 5 分単位での連続集計によるマテビューを作って
CREATE MATERIALIZED VIEW cpu_usage_metrics_avg_5min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', "time") AS "time",
"machine_id",
avg(cpu_usage) AS "avg_cpu_usage",
sum(cpu_usage) AS "sum_cpu_usage",
count(cpu_usage) AS "count_cpu_usage"
FROM cpu_usage_metrics
GROUP BY 1, 2
ORDER BY 1;
そこから 15 分単位でのマテビューが作れるようになる。素晴らしい。
CREATE MATERIALIZED VIEW cpu_usage_metrics_avg_15min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('15 minutes', "time") AS "time",
"machine_id",
sum(sum_cpu_usage) / sum(count_cpu_usage) AS "avg_cpu_usage"
FROM cpu_usage_metrics_avg_5min
GROUP BY 1, 2
ORDER BY 1;
WITH NO DATA
WITH NO DATAオプションの使用
デフォルトでは、初めてビューを作成するとき、そのビューにはデータが入力されています。これは、ハイパーテーブル全体で集約を計算できるようにするためです。テーブルが非常に大きい場合や、新しいデータが継続的に追加される場合など、このような事態を避けたい場合は、データの更新順序を制御することができます。このためには、WITH NO DATA オプションを使用して、連続的な集約ポリシーに手動更新を追加します。WITH NO DATAオプションを使用すると、連続的な集約が即座に作成されるため、データが集約されるのを待つ必要がありません。データは、ポリシーの実行が開始されたときにのみ、入力が開始されます。これは、start_offset時間よりも新しいデータのみが連続的な集約に入力され始めることを意味します。開始_オフセット間隔よりも古い履歴データがある場合、リアルタイム・クエリを効率的に実行できるようにするために、現在の開始_オフセットまで履歴を手動で更新する必要があります。
手動でリフレッシュを実行する。
CALL refresh_continuous_aggregate('example', '2021-05-01', '2021-06-01');
自動リフレッシュ間隔も指定できる
- マテビュー名
- リフレッシュウィンドウの開始位置
- リフレッシュウィンドウの終了位置
SELECT add_continuous_aggregate_policy('cagg_rides_view',
start_offset => INTERVAL '1 week',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '30 minutes');
リアルタイムアグリゲート
デフォルトではリアルタイムにマテビューへ反映される。
Timescale Documentation | Real-time aggregates
連続アグリゲーションでは、基礎となるハイパーターブルからの最新のデータチャンクは含まれません。リアルタイムアグリゲーションは、アグリゲーションされたデータを使用し、最新の生データを追加して、正確で最新の結果を提供するもので、データが書き込まれるときにアグリゲーションする必要はありません。
本来、最新のデータチャンクは集計されないが、リアルタイムアグリゲートを有効にしているとそこもうまいこと集計した状態を返してくれる。
TimescaleDB 1.7以降では、リアルタイムアグリゲートはデフォルトで有効になっています
基本的にはデフォルト有効で悪くなさそう。
How to ensure up-to-date results with Real-Time Aggregation
連続集計の利点は 2 つあります。
クエリのパフォーマンス。 基礎となる生データではなく、事前に計算された結果に対してクエリを実行することにより、継続的な集計はクエリのパフォーマンスを大幅に向上させることができます。
ダウンサンプリングによるストレージの節約。継続的な集計は、多くの場合、より優れたストレージ管理のためにデータ保持ポリシーと組み合わされます。生データは継続的に具体化されたテーブルに集約され、特定の期間に達すると削除されます。そのため、データベースは一定期間 (たとえば 1 週間) の生データのみを保存し、集計データはさらに長期間保存することができます。
リフレッシュポリシー
- リフレッシュポリシーが設定できる
- start_offset
- リフレッシュの開始位置
- null にすると min(timestamp) になり、一番古いデータから全てが対象になる
- end_offset
- フレッシュの終了位置
- null にすると max(timestamp) になり、一番古いデータから全てが対象になる
- schedule_interval
- 更新間隔
SELECT add_continuous_aggregate_policy('conditions_summary_hourly',
start_offset => NULL。
end_offset => INTERVAL '1 h',
schedule_interval => INTERVAL '1 h');
- 開始位置は min(timestamp)
- 終了位置は 1 h なので、過去 1 時間は含まない
- 1 時間間隔で処理する
リアルタイムアグリゲートを有効にしていれば過去の 1 時間も含めて SELECT した際に結果を返してくれる。
sqlc
sqlc で Continuous aggregates を利用する際に、 time_bucket 関数に型を指定する場合は timestamptz を指定する必要がある。timestamp ではエラーになってしまう。これは型を指定しないと interface{} になってしまうので必須。
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT device,
time_bucket(INTERVAL '1 day', time)::timestamptz AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY device, bucket;
こちらで教えていただいた。
Compress continuous aggregates
マテビューでも圧縮が利用できる。集計したデータもたまってくれば容量を食うので、それを一定期間が経過したら圧縮してしまうことができる。
圧縮を有効にするのは簡単。
ALTER MATERIALIZED VIEW cagg_name set (timescaledb.compress = true);
あとはどのくらい時間が経過したら圧縮するかを指定するだけ。
SELECT add_compression_policy('cagg_name', compress_after=>'45 days'::interval);
注意点としてオフセットを意識する必要がある。過去 30 日がスタートであれば圧縮は可能だが、start_offset が 60 days とかになっているとエラーになる。
SELECT add_continuous_aggregate_policy('cagg_name',
start_offset => INTERVAL '30 days',
end_offset => INTERVAL '1 day',
schedule_interval => INTERVAL '1 hour');
圧縮ポリシーは、compress_after パラメータが連続集計ポリシーの start_offset パラメータよりも大きい必要があります