Cloudflare Workersを使って、TiDB Serverless のインラインキャッシュとしてMomentoを使うAPIサンプル
過去複数の記事でWorkersからMomento CacheやTiDB Serverlessに接続する手法について紹介してきました。
今日はその合体版としてMomento Cache を TiDB Serverless に対するインラインキャッシュとして実装してみます。
インラインキャッシュとは
Amazon DynamoDB に対する DAX (Dynamo DB Accelerator)がインラインキャッシュの代表例です。日本語では「透過的キャッシュ」とも言われます。
GET
オペレーションの場合、以下の流れで動作します。
- Cache にアイテムを取りに行く
1-1.HIT → そのまま結果を戻す
1-2.MISS → DBへ値を取りに行き、結果があれば値をCachにSet
1-3.値を戻す
POST
オペレーションの場合、以下の流れで動作します。
2. DBへ検索
2-2. 検索結果があればUpdate
2-3. 検索結果がNULLであればInsert
2-4. Cache へ値を書き込み
更新系の場合、常にDBの処理を先行させてその後Cache操作を行います。DBは常にアプリケーション側からは高い整合性が期待されますが、Cacheは最悪の場合、リフレッシュができますし、強整合性
をそこまで求められないためです。
このインラインキャッシュは、DBとは別に配置する「サイドキャッシュ」に比べてコーディングがシンプルになり開発生産性を向上させます。
Workers でこのAPIを作ることに意味があるのか?
Workersはエッジコンピューティング基盤であり、一般的にパブリッククラウドの中よりユーザーに近い場所で動作します。このため、より低いレイテンシが求められるキャッシュのデータ操作を行う場所としては最適です。
一方今回のハンズオンシナリオで操作するMomento Cache および TiDB Serverless はAWS上で動作します。このため、Workersからそれらへのアクセスにはネットワークレイテンシが発生するため、Workers「単独で」でこのようなキャッシュ操作を行いたい場合、Workers用Cache APIを用いた方がよいでしょう。
逆にAWS側のアプリケーションにエッジで動作した値を引き渡したい場合は、Workers用Cache APIではなくMomento Cacheの方が適しています。さらに、Workersのコードは依存性が少ないJavaScript標準で実装できますので、サンプルとして皆さんが独自のインラインキャッシュを構築することに挑戦する際に役に立つかもしれません。
さっそくやってみる
手順が複雑で多くなりますので、まずはMomento Cache の実装、TiDB Serverlessの実装。の順番で進めていきます。
1. Momento Cache の実装
まずはこちらを終わらせておきます。
本来MomentoはgRPCを用いることで通信の高速化も配慮されていますが、Workersからの接続はTCP or HTTP なので、今回はHTTP用SDKを用いています。workers.ts
を以下に置換します。
class MomentoFetcher {
private readonly apiToken: string;
private readonly baseurl: string;
constructor(token: string, endpoint: string) {
this.apiToken = token;
this.baseurl = `${endpoint}/cache`;
}
async get(cacheName: string, key: string) {
const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}`);
if (resp.status < 300) {
console.log(`successfully retrieved ${key} from cache`)
} else {
console.log(`failed to retrieve item from cache: ${cacheName}`)
}
return await resp.text();
}
async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30) {
const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}&&ttl_seconds=${ttl_seconds}`, {
method: 'PUT',
body: value
});
if (resp.status < 300) {
console.log(`successfully set ${key} into cache`);
} else {
console.log(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
}
return;
}
}
export interface Env {
MOMENTO_AUTH_TOKEN: string;
MOMENTO_REST_ENDPOINT: string;
MOMENTO_CACHE_NAME: string;
}
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
const client = new MomentoFetcher(env.MOMENTO_AUTH_TOKEN, env.MOMENTO_REST_ENDPOINT);
const cache = env.MOMENTO_CACHE_NAME;
const url = new URL(request.url);
const params = new URLSearchParams(url.search);
const value = params.get('value');
const key = params.get('key');
let rest = params.get('opt');
if (rest == 'POST') {
// setting a value into cache
/* TiDB Serverless へのSELECTを実行。
値があった場合、Updateを実行
値がない場合、Insertを実行
*/
const setResp = await client.set(cache, key, value);
} else if (rest == 'GET') {
// getting a value from cache
const getResp = await client.get(cache, key)
if (getResp.includes('404')) {
console.log("key was not found");
rest = "key was not found at both DB/Cache";
/*
TiDB ServerlessへのSELECTを挿入
*/
/*
値があった場合、Cache Getを実行してリターン
値がない場合、NULLをリターン
*/
}
else {
console.log(getResp);
rest = getResp;
}
} else {
rest = "operation error";
}
return new Response(rest);
},
};
https://<皆さん専用ドメイン>/?key=demo2&value=demo1&opt=GET
で呼び出します。
opt
パラメータはGET
かPOST
を受けとります。それ以外はoperation error
を戻します。
GET
の場合:
-
Cache への GET を実行
値がある場合→そのままResponseで戻す
値がない場合→TiDB ServerlessへのSELECTを実行。
値がある場合→Cache へ SETを実行して、値をReponseで戻す
値がない場合→Responseで'key was not found at both DB/Cache'を戻す -
Cache への POST を実行
TiDB Serverless へのSELECを実行
値がある場合→Updateを実行→CacheへSET
値がない場合→Insertを実行→CacheへSET
CacheはUpdateとInsertの違いはなく、常に上書きが実行可能であり、ACID属性はDBへ担保させています。
2. TiDB Serverless の実装
作成されたbookshop
データベースのusers
テーブルを使用します。
先ほどまで作業していたディレクトリで以下を実行します。
npm install @tidbcloud/serverless
以下の文字列を組み立てておきます。
mysql://xxxxuserid.root:xxxxpassword@gateway01.eu-central-1.prod.aws.tidbcloud.com:4000/bookshop
環境変数に値をセットします。
wrangler secret put DATABASE_URL
workers.ts
を以下の値で置換します。
import { connect } from '@tidbcloud/serverless'
class MomentoFetcher {
private readonly apiToken: string;
private readonly baseurl: string;
constructor(token: string, endpoint: string) {
this.apiToken = token;
this.baseurl = `${endpoint}/cache`;
}
async get(cacheName: string, key: string) {
const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}`);
if (resp.status < 300) {
console.log(`successfully retrieved ${key} from cache`)
} else {
console.log(`failed to retrieve item from cache: ${cacheName}`)
}
return await resp.text();
}
async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30) {
const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}&&ttl_seconds=${ttl_seconds}`, {
method: 'PUT',
body: value
});
if (resp.status < 300) {
console.log(`successfully set ${key} into cache`);
} else {
console.log(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
}
return;
}
}
export interface Env {
DATABASE_URL: string;
}
export interface Env {
MOMENTO_AUTH_TOKEN: string;
MOMENTO_REST_ENDPOINT: string;
MOMENTO_CACHE_NAME: string;
}
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
const conn = connect({url:env.DATABASE_URL})
const resp = await conn.execute("show databases")
console.log(JSON.stringify(resp));
const client = new MomentoFetcher(env.MOMENTO_AUTH_TOKEN, env.MOMENTO_REST_ENDPOINT);
const cache = env.MOMENTO_CACHE_NAME;
const url = new URL(request.url);
const params = new URLSearchParams(url.search);
const value = params.get('value');
const key = params.get('key');
let rest = params.get('opt');
if (rest == 'POST') {
// setting a value into cache
/* TiDB Serverless へのSELECTを実行。
値があった場合、Updateを実行
値がない場合、Insertを実行
*/
const setResp = await client.set(cache, key, value);
} else if (rest == 'GET') {
// getting a value from cache
const getResp = await client.get(cache, key)
if (getResp.includes('404')) {
console.log("key was not found");
rest = "key was not found at both DB/Cache";
/*
TiDB ServerlessへのSELECTを挿入
*/
/*
値があった場合、Cache Getを実行してリターン
値がない場合、NULLをリターン
*/
}
else {
console.log(getResp);
rest = getResp;
}
} else {
rest = "operation error";
}
return new Response(JSON.stringify(resp));
},
};
npx wrangler deploy
でデプロイを行います。
https://<皆さん専用ドメイン>/?key=demo2&value=demo1&opt=GET
で呼び出すと以下が表示されるはずです。
[{"Database":"INFORMATION_SCHEMA"},{"Database":"PERFORMANCE_SCHEMA"},{"Database":"bookshop"},{"Database":"mysql"},{"Database":"test"}]
(値はちょっとぐらい異なっても何か近しいJSONが出力されていれば大丈夫です)
workers.ts
を以下に置換します。
import { connect } from '@tidbcloud/serverless'
class MomentoFetcher {
private readonly apiToken: string;
private readonly baseurl: string;
constructor(token: string, endpoint: string) {
this.apiToken = token;
this.baseurl = `${endpoint}/cache`;
}
async get(cacheName: string, key: string) {
const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}`);
if (resp.status < 300) {
console.log(`successfully retrieved ${key} from cache`)
} else {
console.log(`failed to retrieve item from cache: ${cacheName}`)
}
return await resp.text();
}
async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30) {
const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}&&ttl_seconds=${ttl_seconds}`, {
method: 'PUT',
body: value
});
if (resp.status < 300) {
console.log(`successfully set ${key} into cache`);
} else {
console.log(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
}
return;
}
}
export interface Env {
DATABASE_URL: string;
}
export interface Env {
MOMENTO_AUTH_TOKEN: string;
MOMENTO_REST_ENDPOINT: string;
MOMENTO_CACHE_NAME: string;
}
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
const conn = connect({ url: env.DATABASE_URL });
const client = new MomentoFetcher(env.MOMENTO_AUTH_TOKEN, env.MOMENTO_REST_ENDPOINT);
const cache = env.MOMENTO_CACHE_NAME;
const url = new URL(request.url);
const params = new URLSearchParams(url.search);
const value = params.get('value');
const key = params.get('key');
let rest = params.get('opt');
let result;
if (rest == 'POST') {
/* TiDB Serverless へのSELECTを実行。*/
let sql = "select * from `bookshop`.`users` where nickname ='" + key + "';";
let dbresp = await conn.execute(sql);
result = JSON.stringify(dbresp);
if (result.includes(key)) {
/* 値があった場合、Updateを実行 */
sql = "update bookshop.users set balance = '" + value + "' where nickname = '" + key + "';";
console.log(sql);
dbresp = await conn.execute(sql);
result = "executed sql:" + sql;
}
else {
/* 値がない場合、Insertを実行 */
sql = "insert into bookshop.users (id,nickname,balance) values (" + value + ",'" + key + "'," + value + ");";
console.log(sql)
dbresp = await conn.execute(sql);
result = "executed sql:" + sql;
};
/*DB処理が終わったらCacheを更新*/
const setResp = await client.set(cache, key, value);
} else if (rest == 'GET') {
const getResp = await client.get(cache, key)
let sql;
if (getResp.includes('404')) { /* cache missの場合
console.log("key was not found");
/*
TiDB ServerlessへのSELECTを挿入
*/
sql = "select * from bookshop.users where nickname = '" + key + "';";
console.log(sql);
let dbresp = await conn.execute(sql);
console.log(dbresp)
/*
値があった場合、Cache Setを実行してリターン
値がない場合、NULLをリターン
*/
if (dbresp.length !== 0) {
await client.set(cache, key, value);
result = sql + ": cache set"
} else {
result = sql + ":cache miss";
}
}
else { /* cache hitの場合 */
console.log(getResp);
result = "Cache Hit; value is " + getResp;
}
} else {
result = "operation error";
}
return new Response(JSON.stringify(result));
},
};
GET
,POST
ともに以下の動作をします。
GET
- Cache にアイテムを取りに行く
1-1.HIT → そのまま結果を戻す
1-2.MISS → DBへ値を取りに行き、結果があれば値をCachにSet
1-3.値を戻す
POST
オペレーションの場合、以下の流れで動作します。
2. DBへ検索
2-2. 検索結果があればUpdate
2-3. 検索結果がNULLであればInsert
2-4. Cache へ値を書き込み
Discussion