🔑

トランザクション分離性と書き込みスキューを具体例で理解する

2023/08/06に公開

背景

DDIAの"第7章 トランザクション"を読み、トランザクションの並行制御に関する勉強をしています。
その過程で、トランザクションの分離レベルとそれに対応して生じる例外について自分で手を動かすことで理解が深まったので、その際に用いたコードと自分が学んだことをシェアしたいと思います。
今回はタイトルにある通り書き込みスキューについて勉強します。

前提

  • ソースコードはJava17で記述しています
  • RedisとPostgreSQLを例として用います
    • Redisは7.0, PostgreSQLは14.8
    • 理由は自分が現在業務で使用しており、より理解を深めたかったからです
  • RedisクライアントにはLettuceを利用しています

書き込みスキューとは

はじめに書き込みスキューについて簡単に理解しておきましょう。
DDIAでは以下のように書かれています。

トランザクションが何かを読み取り、その値に基づいて判断を下し、その結果をデータベースに書き込みます。この状況で、書き込みが行われた時点で判断の根拠となったプレミスが真ではなくなっている場合を指します。

言い換えると、値を読み込み・その値に基づいて判断し・書き込む処理があった際に、他のクライアントによって値が上書きされた結果、判断の部分の条件が真ではなくなってしまう場合に発生する例外です。
この説明を具体的にイメージするために、以下の例について考えてみましょう。

具体的なシナリオ: 銀行口座預金管理アプリケーション

銀行の口座預金管理アプリケーションを想定してみましょう。

  • このアプリケーションはマルチスレッドで実行されます。
  • ある口座には預金が現在50あるとします。
  • スレッド1とスレッド2が同時に口座から50の預金を引き出そうとしています。
  • 口座預金はマイナスになることができないので、片方のスレッドの更新のみ成功し、もう片方のスレッドの更新は失敗するのが期待される挙動です。
  • 従って、最終的な口座預金残高は0になるのが期待値です。

このようなシナリオをシュミレートするコードを作成してみましょう。
以下では意図的に書き込みスキューを発生させるコードを記述しています。

書き込みスキューが生じるコード(PostgreSQL)

まずはPostgreSQLの例を見てみましょう。
まず必要となるaccountテーブルを作成します。

init_account_table.sql
CREATE TABLE accounts (
    id SERIAL PRIMARY KEY,
    balance INT
);
INSERT INTO accounts balance VALUES 50;

では以下のJavaコードで複数スレッドから口座残高を更新してみましょう。[1]

PostgresTransactionRaceConditionExample.java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PostgresTransactionRaceConditionFailureExample {

    private static final String url = System.getenv("DB_URL");
    private static final String user = System.getenv("DB_USER");
    private static final String password = System.getenv("DB_PASSWORD");

    public static void main(String[] args) {

        // Setup database to initial state
        try (Connection connection = DriverManager.getConnection(url, user, password);
             Statement st = connection.createStatement()) {
            st.executeUpdate("UPDATE accounts SET balance = 50 where id = 1");

            // Simulate the situation in which multiple thread access to the same record
            // and try to update the record
            Thread thread1 = new Thread(PostgresTransactionRaceConditionFailureExample::transaction);
            Thread thread2 = new Thread(PostgresTransactionRaceConditionFailureExample::transaction);
            thread1.start();
            thread2.start();

            try {
                thread1.join();
                thread2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            try (ResultSet rs = st.executeQuery("SELECT balance FROM accounts WHERE id = 1")) {
                if (rs.next()) {
                    System.out.println("Final account balance: " + rs.getInt(1));
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    private static void transaction() {
        try (Connection con = DriverManager.getConnection(url, user, password)) {
            // Use Read Committed isolation level, which is the default setting for PostgreSQL
            con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
            con.setAutoCommit(false);
            try (Statement st = con.createStatement();
                 ResultSet rs = st.executeQuery("SELECT balance FROM accounts WHERE id = 1")) {
                if (rs.next()) {
                    int balance = rs.getInt(1);
                    // If the remained balance exceeds or is equal to 50, withdrawn 50
                    if (balance >= 50) {
                        st.executeUpdate("UPDATE accounts SET balance = balance - 50 WHERE id = 1");
                        con.commit();
                    } else {
                        con.rollback();
                    }
                }
            } catch (SQLException sqlException) {
                con.rollback();
                sqlException.printStackTrace();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

このコードを実行してみると、以下のアウトプットとともにアプリケーションが終了します。

Final account balance: -50

これは期待値とは異なります。
各スレッドが値を読みっとった後にデータが更新されているのですが各スレッドがそれに気づかずに口座預金を更新してしまっています。

書き込みスキューが生じるコード(Redis)

次にRedisでの例を見てみましょう。
以下のコードでは書き込みスキューが生じます。
このコードはトランザクション[2]もLuaスクリプトも使わずに実装されています。
account_balanceというkeyに口座残高を格納しています。

RedisTransactionRaceConditionFailureExample.java
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

public class RedisTransactionRaceConditionFailureExample {

    private static final String redisUrl = System.getenv("REDIS_URL");
    private static final String balanceKey = "account_balance";

    public static void main(String[] args) {
        try (RedisClient redisClient = RedisClient.create(redisUrl);
             StatefulRedisConnection<String, String> setupConnection = redisClient.connect()) {
            RedisCommands<String, String> syncCommands = setupConnection.sync();
            // Set up the Redis to the initial state
            syncCommands.set(balanceKey, "50");

            Thread thread1 = new Thread(RedisTransactionRaceConditionFailureExample::transaction);
            Thread thread2 = new Thread(RedisTransactionRaceConditionFailureExample::transaction);
            thread1.start();
            thread2.start();

            try {
                thread1.join();
                thread2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Final account balance: " + syncCommands.get(balanceKey));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void transaction() {
        try (RedisClient redisClient = RedisClient.create(redisUrl);
             StatefulRedisConnection<String, String> threadConnection = redisClient.connect()) {
            RedisCommands<String, String> threadCommands = threadConnection.sync();
            int balance = Integer.parseInt(threadCommands.get(balanceKey));
            if (balance >= 50) {
                threadCommands.decrby(balanceKey, 50);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

上記のコードを実行した際もPostgreSQLの時と同様に以下のように残高が-50になってしまいます。

Final account balance: -50

解決策

適切に同時実行制御されたコード(PostgreSQL)

その1: ロックを取得する

PostgreSQLを用いる例では、更新対象とするレコードを取得しているSELECT文で行レベルのLockを取得することでこの問題を解決できます。
すなわち、transactionメソッド内のSELECT文を以下のように書き換えます。

ResultSet rs = st.executeQuery("SELECT balance FROM accounts WHERE id = 1")

ロックを取得するSQL.java
ResultSet rs = st.executeQuery("SELECT balance FROM accounts WHERE id = 1 FOR UPDATE")

に書き換えることで、他のトランザクションが同一の行を更新することを防ぐことができ、期待通りの挙動となります。

Final account balance: 0

FOR UPDATEにより行レベルのロックを取得する方法はDDIAでも紹介されています。

その2: トランザクションの分離レベルを変更する

もう一つのオプションとしては、トランザクションの分離レベルをデフォルトのREAD_COMMITTEDからSERIALIZABLEに変更することです。
つまり以下のようにコードを変更します。

con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);

を以下のように書き換えます。

トランザクションの分離レベルを変更.java
con.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);

DDIAでも書き込みスキューを回避する方法として直列化可能分離レベルを利用する方法が紹介されています。
この変更を加えてコードを実行すると、以下のアウトプットとともに、期待通りの挙動をしていることが確認できます。

ERROR: could not serialize access due to concurrent update.
...(略)
Final account balance: 0

トランザクションの分離レベルをSERIALIZABLEにしたことで片方のスレッドからのデータの更新処理が失敗しています。
(当然ながら本来であればこのErrorハンドルを考慮したアプリケーションの実装が必要となります。)

なお、PostgreSQLを利用する場合は、トランザクション分離レベルをREPEATABLE READに設定しても本記事で取り上げている例では同じ挙動になります。
なぜならPostgreSQLのREPEATABLE READでは、トランザクションが開始された後に別のトランザクションによって更新されたデータは変更またはロックすることができないためです。
詳しくはこちらを参照ください。

適切に同時実行制御されたコード(Redis)

その1: WATCHコマンドを使用する

Redisはシングルスレッドで動作し、一連の操作をアトミックに実行することができます。
Redisはトランザクションを提供しており、MULTI/EXECコマンドで一連の操作を実装できます。[3]
そしてRedisはWATCHコマンドによるcheck-and-setを用いた楽観的なロックの機能を提供しており、これを利用することで本記事のユースケースを適切に実装することができます。
以下の記述は公式ドキュメントからの引用です。

WATCH is used to provide a check-and-set (CAS) behavior to Redis transactions.
WATCHed keys are monitored in order to detect changes against them. If at least one watched key is modified before the EXEC command, the whole transaction aborts, and EXEC returns a Null reply to notify that the transaction failed.

(なお楽観的な並行制御に関してはDDIAでも触れられています。)

具体的には、もとのソースコードのtransactionメソッドを以下のように書き換えることで正しい挙動が実装できます。

UseWatch.java
private static void transaction() {
    try (RedisClient redisClient = RedisClient.create(redisUrl);
         StatefulRedisConnection<String, String> threadConnection = redisClient.connect()) {
        RedisCommands<String, String> threadCommands = threadConnection.sync();
        threadCommands.watch(balanceKey);
        int balance = Integer.parseInt(threadCommands.get(balanceKey));
        threadCommands.multi();
        if (balance >= 50) {
            threadCommands.decrby(balanceKey, 50);
        }
        TransactionResult result = threadCommands.exec();
        if (result.wasDiscarded()) {
            System.out.println("The transaction was aborted because WATCH detected the update of the target value");
        }
    }
}

アウトプットは以下のようになり、期待する挙動となります。

The transaction was aborted because WATCH detected the update of the target value.
Final account balance: 0

その2: Luaスクリプトで実装

ReisではLuaスクリプトを用いることでも複数の操作をAtomicに実装することが可能です。
Redis公式ドキュメントにも以下の記述があります。

Redis guarantees the script's atomic execution. While executing the script, all server activities are blocked during its entire runtime. These semantics mean that all of the script's effects either have yet to happen or had already happened.

本記事のシナリオについては、transactionメソッドを以下のように書き換えることで実装できます。

UseLuaScript.java
private static void transaction() {
    try (RedisClient redisClient = RedisClient.create(redisUrl);
         StatefulRedisConnection<String, String> threadConnection = redisClient.connect()) {
        RedisCommands<String, String> threadCommands = threadConnection.sync();
        String luaScript =
                "local balance = tonumber(redis.call('GET', KEYS[1])) " +
                        "if balance >= 50 then " +
                        "redis.call('DECRBY', KEYS[1], 50) " +
                        "end";
        threadCommands.eval(luaScript, ScriptOutputType.INTEGER, balanceKey);
    }
}

なお、この実装ではあるスレッドのLuaスクリプトが実行されている間は他のスレッドの処理を中断してしまうため、パフォーマンスの観点で注意が必要です。

まとめ

このように、"データを読み込んで条件を判定し、更新して書き戻す"というよくあるユースケースでも正しくトランザクションの分離性を理解していないとバグを埋め込んでしまうのだということを学びました。
なお、今回の例では既存のレコードに対する更新を扱いましたが、新規レコードの作成によりスキューが発生する場合はまた違った対応が必要になるはずです。(詳しくはDDIAを読んでみてください)
書き込みスキューを完全に防ぐためには直列化可能分離レベルを利用するほかないと思います。

これらの並行性のバグはそもそも気づくこと、そしてテストが難しいために注意が必要だなと感じました。
実際にDDIAでも以下のように述べられています。

並行性のバグはタイミングが悪い場合にのみ生じるものなので、テストで見つけることが困難です。そういったタイミングの問題は滅多に起こらない場合もあり、再現させるのも難しいものです。また、並行性は把握することが非常に難しいものであり、特にデータベースにどういったコードがアクセスしているかを把握しきれないような大規模なアプリケーションの場合は難解です。アプリケーションの開発は、一度に1人のユーザーしかないようなものでも十分に難しいものであり、並行に大量のユーザーがアクセスするのであれば、どのデータがいつ予想外に変更されるかもしれないことからさらに難しさは増すことになります。

よい機会なのでトランザクションの分離性と生じうるAnomalyについて他のパターンでもまた機会があれば手を動かして試してみたいと思います。
また、誤りに気付いた方はご指摘いただけますと幸いです🙇

脚注
  1. なお本記事の例ではコネクションプールは利用せずにDB接続を実装しています。 ↩︎

  2. ここで述べいてるトランザクションはRedisの文脈でのトランザクションです。 ↩︎

  3. MULTIとEXECコマンドの間でGETコマンドを使って値を読み込むことができるものの、その結果はEXECが実行された後にクライアントに返されます。したがって、MULTIとEXECの間でGETした結果に基づいて処理を行うことはできません。 ↩︎

Discussion