🎃

Cloudflare Workersを使って、TiDB Serverless のインラインキャッシュとしてMomentoを使うAPIサンプル

2023/11/27に公開

https://zenn.dev/kameoncloud/articles/99d3ed9d5ce4fd
https://zenn.dev/kameoncloud/articles/38254ff162c712

過去複数の記事でWorkersからMomento CacheやTiDB Serverlessに接続する手法について紹介してきました。
今日はその合体版としてMomento Cache を TiDB Serverless に対するインラインキャッシュとして実装してみます。

インラインキャッシュとは

Amazon DynamoDB に対する DAX (Dynamo DB Accelerator)がインラインキャッシュの代表例です。日本語では「透過的キャッシュ」とも言われます。
GETオペレーションの場合、以下の流れで動作します。

  1. Cache にアイテムを取りに行く
    1-1.HIT → そのまま結果を戻す
    1-2.MISS → DBへ値を取りに行き、結果があれば値をCachにSet
    1-3.値を戻す

POSTオペレーションの場合、以下の流れで動作します。
2. DBへ検索
2-2. 検索結果があればUpdate
2-3. 検索結果がNULLであればInsert
2-4. Cache へ値を書き込み

更新系の場合、常にDBの処理を先行させてその後Cache操作を行います。DBは常にアプリケーション側からは高い整合性が期待されますが、Cacheは最悪の場合、リフレッシュができますし、強整合性
をそこまで求められないためです。

このインラインキャッシュは、DBとは別に配置する「サイドキャッシュ」に比べてコーディングがシンプルになり開発生産性を向上させます。

Workers でこのAPIを作ることに意味があるのか?

Workersはエッジコンピューティング基盤であり、一般的にパブリッククラウドの中よりユーザーに近い場所で動作します。このため、より低いレイテンシが求められるキャッシュのデータ操作を行う場所としては最適です。

一方今回のハンズオンシナリオで操作するMomento Cache および TiDB Serverless はAWS上で動作します。このため、Workersからそれらへのアクセスにはネットワークレイテンシが発生するため、Workers「単独で」でこのようなキャッシュ操作を行いたい場合、Workers用Cache APIを用いた方がよいでしょう。
https://developers.cloudflare.com/workers/runtime-apis/cache/
逆にAWS側のアプリケーションにエッジで動作した値を引き渡したい場合は、Workers用Cache APIではなくMomento Cacheの方が適しています。

さらに、Workersのコードは依存性が少ないJavaScript標準で実装できますので、サンプルとして皆さんが独自のインラインキャッシュを構築することに挑戦する際に役に立つかもしれません。

さっそくやってみる

手順が複雑で多くなりますので、まずはMomento Cache の実装、TiDB Serverlessの実装。の順番で進めていきます。

1. Momento Cache の実装

まずはこちらを終わらせておきます。
https://zenn.dev/kameoncloud/articles/38254ff162c712
本来MomentoはgRPCを用いることで通信の高速化も配慮されていますが、Workersからの接続はTCP or HTTP なので、今回はHTTP用SDKを用いています。

workers.tsを以下に置換します。

workers.ts
class MomentoFetcher {
	private readonly apiToken: string;
	private readonly baseurl: string;
	constructor(token: string, endpoint: string) {
		this.apiToken = token;
		this.baseurl = `${endpoint}/cache`;
	}

	async get(cacheName: string, key: string) {
		const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}`);
		if (resp.status < 300) {
			console.log(`successfully retrieved ${key} from cache`)
		} else {
			console.log(`failed to retrieve item from cache: ${cacheName}`)
		}
		return await resp.text();
	}

	async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30) {
		const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}&&ttl_seconds=${ttl_seconds}`, {
			method: 'PUT',
			body: value
		});
		if (resp.status < 300) {
			console.log(`successfully set ${key} into cache`);
		} else {
			console.log(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
		}
		return;
	}
}

export interface Env {
	MOMENTO_AUTH_TOKEN: string;
	MOMENTO_REST_ENDPOINT: string;
	MOMENTO_CACHE_NAME: string;
}

export default {
	async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
		const client = new MomentoFetcher(env.MOMENTO_AUTH_TOKEN, env.MOMENTO_REST_ENDPOINT);
		const cache = env.MOMENTO_CACHE_NAME;

		const url = new URL(request.url);
		const params = new URLSearchParams(url.search);

		const value = params.get('value');
		const key = params.get('key');
		let rest = params.get('opt');

		if (rest == 'POST') {
			// setting a value into cache
			/* TiDB Serverless へのSELECTを実行。
			値があった場合、Updateを実行
			値がない場合、Insertを実行
			*/
			const setResp = await client.set(cache, key, value);
		} else if (rest == 'GET') {
			// getting a value from cache
			const getResp = await client.get(cache, key)
			if (getResp.includes('404')) {
				console.log("key was not found");
				rest = "key was not found at both DB/Cache";
				/*
				TiDB ServerlessへのSELECTを挿入
				*/
				/*
				値があった場合、Cache Getを実行してリターン
				値がない場合、NULLをリターン
				*/
			}
			else {
				console.log(getResp);
				rest = getResp;
			}
		} else {
			rest = "operation error";
		}
		return new Response(rest);
	},
};
https://<皆さん専用ドメイン>/?key=demo2&value=demo1&opt=GET

で呼び出します。
optパラメータはGETPOSTを受けとります。それ以外はoperation errorを戻します。
GETの場合:

  1. Cache への GET を実行
    値がある場合→そのままResponseで戻す
    値がない場合→TiDB ServerlessへのSELECTを実行。
    値がある場合→Cache へ SETを実行して、値をReponseで戻す
    値がない場合→Responseで'key was not found at both DB/Cache'を戻す

  2. Cache への POST を実行
    TiDB Serverless へのSELECを実行
    値がある場合→Updateを実行→CacheへSET
    値がない場合→Insertを実行→CacheへSET

CacheはUpdateとInsertの違いはなく、常に上書きが実行可能であり、ACID属性はDBへ担保させています。

2. TiDB Serverless の実装

https://zenn.dev/kameoncloud/articles/99d3ed9d5ce4fd
まずはこのシナリオを別の環境(別ディレクトリ) で流しておきます。
作成されたbookshopデータベースのusersテーブルを使用します。

先ほどまで作業していたディレクトリで以下を実行します。

npm install @tidbcloud/serverless

以下の文字列を組み立てておきます。

mysql://xxxxuserid.root:xxxxpassword@gateway01.eu-central-1.prod.aws.tidbcloud.com:4000/bookshop

環境変数に値をセットします。

wrangler secret put DATABASE_URL

workers.tsを以下の値で置換します。

import { connect } from '@tidbcloud/serverless'

class MomentoFetcher {
	private readonly apiToken: string;
	private readonly baseurl: string;
	constructor(token: string, endpoint: string) {
		this.apiToken = token;
		this.baseurl = `${endpoint}/cache`;
	}

	async get(cacheName: string, key: string) {
		const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}`);
		if (resp.status < 300) {
			console.log(`successfully retrieved ${key} from cache`)
		} else {
			console.log(`failed to retrieve item from cache: ${cacheName}`)
		}
		return await resp.text();
	}

	async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30) {
		const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}&&ttl_seconds=${ttl_seconds}`, {
			method: 'PUT',
			body: value
		});

		if (resp.status < 300) {
			console.log(`successfully set ${key} into cache`);
		} else {
			console.log(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
		}
		return;
	}
}

export interface Env {
	DATABASE_URL: string;
 }

export interface Env {
	MOMENTO_AUTH_TOKEN: string;
	MOMENTO_REST_ENDPOINT: string;
	MOMENTO_CACHE_NAME: string;
}

export default {
	async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
		const conn = connect({url:env.DATABASE_URL})
		const resp = await conn.execute("show databases")
		console.log(JSON.stringify(resp));

		const client = new MomentoFetcher(env.MOMENTO_AUTH_TOKEN, env.MOMENTO_REST_ENDPOINT);
		const cache = env.MOMENTO_CACHE_NAME;

		const url = new URL(request.url);
		const params = new URLSearchParams(url.search);

		const value = params.get('value');
		const key = params.get('key');
		let rest = params.get('opt');

		if (rest == 'POST') {
			// setting a value into cache
			/* TiDB Serverless へのSELECTを実行。
			値があった場合、Updateを実行
			値がない場合、Insertを実行
			*/
			const setResp = await client.set(cache, key, value);
		} else if (rest == 'GET') {
			// getting a value from cache
			const getResp = await client.get(cache, key)
			if (getResp.includes('404')) {
				console.log("key was not found");
				rest = "key was not found at both DB/Cache";
				/*
				TiDB ServerlessへのSELECTを挿入
				*/
				/*
				値があった場合、Cache Getを実行してリターン
				値がない場合、NULLをリターン
				*/
			}
			else {
				console.log(getResp);
				rest = getResp;
			}
		} else {
			rest = "operation error";
		}
		return new Response(JSON.stringify(resp));
	},
};

npx wrangler deployでデプロイを行います。
https://<皆さん専用ドメイン>/?key=demo2&value=demo1&opt=GETで呼び出すと以下が表示されるはずです。

[{"Database":"INFORMATION_SCHEMA"},{"Database":"PERFORMANCE_SCHEMA"},{"Database":"bookshop"},{"Database":"mysql"},{"Database":"test"}]

(値はちょっとぐらい異なっても何か近しいJSONが出力されていれば大丈夫です)

workers.tsを以下に置換します。

workers.ts
import { connect } from '@tidbcloud/serverless'

class MomentoFetcher {
	private readonly apiToken: string;
	private readonly baseurl: string;
	constructor(token: string, endpoint: string) {
		this.apiToken = token;
		this.baseurl = `${endpoint}/cache`;
	}

	async get(cacheName: string, key: string) {
		const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}`);
		if (resp.status < 300) {
			console.log(`successfully retrieved ${key} from cache`)
		} else {
			console.log(`failed to retrieve item from cache: ${cacheName}`)
		}
		return await resp.text();
	}

	async set(cacheName: string, key: string, value: string, ttl_seconds: number = 30) {
		const resp = await fetch(`${this.baseurl}/${cacheName}?key=${key}&token=${this.apiToken}&&ttl_seconds=${ttl_seconds}`, {
			method: 'PUT',
			body: value
		});

		if (resp.status < 300) {
			console.log(`successfully set ${key} into cache`);
		} else {
			console.log(`failed to set item into cache message: ${resp.statusText} status: ${resp.status} cache: ${cacheName}`);
		}
		return;
	}
}

export interface Env {
	DATABASE_URL: string;
}

export interface Env {
	MOMENTO_AUTH_TOKEN: string;
	MOMENTO_REST_ENDPOINT: string;
	MOMENTO_CACHE_NAME: string;
}

export default {
	async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
		const conn = connect({ url: env.DATABASE_URL });

		const client = new MomentoFetcher(env.MOMENTO_AUTH_TOKEN, env.MOMENTO_REST_ENDPOINT);
		const cache = env.MOMENTO_CACHE_NAME;

		const url = new URL(request.url);
		const params = new URLSearchParams(url.search);

		const value = params.get('value');
		const key = params.get('key');
		let rest = params.get('opt');
		let result;

		if (rest == 'POST') {
			/* TiDB Serverless へのSELECTを実行。*/
			let sql = "select * from `bookshop`.`users` where nickname ='" + key + "';";
			let dbresp = await conn.execute(sql);
			result = JSON.stringify(dbresp);

			if (result.includes(key)) {
				/* 値があった場合、Updateを実行 */
				sql = "update bookshop.users set balance = '" + value + "' where nickname = '" + key + "';";
				console.log(sql);
				dbresp = await conn.execute(sql);
				result = "executed sql:" + sql;
			}
			else {
				/* 値がない場合、Insertを実行 */
				sql = "insert into bookshop.users (id,nickname,balance) values (" + value + ",'" + key + "'," + value + ");";
				console.log(sql)
				dbresp = await conn.execute(sql);
				result = "executed sql:" + sql;
			};
			/*DB処理が終わったらCacheを更新*/
			const setResp = await client.set(cache, key, value);

		} else if (rest == 'GET') {
			const getResp = await client.get(cache, key)
			let sql;
			if (getResp.includes('404')) { /* cache missの場合
				console.log("key was not found");
				/*
				TiDB ServerlessへのSELECTを挿入
				*/
				sql = "select * from bookshop.users where nickname = '" + key + "';";
				console.log(sql);
				let dbresp = await conn.execute(sql);
				console.log(dbresp)
				/*
				値があった場合、Cache Setを実行してリターン
				値がない場合、NULLをリターン
				*/
				if (dbresp.length !== 0) {
					await client.set(cache, key, value);
					result = sql + ": cache set"
				} else {
					result = sql + ":cache miss";
				}
			}
			else { /* cache hitの場合 */
				console.log(getResp);
				result = "Cache Hit; value is " + getResp;
			}
		} else {
			result = "operation error";
		}
		return new Response(JSON.stringify(result));
	},
};

GET,POSTともに以下の動作をします。
GET

  1. Cache にアイテムを取りに行く
    1-1.HIT → そのまま結果を戻す
    1-2.MISS → DBへ値を取りに行き、結果があれば値をCachにSet
    1-3.値を戻す

POSTオペレーションの場合、以下の流れで動作します。
2. DBへ検索
2-2. 検索結果があればUpdate
2-3. 検索結果がNULLであればInsert
2-4. Cache へ値を書き込み

Discussion