【Rails】ActiveRecordでXAトランザクションを無理やり使う
概要
シャーディングを使うマルチDB環境において、シャードを跨る更新処理の整合性を高めたい。
Railsで実現するなら、トランザクションのネストが常套手段と思います。
トランザクションをネストする場合、タイミング次第で一方のDBはコミット、一方のDBはロールバックといった不整合な状態になり得ます。
更に精度を高める手段として分散トランザクションがあります。
分散トランザクションは、複数の独立したデータベースやリソースマネージャに対して一貫したトランザクション処理を保証するための技術です。
MySQLには分散トランザクションとしてXAトランザクションという機能があり、Railsから使ってみました。
先に結論から記載すると、
- ActiveRecordはXAトランザクションに非対応です。
- 生クエリを発行すればXAトランザクションを利用できます。
- ActiveRecordの場合、ORMの旨味がなくなるので、おすすめしません。
環境
- Ruby 3.3.5
- Rails 7.2.1
- MySQL 8.0.39
XAトランザクションの使い方
MySQL 8.0では標準で利用できます。
リファレンスに以下の記載がありますが、今回の例ではデフォルトのREPEATABLE READ
のままにしています。
アプリケーションが読取り現象に敏感な場合は、SERIALIZABLE をお薦めします。
実際にXAトランザクションを利用するには、頭にXA
を付けた専用のクエリを発行します。
- xid の発行
- xidはXAトランザクションの識別子で、自前で生成する
- xidは
gtrid [, bqual [, formatID ]]
で構成される - gtridは各シャードのトランザクションで同じ識別子を使用する
- bqualは各シャードのトランザクションで別の識別子を使用する
- formatIDは詳細用途不明、省略しても動作した
- XA [BEGIN|START] xid
- トランザクションをACTIVE状態にする
- DB操作
- トランザクションが必要なDB操作
- XA END xid
- ACTIVEなトランザクションをIDLE状態にする
- XA PREPARE xid
- IDLEなトランザクションをPREPARED状態にする
- XA COMMIT xid
- PREPAREDなトランザクションをCOMMITして終了する
- XA ROLLBACK xid
- IDLE/PREPAREDなトランザクションをROLLBACKして終了する
処理フロー
上記の使い方を踏まえた処理フローは以下の通りです。
実装
シャードを跨る更新処理としてユーザーのフレンド管理を想定した実装を行いました。
ユーザーのフレンド情報を以下のテーブルで管理する想定です。
CREATE TABLE `user_friends` (
`user_id` int unsigned NOT NULL COMMENT 'ユーザーID',
`friend_user_id` int unsigned NOT NULL COMMENT 'フレンドユーザーID',
`created_at` datetime NOT NULL COMMENT '登録日時',
`updated_at` datetime NOT NULL COMMENT '更新日時',
PRIMARY KEY (`user_id`,`friend_user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='ユーザーフレンド情報';
ユーザーAとユーザーBがフレンドになった場合、双方向のレコードを登録する想定です。
user_id friend_user_id created_at updated_at
1 2 2024-09-13 17:02:18 2024-09-13 17:02:18
2 1 2024-09-13 17:02:18 2024-09-13 17:02:18
以下はXAトランザクションの機能をConcernとして実装したものです。
xa_transactionメソッドに渡したコネクション配列に対して、XAトランザクション内でブロック引数を実行します。
xidを構成するgtridは複数のシャードで同一識別子、bqualは異なる識別子となるようSecureRandomで生成しています。
SecureRandomでは厳密な一意性は担保されませんが、redisなどで排他制御を入れることで厳密に一意性を担保できます。
# frozen_string_literal: true
# XAトランザクションを利用するためのConcern
module XaTransactionConcern
extend ActiveSupport::Concern
class_methods do
def xa_transaction(connections, &block)
gtrid = generate_gtrid
xids = connections.map { generate_xid(gtrid:, bqual: generate_bqual) }
begin
# トランザクションの開始
connections.each.with_index do |conn, i|
conn.execute("XA BEGIN #{xids[i]}")
end
# XAトランザクション内での処理
yield
# トランザクションの終了
connections.each.with_index do |conn, i|
conn.execute("XA END #{xids[i]}")
end
# コミット準備
connections.each.with_index do |conn, i|
conn.execute("XA PREPARE #{xids[i]}")
end
# コミット
connections.each.with_index do |conn, i|
conn.execute("XA COMMIT #{xids[i]}")
end
rescue => e
# エラーが発生した場合のロールバック処理
puts "Error occurred: #{e.message}"
connections.each.with_index do |conn, i|
begin
# 既にXA ENDが呼ばれてるかもしれないのでrescue
conn.execute("XA END #{xids[i]}") rescue nil
conn.execute("XA ROLLBACK #{xids[i]}")
rescue => rollback_e
puts "Error during rollback: #{rollback_e.message}"
end
end
raise e
end
end
# DBコネクションを取得する
# current_role,current_shard以外のコネクションをクラスメソッドで取得しにくいのでメソッド化
# @param [Symbol] role 接続するロール
# @param [Symbol] shard 接続するシャード
# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter] DBコネクション
def xa_connection(role: current_role, shard: current_shard)
connection_handler.retrieve_connection(connection_specification_name, role:, shard:)
end
private
def generate_gtrid
SecureRandom.hex(32)
end
def generate_bqual
SecureRandom.hex(32)
end
def generate_xid(gtrid:, bqual:)
"'#{gtrid}', '#{bqual}'"
end
end
end
以下はシャーディングするユーザーDBに属するModelの基底クラスです。
RedisCache::UserShardId.fetch(user_id)
でユーザーIDに対応するシャードを取得できるものとします。
# frozen_string_literal: true
module UserDb
# user_shardに属するModelの基底クラス
class Base < ActiveRecord::Base
self.abstract_class = true
include XaTransactionConcern
# DB接続定義を全てのシャード識別子に対して定義
connects_to shards: {
user_shard_01: {writing: :user_shard_01, reading: :user_shard_01_replica},
user_shard_02: {writing: :user_shard_02, reading: :user_shard_02_replica}
}
class << self
# ユーザーIDに応じたシャードに対してブロックを実行する
# @param [Integer] user_id ユーザーID
# @param [Symbol] role 接続するロール
# @return [Object] ブロックの戻り値
def on_user_shard(user_id, role: :writing, &block)
shard_id = RedisCache::UserShardId.fetch(user_id)
raise "shard_id is blank. user_id: #{user_id}" if shard_id.blank?
connected_to(role:, shard: shard_id, &block)
end
# ユーザーID配列に応じたDBコネクション配列を取得する
# 重複するシャードIDは1つにまとめる
# @param [Arrat<Integer>] user_ids ユーザーID配列
# @param [Symbol] role 接続するロール
# @return [Array<ActiveRecord::ConnectionAdapters::AbstractAdapter>] DBコネクション配列
def user_shard_connections(user_ids, role: :writing)
shard_ids = user_ids.map { |user_id| RedisCache::UserShardId.fetch(user_id) }.sort.uniq
shard_ids.map { |shard_id| xa_connection(role:, shard: shard_id) }
end
end
end
end
上記を用いて、user_friendsにデータを登録するコードは以下の通り。
生クエリでINSERTしていますが、理由は後述します。
user_id = 1
friend_user_id = 2
UserDb::Base.xa_transaction(UserDb::Base.user_shard_connections([ user_id, friend_user_id ])) do
timestamp = Time.current.strftime('%Y-%m-%d %H:%M:%S')
UserDb::Base.on_user_shard(user_id) do
query = "insert into user_friends (user_id, friend_user_id, created_at, updated_at) values (#{user_id}, #{friend_user_id}, '#{timestamp}', '#{timestamp}')"
UserDb::Base.connection.update(query)
end
UserDb::Base.on_user_shard(friend_user_id) do
query = "insert into user_friends (user_id, friend_user_id, created_at, updated_at) values (#{friend_user_id}, #{user_id}, '#{timestamp}', '#{timestamp}')"
UserDb::Base.connection.update(query)
end
end
mysqlに送られたクエリは以下の通りです。(general_log)
2024-09-17T01:48:42.975714Z 914 Query XA BEGIN 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '50ecd7178a84f0d77622876791a19531e186bfd77c55845e4628a88020c6fbdc'
2024-09-17T01:48:42.976216Z 915 Query XA BEGIN 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '285327488bb1af8e0e4013414b22ee815f91cb4ad285491486a40e41af786ffe'
2024-09-17T01:48:42.976798Z 914 Query insert into user_friends (user_id, friend_user_id, created_at, updated_at) values (1, 2, '2024-09-17 10:48:42', '2024-09-17 10:48:42')
2024-09-17T01:48:42.977667Z 915 Query insert into user_friends (user_id, friend_user_id, created_at, updated_at) values (2, 1, '2024-09-17 10:48:42', '2024-09-17 10:48:42')
2024-09-17T01:48:42.978032Z 914 Query XA END 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '50ecd7178a84f0d77622876791a19531e186bfd77c55845e4628a88020c6fbdc'
2024-09-17T01:48:42.978234Z 915 Query XA END 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '285327488bb1af8e0e4013414b22ee815f91cb4ad285491486a40e41af786ffe'
2024-09-17T01:48:42.978449Z 914 Query XA PREPARE 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '50ecd7178a84f0d77622876791a19531e186bfd77c55845e4628a88020c6fbdc'
2024-09-17T01:48:42.979191Z 915 Query XA PREPARE 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '285327488bb1af8e0e4013414b22ee815f91cb4ad285491486a40e41af786ffe'
2024-09-17T01:48:42.979733Z 914 Query XA COMMIT 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '50ecd7178a84f0d77622876791a19531e186bfd77c55845e4628a88020c6fbdc'
2024-09-17T01:48:42.980379Z 915 Query XA COMMIT 'ef6473dc41d792d43b42ba8ebaae91c5c86d3dc18a11d69e27753c4000bdc57c', '285327488bb1af8e0e4013414b22ee815f91cb4ad285491486a40e41af786ffe'
それぞれのシャードにデータが登録されています。
mysql> use base_user_shard_01
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from user_friends;
+---------+----------------+---------------------+---------------------+
| user_id | friend_user_id | created_at | updated_at |
+---------+----------------+---------------------+---------------------+
| 1 | 2 | 2024-09-17 10:48:42 | 2024-09-17 10:48:42 |
+---------+----------------+---------------------+---------------------+
1 row in set (0.00 sec)
mysql> use base_user_shard_02
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> select * from user_friends;
+---------+----------------+---------------------+---------------------+
| user_id | friend_user_id | created_at | updated_at |
+---------+----------------+---------------------+---------------------+
| 2 | 1 | 2024-09-17 10:48:42 | 2024-09-17 10:48:42 |
+---------+----------------+---------------------+---------------------+
1 row in set (0.01 sec)
ActiveRecordを使っていれば、通常、以下のようなcreateやsaveでデータを登録しますが、createやsaveはtransactionメソッドを使っていない場合でも、内部的にトランザクションを利用しています。
XAトランザクションとトランザクションは相互に排他的な関係で同時に利用できません。
ActiveRecordによる内部的なトランザクションの発行を制御できなかったため生クエリを使いました。
(ActiveRecordの振る舞いを上書きすれば可能と思いますが、、、)
UserDb::UserFriend.create!(user_id: user_id, friend_user_id: friend_user_id)
計測
トランザクションをネストした場合とXAトランザクションを利用した場合の処理時間が気になったので計測しました。
以下はベンチマークコードです。
(トランザクションのネストは可読性悪いですね。)
# frozen_string_literal: true
loop_cnt = 1000
# トランザクションをネストしてuser_friendsを登録する
def create_user_friends(user_id, friend_user_id)
UserDb::Base.on_user_shard(user_id) do
UserDb::Base.transaction do
UserDb::Base.on_user_shard(friend_user_id) do
UserDb::Base.transaction do
UserDb::Base.on_user_shard(user_id) do
UserDb::UserFriend.create!(user_id: user_id, friend_user_id: friend_user_id)
end
UserDb::Base.on_user_shard(friend_user_id) do
UserDb::UserFriend.create!(user_id: friend_user_id, friend_user_id: user_id)
end
end
end
end
end
end
# XAトランザクションでuser_friendsを登録する
def xa_create_user_friends(user_id, friend_user_id)
UserDb::Base.xa_transaction(UserDb::Base.user_shard_connections([ user_id, friend_user_id ])) do
timestamp = Time.current.strftime('%Y-%m-%d %H:%M:%S')
UserDb::Base.on_user_shard(user_id) do
query = "insert into user_friends (user_id, friend_user_id, created_at, updated_at) values (#{user_id}, #{friend_user_id}, '#{timestamp}', '#{timestamp}')"
UserDb::Base.connection.update(query)
end
UserDb::Base.on_user_shard(friend_user_id) do
query = "insert into user_friends (user_id, friend_user_id, created_at, updated_at) values (#{friend_user_id}, #{user_id}, '#{timestamp}', '#{timestamp}')"
UserDb::Base.connection.update(query)
end
end
end
def delete_user_friends
UserDb::Base.on_each_shard do
UserDb::UserFriend.delete_all
end
end
Benchmark.bm 20 do |r|
delete_user_friends
r.report 'nested transaction' do
loop_cnt.times do |i|
user_id = i * 2 + 1
friend_user_id = (i + 1) * 2
create_user_friends(user_id, friend_user_id)
end
end
delete_user_friends
r.report 'xa transaction' do
loop_cnt.times do |i|
user_id = i * 2 + 1
friend_user_id = (i + 1) * 2
xa_create_user_friends(user_id, friend_user_id)
end
end
end
以下は計測結果です。
トランザクションをネストした場合とそれほど変わりませんでした。
今回は全てのクエリが直列だったのですが、並列で動いたら変わってくるかもしれません。
user system total real
nested transaction 1.256326 0.221786 1.478112 ( 2.479285)
xa transaction 1.110660 0.237675 1.348335 ( 2.519679)
総括
XAトランザクションは知識としては持っていましたが、実際に使うのは初めてだったので、勉強になりました。
RailsでのXAトランザクションの利用事例は探しても情報が見当たらないですね・・・
ActiveRecordの恩恵を受けづらいこともあり、Railsではトランザクションのネストを使ったほうが良いかと思いました。
トランザクションをネストする場合、データ不整合が生じる可能性があるため、ロールバック時には何かしらログを残してリスク対策を講じるのが良いと思います。
Discussion