Timestream + Lambdaでクエリ結果をキャッシュしてページネーションしたい

2022/09/12に公開

動機

Amazon Timestreamでは、クエリのスキャンデータ量に応じて課金が発生するので、効率的なクエリをしたいです。取得結果をページングするとき、都度スキャンして課金されることを回避したいので、Lambdaでキャッシュできないか、やってみました。

前提

  • データはLambda(Python)でクエリする
    • Lambda Python 3.8 で確認
  • 別ページを取得するときは、Lambdaを再実行する。その際、Lambdaのキャッシュを再利用してスキャンを回避する
  • 1度のLambda実行の中でのページングは今回は対象外

未検証

  • キャッシュの保持時間
  • メモリ使用量
  • クライアントの同一性(他の人が同じLambdaでアクセスするとどうなるか)

課金

微少に課金されるかもしれません。検証が終わったらTimestreamは削除をお忘れなく。

Timestream準備

DB, Table

公式のチュートリアルのように、TimestreamのSample Databaseを使います。 Create database -> Sample database Name: sampleDB -> IoT, Multi-measure recordsの順に選びました。

データベース名: 'sampleDB'
テーブル名: 'IoTMulti'

Query

サンプルデータのtimeの期間は、自分がそのサンプルを作った時間に連動しています。

クエリエディタの例では、between ago(15m) and now()が最初に出てきますが、時間が経つと合わなくなるので、タイムスタンプで直接指定したいです。

timeの最初と最後を、クエリエディタ見てみます。(400行なので全クエリ)

SELECT * FROM "sampleDB"."IoTMulti" ORDER BY time DESC 

今回は、この期間のデータが入っていました。

2022-09-10 06:13:23.406000000
2022-09-10 11:24:51.489000000

クエリをタイムスタンプで書くと、このようになります。クォーテーションのダブルとシングルに注意。あまり時間幅はいらないので、15分だけの区間にしています。

SELECT * FROM "sampleDB"."IoTMulti" WHERE time BETWEEN TIMESTAMP '2022-09-10 11:00:00' AND TIMESTAMP '2022-09-10 11:25:00' ORDER BY time DESC LIMIT 3

クエリエディタで実行すると、この3レコードが取れます。

この3つのレコードをLambda関数で取り出すことを考えます。

Lambda

Lambdaを作成します。(Python3.8)

権限の追加
Configuration -> Permissions -> Roleを選択 -> Add policy -> と進んで、AmazonTimestreamFullAccess を追加

コード

  • コードの大部分は公式githubから拝借しています。
  • キャッシュはこちらを参考にしました。

実行方法

ページングのパラメータ

任意です。今回は単純にするため以下のようにしています。

  • page_size (1ページの項目数)は1
  • クエリのLIMITは3

TestのEventJSON

Lambdaデプロイ後、TestのEvent JSONにpage_numを入れて実行します。

Event JSON
{
  "page_num": 1
}
  • 注意: ページ番号は0ではなく、1から始まります

Lambda出力

以下は、page_numを変えたときのコンソールのLambdaの出力(Function Logs)部分です。

  • page_numは1,2,3を何回やってもそれぞれ同じ結果になります
  • Data: の部分のデータは上記のTimestreamのクエリエディタでの結果のキャプチャと対応しており、所望のデータが1つずつ取り出せています
  • page_num=1 でキャッシュ作成(++++ create cache ++++を通る)、page_num=2,3ではキャッシュを見つけて(==== cache found! ====を通る)、それを使っています
  • <class 'botocore.paginate.PageIterator'> のキャッシュを保存していますが、それを使うと消えてしまうので、複製を itertools.teeで作ってreturn、使用しています
  • cache idは使っているキャッシュのid、QueryId以下Data scanned so farData metered so farデータは最初にクエリした結果のものを使うので同じものが出てきていて正しい結果だとわかります
  • 課金されるのは、page_iterator = self.paginator.paginate( QueryString=query_string, PaginationConfig={'PageSize': 1})を実行したときのはずなので、キャッシュ後はそこを回避できています
page_num=1
Function Logs
START RequestId: 09b23942-4992-4f6a-baf9-96486551d3e1 Version: $LATEST
++++ create cache ++++
cache id: 0x7f652add7e40
page count:  1
QueryId: AEIACANG4RNC4QWVCKUQWTNAXLD4VRRONVW5AEFGBZTHPYSA2BU5IRBP6ABNT5A
Query progress so far: 100.0%
Data scanned so far: 0.0028438568115234375 MB
Data metered so far: 9.5367431640625 MB
Metadata: [{'Name': 'fleet', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'truck_id', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'fuel_capacity', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'model', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'load_capacity', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'make', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}, {'Name': 'load', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'fuel-reading', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'location', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'speed', 'Type': {'ScalarType': 'DOUBLE'}}]
Data: 
{['fleet=Alpha', 'truck_id=3496454495', 'fuel_capacity=150', 'model=W925', 'load_capacity=1000', 'make=Kenworth', 'measure_name=IoTMulti-stats', 'time=2022-09-10 11:24:51.489000000', 'load=898.0', 'fuel-reading=30.46256571501942', 'location=36.1627° N, 86.7816° W', 'speed=1.0']}
END RequestId: 09b23942-4992-4f6a-baf9-96486551d3e1
REPORT RequestId: 09b23942-4992-4f6a-baf9-96486551d3e1	Duration: 2651.98 ms	Billed Duration: 2652 ms	Memory Size: 128 MB	Max Memory Used: 65 MB	Init Duration: 265.41 ms
page_num=2
Function Logs
START RequestId: be7ebdcc-a9a1-4d51-96e5-2abe2bd6e433 Version: $LATEST
==== cache found! ==== 
cache id: 0x7f652add7e40
page count:  2
QueryId: AEIACANG4RNC4QWVCKUQWTNAXLD4VRRONVW5AEFGBZTHPYSA2BU5IRBP6ABNT5A
Query progress so far: 100.0%
Data scanned so far: 0.0028438568115234375 MB
Data metered so far: 9.5367431640625 MB
Metadata: [{'Name': 'fleet', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'truck_id', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'fuel_capacity', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'model', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'load_capacity', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'make', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}, {'Name': 'load', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'fuel-reading', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'location', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'speed', 'Type': {'ScalarType': 'DOUBLE'}}]
Data: 
{['fleet=Alpha', 'truck_id=1234546252', 'fuel_capacity=150', 'model=W925', 'load_capacity=1000', 'make=Kenworth', 'measure_name=IoTMulti-stats', 'time=2022-09-10 11:22:08.592000000', 'load=253.0', 'fuel-reading=59.18411447916667', 'location=44.9537° N, 93.0900° W', 'speed=65.0']}
END RequestId: be7ebdcc-a9a1-4d51-96e5-2abe2bd6e433
REPORT RequestId: be7ebdcc-a9a1-4d51-96e5-2abe2bd6e433	Duration: 133.03 ms	Billed Duration: 134 ms	Memory Size: 128 MB	Max Memory Used: 66 MB
page_num=3
Function Logs
START RequestId: f185c4c0-b035-42ba-a703-04e1e799aad6 Version: $LATEST
==== cache found! ==== 
cache id: 0x7f652add7e40
page count:  3
QueryId: AEIACANG4RNC4QWVCKUQWTNAXLD4VRRONVW5AEFGBZTHPYSA2BU5IRBP6ABNT5A
Query progress so far: 100.0%
Data scanned so far: 0.0028438568115234375 MB
Data metered so far: 9.5367431640625 MB
Metadata: [{'Name': 'fleet', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'truck_id', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'fuel_capacity', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'model', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'load_capacity', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'make', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'measure_name', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'time', 'Type': {'ScalarType': 'TIMESTAMP'}}, {'Name': 'load', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'fuel-reading', 'Type': {'ScalarType': 'DOUBLE'}}, {'Name': 'location', 'Type': {'ScalarType': 'VARCHAR'}}, {'Name': 'speed', 'Type': {'ScalarType': 'DOUBLE'}}]
Data: 
{['fleet=Alpha', 'truck_id=2062792987', 'fuel_capacity=150', 'model=C-600', 'load_capacity=1000', 'make=Ford', 'measure_name=IoTMulti-stats', 'time=2022-09-10 11:22:03.132000000', 'load=388.0', 'fuel-reading=131.05155378799626', 'location=46.8083° N, 100.7837° W', 'speed=21.0']}
END RequestId: f185c4c0-b035-42ba-a703-04e1e799aad6
REPORT RequestId: f185c4c0-b035-42ba-a703-04e1e799aad6	Duration: 148.70 ms	Billed Duration: 149 ms	Memory Size: 128 MB	Max Memory Used: 66 MB

まとめ

  • TimestreamのデータをLabmdaで取ってきて、ページネーションのためにキャッシュすることができました

その後

技としては面白いと思ったのですが、いらなくなりました。

TODO

  • 本当にできているか(課金が回避されているか)検証が必要
  • 理想としては、最小の課金範囲の10MBクエリをして、その結果をページングしたい そもそもクエリの上限が1MBでした。

Discussion