Etcdをアプリケーションで活用する - Lease、Watch、Mutexを使った実践的な実装
はじめに
こんにちは、PortalKey CTOの植森です。
EtcdはKubernetesのバックエンドストレージとして広く知られていますが、アプリケーションレベルでの活用事例を調べてもあまり多くありません。
しかし、Etcdは分散システム向けに設計された強力なKVSであり、Lease、Watch、Mutex、条件付きトランザクションといった特徴的な機能を持っており、これらを活用することで堅牢さと柔軟さを兼ね備えたシステムを実装することができます。
PortalKeyでは、リアルタイム通信アプリケーションのセッション管理や状態監視にEtcdを活用しており、今回はEtcdの特徴的な機能とその実践的な活用方法を紹介します。
Etcdとは
Etcdは、CoreOS(現在はRed Hat)によって開発されたオープンソースの分散Key-Valueストアです。名前の由来は「/etc」ディレクトリと「distributed」を組み合わせたもので、分散システムにおける設定情報の管理を目的として設計されました。
Etcdの主な特徴
Etcdは特にKubernetesのバックエンドストレージとして有名ですが、独自の強みを活かしてアプリケーションの実装にも活用できる汎用的な分散データストレージです。
Redisのような従来のKVSの機能をそのまま利用できるだけでなく、分散システム向けの機能も備えています。
1. 強一貫性
EtcdはRaftコンセンサスアルゴリズムを採用しており、分散環境においても強一貫性を保証します。これにより、すべてのノードで同じデータを読み取ることができます。
2. 高可用性
クラスタ構成により、一部のノードが停止してもサービスを継続できます。一般的に3台または5台の奇数台構成で運用されます。
3. 監視機能(Watch)
特定のキーやキープレフィックスの変更をリアルタイムで監視できる機能を提供します。これにより、設定変更やデータ更新を即座に検知できます。
4. TTL機能(Lease)
データに有効期限を設定し、自動的に削除する機能を提供します。セッション管理や一時的なデータの管理に有効です。
5. 条件付きトランザクション
If/Then/Elseを使った条件付きの操作を原子的に実行できます。楽観的排他制御の実装に活用できます。
6. 分散ロック(Mutex)
分散環境での排他制御を実現するMutex機能を提供します。
これらの機能を活用することによって、他のKVSでは難しい機能の実装を実現できます。
基本的なGet/Put操作
まず、基本的なGet/Putの操作は以下のようなコードです。
// 基本的な接続とGet/Put操作
clientConfig := clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
}
etcd, err := clientv3.New(clientConfig)
if err != nil {
return err
}
// データの保存
_, err = etcd.Put(ctx, "key", "value")
// データの取得
resp, err := etcd.Get(ctx, "key")
Redisとの機能比較
一貫性モデルの違い
Etcdは強一貫性を保証するため、すべてのノードで常に同じデータを読み取ることができます。この一貫性を担保する重要な仕組みがリビジョン(Revision)です。
Etcdでは、クラスタ全体で変更が発生するたびにグローバルなリビジョン番号がインクリメンタルされます。このリビジョンは以下の特徴を持ちます:
- グローバル性: クラスタ全体で共有される単一の値
- 単調増加: 変更のたびに必ず増加し、巻き戻ることがない
- 順序保証: リビジョンの順序は変更の時系列順序と一致
// リビジョンを指定した読み取り例
resp, err := etcd.Get(ctx, "key", clientv3.WithRev(100))
// リビジョン100時点でのデータを取得
// 現在のリビジョンを取得
resp, err := etcd.Get(ctx, "key")
currentRevision := resp.Header.Revision
この仕組みにより、どのノードからアクセスしても同じリビジョンでは必ず同じデータが読み取れることが保証されます。また、条件付きトランザクションでもこのリビジョンを活用して楽観的排他制御を実現しています。
一方、Redisは結果整合性モデルを採用しており、レプリケーション遅延により一時的にノード間でデータが異なる場合があります。これにより、Etcdは金融システムや重要な設定管理など、データの整合性が重要なシステムに適しています。
分散アーキテクチャの違い
Etcdは設計段階から分散システムとして作られており、Raftアルゴリズムによる自動的なリーダー選出やフェイルオーバーを提供します。Redisもクラスタ機能を持ちますが、後から追加された機能であり、運用の複雑さがあります。
変更監視機能の違い
EtcdのWatch機能は特定のキーやプレフィックスの変更を効率的に監視でき、変更前後の値も取得できます。RedisにはEtcdと違いPub/Sub機能が存在し、汎用的なメッセージング機能として利用できますが、メッセージの永続化や配信保証はありません。
TTL管理の違い
EtcdのLease機能では、複数のキーを同一のLeaseに関連付けることができ、KeepAliveによる動的な延長も可能です。RedisのEXPIREは個別のキーに対してのみ設定でき、より単純な仕組みです。
トランザクションの違い
Etcdの条件付きトランザクションは、キーの存在やリビジョンを条件として複雑な操作をアトミックに実行できます。Redisにはトランザクションの代替として複数のコマンドを一括で実行するパイプライン機能としてMULTI/EXECが存在しており、これは単純なコマンドの一括実行に適しています。
Lease機能によるTTL管理
ここからはEtcdの各機能を一つずつ見ていきます。
Etcdの最も特徴的な機能の一つがLease機能です。これは、データに自動的な有効期限(TTL: Time To Live)を設定し、期限が切れると自動的にデータを削除する仕組みです。
TTLの機能は他KVSにも存在しますが、Etcdの場合Leaseというキーとは別のリソースで管理されます。
RedisなどのKVSと異なる点として、Leaseには複数のキーを紐づけることが可能で、Leaseが削除されたり期限が切れたりした場合、紐づけられているすべてのキーが同時に削除されます。
Leaseの基本概念
Grant(リース作成)
LeaseはGrant操作によって作成され、秒単位でTTLを指定します。作成されたLeaseには一意のIDが割り当てられます。
Lease ID
各Leaseには一意のIDが割り当てられ、複数のキーを同一のLeaseに関連付けることができます。Leaseが期限切れになると、関連付けられたすべてのキーが自動的に削除されます。
TTL(Time To Live)
Leaseの有効期限で、秒単位で指定します。TTLが0になるとLeaseは期限切れとなり、関連するデータが削除されます。
基本的な使用例
// 1. Leaseを作成(TTL: 30秒)
leaseResp, err := etcd.Grant(ctx, 30)
if err != nil {
return err
}
leaseID := leaseResp.ID
// 2. データをLeaseと関連付けて保存
_, err = etcd.Put(ctx, "session/user123", "session_data", clientv3.WithLease(leaseID))
if err != nil {
return err
}
// 3. 複数のキーを同一のLeaseに関連付け
_, err = etcd.Put(ctx, "session/user123/workspace1", "workspace_data", clientv3.WithLease(leaseID))
_, err = etcd.Put(ctx, "session/user123/workspace2", "workspace_data", clientv3.WithLease(leaseID))
// 30秒後、すべてのキーが自動的に削除される
Leaseの管理操作
// Leaseの残り時間を確認
ttlResp, err := etcd.TimeToLive(ctx, leaseID)
if err != nil {
return err
}
fmt.Printf("残り時間: %d秒\n", ttlResp.TTL)
// Leaseを手動で削除(関連するすべてのキーも削除される)
_, err = etcd.Revoke(ctx, leaseID)
if err != nil {
return err
}
Leaseの利点
1. 自動的なリソース管理
TTLによる自動削除により、明示的な削除処理を忘れてもリソースリークを防げます。
2. 複数キーの一括管理
一つのLeaseに複数のキーを関連付けることで、関連するデータを一括で管理できます。
3. アトミックな削除
Leaseの期限切れまたは削除により、関連するすべてのキーが原子的に削除されます。
この機能は、セッション管理、一時的なロック、キャッシュデータなど、有効期限のあるデータの管理に非常に有効です。
KeepAliveによるライフサイクル管理
KeepAlive機能は、Lease機能と組み合わせて使用される重要な機能です。
Leaseに対してKeepAliveを発行すると、LeaseのTTLを動的に延長し続けることが可能となり、継続的にリソースを利用することが出来るようになります。
KeepAliveの基本概念
自動的なTTL延長
KeepAliveを開始すると、Etcdクライアントが定期的にサーバにリクエストを送信し、LeaseのTTLを自動的に延長します。これにより、アクティブなリソースが意図せず削除されることを防げます。
KeepAliveチャンネル
KeepAliveは非同期で動作し、延長の成功・失敗を通知するチャンネルを返します。このチャンネルを監視することで、Leaseの状態を把握できます。
自動停止
クライアントが停止したり、ネットワークが切断されたりすると、KeepAliveも自動的に停止し、Leaseは期限切れになります。
基本的な使用例
// 1. Leaseを作成(TTL: 30秒)
leaseResp, err := etcd.Grant(ctx, 30)
if err != nil {
return err
}
leaseID := leaseResp.ID
// 2. データを保存
_, err = etcd.Put(ctx, "active/service", "service_data", clientv3.WithLease(leaseID))
if err != nil {
return err
}
// 3. KeepAliveを開始
// このcontextをcancelすることによって、KeepAliveがキャンセルされてTTLの延長が停止する
keepAliveCtx, cancel := context.WithCancel(context.Background())
keepAliveCh, err := etcd.KeepAlive(keepAliveCtx, leaseID)
if err != nil {
return err
}
// 4. KeepAliveレスポンスを監視
go func() {
for ka := range keepAliveCh {
if ka == nil {
// KeepAliveが停止した
fmt.Println("KeepAlive stopped")
return
}
// TTLが延長された
fmt.Printf("TTL renewed: %d seconds\n", ka.TTL)
}
}()
KeepAliveOnce(一回限りの延長)
// 一回だけTTLを延長したい場合
keepAliveResp, err := etcd.KeepAliveOnce(ctx, leaseID)
if err != nil {
return err
}
fmt.Printf("TTL extended to: %d seconds\n", keepAliveResp.TTL)
KeepAliveの利点
1. 自動的なライフサイクル管理
アプリケーションが動作している間は自動的にTTLが延長され、停止すると自動的にリソースが解放されます。
2. 障害検知
ネットワーク障害やアプリケーション停止を自動的に検知し、適切にリソースをクリーンアップできます。
3. 簡単な実装
複雑な生存確認ロジックを実装する必要がなく、Etcdが自動的に管理してくれます。
実用的な活用例
// サービス登録とヘルスチェックの例
func registerService(ctx context.Context, etcd *clientv3.Client, serviceName, serviceAddr string) error {
// 30秒のLeaseを作成
lease, err := etcd.Grant(ctx, 30)
if err != nil {
return err
}
// サービス情報を登録
key := fmt.Sprintf("/services/%s/%s", serviceName, serviceAddr)
_, err = etcd.Put(ctx, key, serviceAddr, clientv3.WithLease(lease.ID))
if err != nil {
return err
}
// KeepAliveでサービスの生存を維持
// サービスの生存を停止したい場合はctxをキャンセルすることでKeepAliveが停止される
keepAliveCh, err := etcd.KeepAlive(ctx, lease.ID)
if err != nil {
return err
}
// バックグラウンドでKeepAliveを監視
go func() {
for {
select {
case <-ctx.Done():
// コンテキストがキャンセルされた場合
fmt.Printf("Service %s registration cancelled\n", serviceName)
return
case ka, ok := <-keepAliveCh:
if !ok {
// KeepAliveチャンネルが閉じられた場合
fmt.Printf("Service %s unregistered\n", serviceName)
return
}
if ka == nil {
// KeepAliveが失敗した場合
fmt.Printf("Service %s keepalive failed\n", serviceName)
return
}
// TTLが正常に延長された場合は何もしない
}
}
}()
return nil
}
// 使用例
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
etcd, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
})
if err != nil {
log.Fatal(err)
}
defer etcd.Close()
// サービスを登録
err = registerService(ctx, etcd, "my-service", "192.168.1.100:8080")
if err != nil {
log.Fatal(err)
}
// 10秒後にサービス登録を停止
time.Sleep(10 * time.Second)
cancel() // これによりKeepAliveが停止し、Leaseが期限切れになる
}
この機能により、サービスディスカバリ、セッション管理、分散ロックなど、継続的な生存確認が必要なシステムを簡単に実装できます。
Watch機能による変更検知
Etcdの強力な機能の一つがWatch機能です。これにより、特定のキーやキープレフィックスの変更をリアルタイムで検知できます。
Watchの基本概念
リアルタイム変更検知
Watchは指定したキーまたはキープレフィックスに対する変更(作成、更新、削除)をリアルタイムで監視します。変更が発生すると即座にイベントが通知されます。
イベントタイプ
Watchで検知できるイベントは以下の通りです:
-
PUT: キーの作成または更新 -
DELETE: キーの削除
履歴の取得
特定のリビジョンから監視を開始することで、過去の変更履歴も取得できます。
プレフィックス監視
キープレフィックスを指定することで、複数のキーを一括で監視できます。
基本的な使用例
// 単一キーの監視
func watchSingleKey(ctx context.Context, etcd *clientv3.Client, key string) {
watchCh := etcd.Watch(ctx, key)
for watchResp := range watchCh {
if watchResp.Canceled {
fmt.Println("Watch was cancelled")
return
}
for _, event := range watchResp.Events {
switch event.Type {
case clientv3.EventTypePut:
fmt.Printf("Key %s was updated: %s\n", event.Kv.Key, event.Kv.Value)
case clientv3.EventTypeDelete:
fmt.Printf("Key %s was deleted\n", event.Kv.Key)
}
}
}
}
Watchの利点
1. リアルタイム性
変更が発生すると即座に通知されるため、遅延の少ないシステムを構築できます。
2. 効率性
ポーリングと比較して、ネットワーク負荷とCPU使用量を大幅に削減できます。
3. 履歴の取得
特定のリビジョンから監視を開始することで、見逃した変更も取得できます。
4. 柔軟な監視範囲
単一キーからプレフィックス監視まで、用途に応じて監視範囲を調整できます。
この機能により、設定の動的更新、サービスディスカバリ、分散システムでの状態同期など、様々なリアルタイムシステムを効率的に実装できます。
条件付きトランザクション
Etcdでは、If/Then/Elseを使った条件付きトランザクションを実行できます。これにより、楽観的排他制御を実装できます。
条件付きトランザクションの基本概念
If/Then/Else構造
Etcdのトランザクションは以下の構造を持ちます:
- If: 条件を指定(複数の条件をAND結合可能)
- Then: 条件が真の場合に実行する操作
- Else: 条件が偽の場合に実行する操作
比較条件の種類
以下のような条件を指定できます:
-
CreateRevision: キーの作成リビジョン -
ModRevision: キーの更新リビジョン -
Version: キーのバージョン -
Value: キーの値 -
Lease: キーに関連付けられたLease
アトミック性
トランザクション内のすべての操作は原子的に実行されます。
基本的な使用例
// キーが存在しない場合のみ作成
func createIfNotExists(ctx context.Context, etcd *clientv3.Client, key, value string) error {
txn := etcd.Txn(ctx)
// 条件:キーが存在しない(CreateRevision == 0)
cmp := clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
// Then:キーを作成
putOp := clientv3.OpPut(key, value)
// Else:何もしない
resp, err := txn.If(cmp).Then(putOp).Commit()
if err != nil {
return err
}
// トランザクションが成功したかどうかを確認
if !resp.Succeeded {
return fmt.Errorf("key already exists")
}
return nil
}
トランザクションを活用したCAS(Compare and Swap)の実装
このように、Etcdのトランザクションでは非常に柔軟な条件分岐処理をしながら、アトミックな操作を実現できます。
実際に、etcdの分散ロック(Mutex)の内部実装でも、このような柔軟なトランザクションが使用されています。etcdのMutex実装では、以下のようなトランザクションでCAS操作を実現しています:
// etcdのMutex実装の実際のコード(簡略化)
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// Then: 自分をロック待機者として登録し、現在の所有者を取得
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
// Else: 既存のキーを取得し、現在の所有者も取得
get := v3.OpGet(m.myKey)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
CASのような条件付きのアトミックな操作は他のKVSでは実装が難しい処理です。
こういった処理を楽に実装できるのがetcdのトランザクションの強みです。
また、IfやThen、Elseには複数の条件や操作を指定できるため、より複雑な処理を実現できます。
// 複数データの一貫性を保った更新
func updateUserProfile(ctx context.Context, etcd *clientv3.Client, userID string, name, email string, version int64) error {
txn := etcd.Txn(ctx)
versionKey := fmt.Sprintf("/users/%s/version", userID)
nameKey := fmt.Sprintf("/users/%s/name", userID)
emailKey := fmt.Sprintf("/users/%s/email", userID)
statusKey := fmt.Sprintf("/users/%s/status", userID)
// 複数の条件をAND結合
cmp1 := clientv3.Compare(clientv3.Value(versionKey), "=", strconv.FormatInt(version, 10)) // バージョンが期待値と一致
cmp2 := clientv3.Compare(clientv3.CreateRevision(nameKey), ">", 0) // ユーザーが存在する
cmp3 := clientv3.Compare(clientv3.Value(statusKey), "!=", "deleted") // 削除されていない
// Then:すべてのデータを一貫性を保って更新
newVersion := version + 1
putOps := []clientv3.Op{
clientv3.OpPut(versionKey, strconv.FormatInt(newVersion, 10)),
clientv3.OpPut(nameKey, name),
clientv3.OpPut(emailKey, email),
clientv3.OpPut(fmt.Sprintf("/users/%s/updated_at", userID), fmt.Sprintf("%d", time.Now().Unix())),
}
// Else:エラー情報を記録
elseOps := []clientv3.Op{
clientv3.OpGet(versionKey), // 現在のバージョンを取得
clientv3.OpGet(statusKey), // 現在のステータスを取得
}
resp, err := txn.If(cmp1, cmp2, cmp3).Then(putOps...).Else(elseOps...).Commit()
if err != nil {
return err
}
// トランザクションが失敗した場合、Elseの実行結果を取得して処理可能
if !resp.Succeeded {
// Elseの結果から詳細なエラー情報を取得
var currentVersion, currentStatus string
for _, response := range resp.Responses {
if rangeResp := response.GetResponseRange(); rangeResp != nil && len(rangeResp.Kvs) > 0 {
key := string(rangeResp.Kvs[0].Key)
value := string(rangeResp.Kvs[0].Value)
if strings.Contains(key, "version") {
currentVersion = value
} else if strings.Contains(key, "status") {
currentStatus = value
}
}
}
return fmt.Errorf("update failed: current_version=%s, current_status=%s", currentVersion, currentStatus)
}
return nil
}
条件(If)のバリエーション
Ifには以下のような条件を指定できます。
// 1. キーの存在チェック
cmp := clientv3.Compare(clientv3.CreateRevision(key), "=", 0) // キーが存在しない
cmp := clientv3.Compare(clientv3.CreateRevision(key), ">", 0) // キーが存在する
// 2. 値による条件
cmp := clientv3.Compare(clientv3.Value(key), "=", "expected") // 値が一致
cmp := clientv3.Compare(clientv3.Value(key), "!=", "old") // 値が不一致
cmp := clientv3.Compare(clientv3.Value(key), ">", "100") // 値が大きい
// 3. リビジョンによる条件
cmp := clientv3.Compare(clientv3.ModRevision(key), "=", 123) // 特定のリビジョン
cmp := clientv3.Compare(clientv3.ModRevision(key), ">", 100) // リビジョンが新しい
// 4. バージョンによる条件
cmp := clientv3.Compare(clientv3.Version(key), "=", 5) // 特定のバージョン
cmp := clientv3.Compare(clientv3.Version(key), "<", 10) // バージョンが小さい
// 5. Leaseによる条件
cmp := clientv3.Compare(clientv3.LeaseValue(key), "=", leaseID) // 特定のLeaseに関連付け
cmp := clientv3.Compare(clientv3.LeaseValue(key), "=", 0) // Leaseに関連付けられていない
Mutexによる分散ロック
前述のトランザクションは楽観ロックの実装でしたが、EtcdではMutexを使った排他制御も実装できます。
分散システムでは、複数のプロセスが同じリソースに同時にアクセスすることを防ぐ必要があり、EtcdのMutex機能を使用することで、安全な分散ロックを実装できます。
これにより、RDBMSの悲観ロックを実装することも可能です。
Mutexの基本概念
分散ロック
Mutexは分散環境において、複数のプロセスやサーバが同じリソースに同時にアクセスすることを防ぐ仕組みです。楽観ロックとは異なり、処理開始前にロックを取得し、他のプロセスの同時実行を完全に防ぎます。
セッション
Mutexはセッション(Session)と組み合わせて使用されます。セッションはクライアントの生存状態を管理し、クライアントが異常終了した場合に自動的にロックを解放します。
デッドロック回避
適切なタイムアウト設定により、デッドロックを回避できます。
基本的な使用例
import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)
// 基本的な分散ロックの例
func basicMutexExample(ctx context.Context, etcd *clientv3.Client, lockKey string) error {
// セッションを作成
session, err := concurrency.NewSession(etcd)
if err != nil {
return err
}
defer session.Close()
// Mutexを作成
mutex := concurrency.NewMutex(session, lockKey)
// ロックを取得
if err := mutex.Lock(ctx); err != nil {
return err
}
defer mutex.Unlock(ctx)
// クリティカルセクション(排他制御が必要な処理)
fmt.Println("Critical section: processing...")
time.Sleep(2 * time.Second)
fmt.Println("Critical section: completed")
return nil
}
// タイムアウト付きロックの例
func mutexWithTimeout(ctx context.Context, etcd *clientv3.Client, lockKey string) error {
// 5秒でタイムアウトするコンテキスト
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
session, err := concurrency.NewSession(etcd)
if err != nil {
return err
}
defer session.Close()
mutex := concurrency.NewMutex(session, lockKey)
// タイムアウト付きでロックを取得
if err := mutex.Lock(ctx); err != nil {
if ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("failed to acquire lock: timeout")
}
return err
}
defer mutex.Unlock(context.Background()) // Unlockは別のコンテキストで実行
// 処理を実行
fmt.Println("Lock acquired, processing...")
return nil
}
サンプルコードではロックを取得した後に特に何もしていませんが、ここでetcdに対し読み書きや前述したトランザクションを実行することで、悲観ロックと同等の排他制御を実現できます。
実践的な設計パターン
次に、実践的な設計パターンを紹介します。
1. セッション管理パターン(Lease + KeepAlive)
Etcdを使ったセッション管理は、従来のRedisなどによるセッション管理と比較して以下のような処理を楽に実現することが出来ます。
- Lease:
- 同一のライフサイクルを持つキーを一括で管理できる
- セッションの期限切れ時に一時データを同時に削除する
- KeepAlive:
- WebSocketのようなリアルタイム接続において、接続中のセッションを維持する
type Session struct {
ID string
UserID string
Data map[string]interface{}
LeaseID clientv3.LeaseID
KeepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
KeepAliveCancel context.CancelFunc
}
func NewSessionManager(client *clientv3.Client, ttl int64) *SessionManager {
return &SessionManager{
client: client,
ttl: ttl,
}
}
func (sm *SessionManager) CreateSession(ctx context.Context, userID string, data map[string]interface{}) (*Session, error) {
// 1. Leaseを作成
lease, err := sm.client.Grant(ctx, sm.ttl)
if err != nil {
return nil, fmt.Errorf("failed to create lease: %w", err)
}
sessionID := uuid.New().String()
// 2. セッションデータを準備
sessionData, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("failed to marshal session data: %w", err)
}
metaData := map[string]interface{}{
"created_at": time.Now().Unix(),
"user_id": userID,
"lease_id": lease.ID,
}
metaJSON, err := json.Marshal(metaData)
if err != nil {
return nil, fmt.Errorf("failed to marshal meta data: %w", err)
}
// 3. トランザクションを使って一貫性を保ちながらセッションを作成
sessionKey := fmt.Sprintf("/sessions/%s", sessionID)
metaKey := fmt.Sprintf("/sessions/%s/meta", sessionID)
txn := sm.client.Txn(ctx)
// 条件:既存のセッションが存在しない、または期限切れ
cmp := clientv3.Compare(clientv3.CreateRevision(sessionKey), "=", 0)
// Then:新しいセッションを作成
thenOps := []clientv3.Op{
clientv3.OpPut(sessionKey, string(sessionData), clientv3.WithLease(lease.ID)),
clientv3.OpPut(metaKey, string(metaJSON), clientv3.WithLease(lease.ID)),
}
resp, err := txn.If(cmp).Then(thenOps...).Commit()
if err != nil {
// Leaseを削除してクリーンアップ
if err := sm.client.Revoke(ctx, lease.ID); err != nil {
return nil, fmt.Errorf("failed to revoke lease: %w", err)
}
return nil, fmt.Errorf("failed to create session transaction: %w", err)
}
if !resp.Succeeded {
// 既存のセッションが存在する場合
if err := sm.client.Revoke(ctx, lease.ID); err != nil {
return nil, fmt.Errorf("failed to revoke lease: %w", err)
}
return nil, fmt.Errorf("session already exists for user %s", userID)
}
// 4. (必要に応じて)KeepAliveを開始
keepAliveCtx, keepAliveCancel := context.WithCancel(context.Background())
keepAliveCh, err := sm.client.KeepAlive(keepAliveCtx, lease.ID)
if err != nil {
// KeepAlive開始に失敗した場合、作成したセッションを削除
if err := sm.client.Revoke(ctx, lease.ID); err != nil {
return nil, fmt.Errorf("failed to revoke lease: %w", err)
}
return nil, fmt.Errorf("failed to start keepalive: %w", err)
}
return &Session{
ID: sessionID,
UserID: userID,
Data: data,
LeaseID: lease.ID,
KeepAliveCh: keepAliveCh,
KeepAliveCancel: keepAliveCancel,
}, nil
}
2. 分散ロックパターン
前述したとおり、Mutexを使うことにより悲観ロックと同等の排他制御を実現できます。
これによって他のKVSでは難しい一貫性のある実装をすることが比較的容易で、データの整合性を保つことができます。
サンプルコードは前述しているため省略します。
3. イベント通知パターン(Watch)
Watchを使うことでキーの作成・更新・削除を検知することが出来ます。
これを利用して、TTLが経過したことを検知して処理を行ったり、キーやデータの変更をリアルタイムで検知して適切な処理を実行するなどが可能です。
例えば、セッションの期限切れやデータの変更をリアルタイムで検知し、適切な処理を実行するなどです。
以下は、セッションの期限切れを検知して通知する実装例です。
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"time"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/api/v3/mvccpb"
)
type SessionEventNotifier struct {
client *clientv3.Client
callbacks map[string]func(SessionEvent)
}
type SessionEvent struct {
Type string // "expired", "created", "updated"
SessionID string
UserID string
Timestamp time.Time
Data map[string]interface{}
}
type SessionEventCallback func(SessionEvent)
func NewSessionEventNotifier(client *clientv3.Client) *SessionEventNotifier {
return &SessionEventNotifier{
client: client,
callbacks: make(map[string]func(SessionEvent)),
}
}
// セッション期限切れイベントのコールバックを登録
func (sen *SessionEventNotifier) OnSessionExpired(callback SessionEventCallback) {
sen.callbacks["expired"] = callback
}
// セッション作成イベントのコールバックを登録
func (sen *SessionEventNotifier) OnSessionCreated(callback SessionEventCallback) {
sen.callbacks["created"] = callback
}
// セッションの変更を監視
func (sen *SessionEventNotifier) WatchSessions(ctx context.Context) error {
// セッションキーのプレフィックスを監視
watchCh := sen.client.Watch(ctx, "/sessions/", clientv3.WithPrefix())
for {
select {
case <-ctx.Done():
return ctx.Err()
case watchResp, ok := <-watchCh:
if !ok {
return fmt.Errorf("watch channel closed")
}
if watchResp.Canceled {
return fmt.Errorf("watch was cancelled: %v", watchResp.Err())
}
for _, event := range watchResp.Events {
if err := sen.handleSessionEvent(event); err != nil {
log.Printf("Error handling session event: %v", err)
}
}
}
}
}
func (sen *SessionEventNotifier) handleSessionEvent(event *clientv3.Event) error {
key := string(event.Kv.Key)
// メタデータキーは無視
if strings.Contains(key, "/meta") {
return nil
}
// セッションIDを抽出
parts := strings.Split(key, "/")
if len(parts) < 3 {
return nil
}
sessionID := parts[2]
switch event.Type {
case clientv3.EventTypePut:
return sen.handleSessionCreatedOrUpdated(sessionID, event.Kv.Value)
case clientv3.EventTypeDelete:
return sen.handleSessionExpired(sessionID, event.PrevKv)
}
return nil
}
func (sen *SessionEventNotifier) handleSessionCreatedOrUpdated(sessionID string, value []byte) error {
var sessionData map[string]interface{}
if err := json.Unmarshal(value, &sessionData); err != nil {
return fmt.Errorf("failed to unmarshal session data: %w", err)
}
// ユーザーIDを取得(セッションデータから)
userID, _ := sessionData["user_id"].(string)
event := SessionEvent{
Type: "created", // 実際には作成と更新を区別する場合は追加のロジックが必要
SessionID: sessionID,
UserID: userID,
Timestamp: time.Now(),
Data: sessionData,
}
if callback, exists := sen.callbacks["created"]; exists {
callback(event)
}
return nil
}
func (sen *SessionEventNotifier) handleSessionExpired(sessionID string, prevKv *mvccpb.KeyValue) error {
var sessionData map[string]interface{}
var userID string
// 削除前のデータがある場合は解析
if prevKv != nil {
if err := json.Unmarshal(prevKv.Value, &sessionData); err == nil {
userID, _ = sessionData["user_id"].(string)
}
}
// セッションIDからユーザーIDを取得できない場合は、メタデータから取得を試行
if userID == "" {
metaKey := fmt.Sprintf("/sessions/%s/meta", sessionID)
resp, err := sen.client.Get(context.Background(), metaKey)
if err == nil && len(resp.Kvs) > 0 {
var metaData map[string]interface{}
if err := json.Unmarshal(resp.Kvs[0].Value, &metaData); err == nil {
userID, _ = metaData["user_id"].(string)
}
}
}
event := SessionEvent{
Type: "expired",
SessionID: sessionID,
UserID: userID,
Timestamp: time.Now(),
Data: sessionData,
}
if callback, exists := sen.callbacks["expired"]; exists {
callback(event)
}
return nil
}
// 使用例
func ExampleSessionNotification() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
notifier := NewSessionEventNotifier(client)
// セッション期限切れ時の処理を登録
// これを利用することで、セッションの期限切れをクライアントに通知してログアウト画面を表示するなどの処理が可能
notifier.OnSessionExpired(func(event SessionEvent) {
log.Printf("Session expired: SessionID=%s, UserID=%s, Time=%v",
event.SessionID, event.UserID, event.Timestamp)
// 例:WebSocketでクライアントに通知
// websocketManager.NotifySessionExpired(event.UserID, event.SessionID)
// 例:メール通知
// emailService.SendSessionExpiredNotification(event.UserID)
})
// セッション作成時の処理を登録
notifier.OnSessionCreated(func(event SessionEvent) {
log.Printf("Session created: SessionID=%s, UserID=%s",
event.SessionID, event.UserID)
// 例:アクティブユーザー数の更新
// metricsService.IncrementActiveUsers()
})
// 監視を開始
ctx := context.Background()
if err := notifier.WatchSessions(ctx); err != nil {
log.Printf("Watch sessions error: %v", err)
}
}
運用面での注意点
現在、PortalKeyではプロダクトが本番運用していないため注意点についてはあまり確かなことが言えませんが、従来のKVSと比較して以下の点に注意が必要です:
クラスタ構成
- 奇数台(3台、5台)でのクラスタ構成を推奨
- ネットワーク分断に対する耐性を考慮
パフォーマンス
- 書き込み性能は単一リーダーによる制限あり
- 読み込みは各ノードで分散可能
詳細な運用ガイドについては、公式ドキュメントを参照してください:
まとめ
Etcdは単なるKVSではなく、分散システム向けの強力な機能を持ったデータストアです。特に以下のような用途に適しています:
- セッション管理: Lease + KeepAliveによる自動的なライフサイクル管理
- 分散ロック: Mutexによる安全な排他制御
- リアルタイム通知: Watchによる変更検知
- 条件付き処理: トランザクションによる楽観的排他制御
ただ、RedisなどのKVSに比べて機能や運用面では複雑になるため、適切な用途に応じて使い分けることが重要です。
PortalKeyではクライアントに対しリアルタイムなデータ更新を重要視していますが、etcdを利用することによってこれらの機能を比較的楽に実装出来ています。
RedisなどのKVSでは実現の難しい機能の実装が必要になった場合は、etcdを検討してみてください。
Discussion