MongoDBのTTL Indexesでスレッド間の処理重複を防ぐ
経緯
Goでの開発で複数スレッドを用意しつまれたタスクを高速+大量に実行するということをしています。
スレッド同士のタスクの取り合いでちょくちょく同一のタスクを取り合うことがありました。
pub/subのようなものを使えばこのようなことはないとは思うのですが予算の関係で使えません。
そこで、処理するタスクを一時的に保存し、ある程度時間が経つと再度そのタスクを実行してもよいというようにしたいと考えました。
そこで、現在利用しているmongoDBのTTL Indexesを利用してそれを実現するまでの流れです。
MongoDBのTTL Indexes
TTL (Time To Live) Indexesは、MongoDBで特定の時間が経過したドキュメントを自動的に削除する機能です。
主な特徴
データ型の制約:
Date型のフィールドのみに適用可能
削除間隔:
1分(60秒)ごとにチェックされ、最大で59秒の誤差が生じる可能性がある
制限事項:
Cappedコレクションでは使用不可
複合インデックスには使用できない
既存のインデックスがあるフィールドには適用不可
注意点として、削除はremoveクエリで実行されるため即時削除が保証されるわけではないのと、バックグラウンドオプションを使用すると、インデックス作成中に対象を削除できるようです。
詳しくはこちらから
実装
こちらがGoでの実装コードになります。
180となっている部分が秒数で、この場合は3分経つとデータが消えるという設定としています。
indexModel := mongo.IndexModel{
Keys: bson.M{"created_at": 1},
Options: options.Index().SetExpireAfterSeconds(180),
}
_, err = [対象のコレクション].Indexes().CreateOne(context.Background(), indexModel)
if err != nil {
return nil, fmt.Errorf("failed to create TTL index: %w", err)
}
このコードをmongoDBのコネクションをしたあと、コレクションを取得するときに実行しています。
処理が正常に機能するとこのようにTTLのindexが追加されているのを確認できます。
ただ、indexを追加する処理は1度だけで十分なので、コードではなく管理画面などから手動で追加するのも良いかと思います。
処理が重複しているかどうかの確認や登録についてはこちら
_, err := [対象のコレクション].InsertOne(context.Background(), &Task{
Task: [TaskID],
CreatedAt: time.Now(),
})
if err != nil {
fmt.Println("MongoDBへのRecentlyPostログの書き込みに失敗しました")
return true
}
return false
とりあえずタスクを保存してみて保存できたら重複のないタスクを実行しているのでOK,保存できない場合は重複したのでNGというざっくりとしたコードで判定しています。
感想など
そもそもとしてpub/subのようなものが使えるのが一番いいのですが低予算な開発ではなるべくコストのかからない実装を考えることがどうしても必要になってきています。
その点でmongoDBはAtlasという無料のサービスを用意してくれているので非常に助かります。
また、TTLという点ではRedisのようなものをつかうのも良いかと思うのですが、MongoDBをせっかく使っているのでできることは全てMongoDBでやってしまおうという気持ちです。
まだまだ使っていない機能がたくさんあるので、何かあれば取り入れていこうかと思います。
Discussion