📘

Multi Storage Transaction サンプルを試す

2024/08/01に公開

今日は以下のチュートリアルをStep by Stepで見ていきたいと思います。
https://scalardb.scalar-labs.com/docs/latest/scalardb-samples/multi-storage-transaction-sample/

環境構築

まずは Amazon Linux 2023 をt2.xlargeのEBS 50GBで起動します。
起動できたらこちらのBlogの内容をもとに必要なツール群をインストールしていきます。
https://zenn.dev/kamethemis/articles/ae3d4495d0ad06

  • Java 実行環境
  • Git ツール群
  • Docker と Docker Compose
    をインストールしてください。
    全て上記のブログにコピペ用コマンドが載っています。

次にCassandraへアクセスするために以下のコマンドでツールをインストールします。

sudo yum install pip
pip install cqlsh

最後にMySQL用クライアントをインストールしますが、歴史的な背景からAWSではMySQLではなく互換のMariaDBを用いることを推奨していますのでそちらをインストールします。

sudo dnf -y install mariadb105

以上で環境構築は完了です。

ScalarDB Sample セットアップ

https://scalardb.scalar-labs.com/docs/latest/scalardb-samples/multi-storage-transaction-sample/
の内容に従い作業を進めていきます。以下のような環境が構築されます。

まずScalarDBの学習用に準備されているサンプルのレポジトリをクローンし、移動します。

git clone https://github.com/scalar-labs/scalardb-samples
cd scalardb-samples/multi-storage-transaction-sample

環境はコンテナ形式で準備されているのでコンテナを起動します。

sudo docker-compose up -d
sudo docker ps
CONTAINER ID   IMAGE            COMMAND                  CREATED          STATUS          PORTS                                                                          NAMES
21ce621184e7   mysql:8.0        "docker-entrypoint.s…"   38 seconds ago   Up 31 seconds   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                           mysql-1
9676eb1252e9   cassandra:3.11   "docker-entrypoint.s…"   38 seconds ago   Up 31 seconds   7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp, :::9042->9042/tcp   cassandra-1

mysqlcassandraが起動していることがわかります。

一旦ここでcassandraの中身を見ておきます。

cqlsh
SELECT * FROM system_schema.keyspaces;

 keyspace_name      | durable_writes | replication
--------------------+----------------+-------------------------------------------------------------------------------------
        system_auth |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
      system_schema |           True |                             {'class': 'org.apache.cassandra.locator.LocalStrategy'}
 system_distributed |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '3'}
             system |           True |                             {'class': 'org.apache.cassandra.locator.LocalStrategy'}
      system_traces |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '2'}

(5 rows)

存在しているKeyspaceはシステム用のみでまだ何のスキーマやデータも投入されていません。
同様にMySQLもみておきます。

mariadb -u root -p -h 127.0.0.1

パスワードはmysqlです。

show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
4 rows in set (0.002 sec)

同様にまだ何も含まれていません。

スキーマ投入

ではここにScalarDBの機能を用いてまずはスキーマを投入していきます。

wget https://github.com/scalar-labs/scalardb/releases/download/v3.13.0/scalardb-schema-loader-3.13.0.jar
java -jar scalardb-schema-loader-3.13.0.jar --config database.properties --schema-file schema.json --coordinator
[main] INFO com.scalar.db.schemaloader.command.SchemaLoaderCommand - Config path: database.properties
[main] INFO com.scalar.db.schemaloader.command.SchemaLoaderCommand - Schema path: schema.json
[main] INFO com.datastax.driver.core - DataStax Java driver 3.11.5 for Apache Cassandra
[main] INFO com.datastax.driver.core.GuavaCompatibility - Detected Guava >= 19 in the classpath, using modern compatibility layer
[main] INFO com.datastax.driver.core.ClockFactory - Using native clock to generate timestamps.
[main] INFO com.datastax.driver.core.NettyUtil - Found Netty's native epoll transport in the classpath, using it
[main] INFO com.datastax.driver.core.Cluster - Cannot connect with protocol version V5, trying with V4
[main] INFO com.datastax.driver.core.policies.DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
[main] INFO com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
[main] INFO com.scalar.db.storage.cassandra.ClusterManager - Session to the cluster is created
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table customers in the namespace customer succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table orders in the namespace order succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table statements in the namespace order succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table items in the namespace order succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the coordinator tables succeeded

基本的には前回と同様ですが、database.propertiesで指定されたデータベース(今回でいうとCassandaraとMySQL)に接続を行い、schema.jsonの内容に従いスキーマを作成します。

schema.json
{
  "customer.customers": {
    "transaction": true,
    "partition-key": [
      "customer_id"
    ],
    "columns": {
      "customer_id": "INT",
      "name": "TEXT",
      "credit_limit": "INT",
      "credit_total": "INT"
    }
  },
  "order.orders": {
    "transaction": true,
    "partition-key": [
      "customer_id"
    ],
    "clustering-key": [
      "timestamp"
    ],
    "secondary-index": [
      "order_id"
    ],
    "columns": {
      "order_id": "TEXT",
      "customer_id": "INT",
      "timestamp": "BIGINT"
    }
  },
  "order.statements": {
    "transaction": true,
    "partition-key": [
      "order_id"
    ],
    "clustering-key": [
      "item_id"
    ],
    "columns": {
      "order_id": "TEXT",
      "customer_id": "INT",
      "timestamp": "BIGINT"
    }
  },
  "order.statements": {
    "transaction": true,
    "partition-key": [
      "order_id"
    ],
    "clustering-key": [
      "item_id"
    ],
    "columns": {
      "order_id": "TEXT",
      "item_id": "INT",
      "count": "INT"
    }
  },
  "order.items": {
    "transaction": true,
    "partition-key": [
      "item_id"
    ],
    "columns": {
      "item_id": "INT",
      "name": "TEXT",
      "price": "INT"
    }
  }
}

ではこの状態でもう一度両方のDBを確認します。

cqlsh
SELECT * FROM system_schema.keyspaces;
 keyspace_name      | durable_writes | replication
--------------------+----------------+-------------------------------------------------------------------------------------
<snip>
        coordinator |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
              order |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
<snip>

coordinatororderが出来ていますね。

SELECT table_name FROM system_schema.tables WHERE keyspace_name = 'coordinator';

 table_name
------------
      state

(1 rows)
SELECT table_name FROM system_schema.tables WHERE keyspace_name = 'order';

 table_name
------------
      items
     orders
 statements

(3 rows)

それぞれのKeyspcaeにテーブルが出来ています。なおこの3つは実際のテーブルではなくViewとなっています。

SELECT *  FROM system_schema.views  WHERE keyspace_name = 'order' AND view_name = 'items';
 keyspace_name | view_name | base_table_id | base_table_name | bloom_filter_fp_chance | caching | cdc | comment | compaction | compression | crc_check_chance | dclocal_read_repair_chance | default_time_to_live | extensions | gc_grace_seconds | id | include_all_columns | max_index_interval | memtable_flush_period_in_ms | min_index_interval | read_repair_chance | speculative_retry | where_clause
---------------+-----------+---------------+-----------------+------------------------+---------+-----+---------+------------+-------------+------------------+----------------------------+----------------------+------------+------------------+----+---------------------+--------------------+-----------------------------+--------------------+--------------------+-------------------+--------------

coordonator.stateの中身を見ておきます。

select * from coordinator.state;

 tx_id | tx_created_at | tx_state
-------+---------------+----------

(0 rows)

このようにテーブルの中にまだデータは格納されていません。他3つのデーブルも同じです。
同様にMySQLもみておきます。

mariadb -u root -p -h 127.0.0.1
show databases;
+--------------------+
| Database           |
+--------------------+
| customer|
<snip>
| scalardb           |
<snip>

scalardbcustomerが増えています。

SHOW TABLES FROM scalardb;
+--------------------+
| Tables_in_scalardb |
+--------------------+
| metadata           |
+--------------------+
1 row in set (0.002 sec)
select * from scalardb.metadata;
+--------------------+------------------------+-----------+-----------+------------------+---------+------------------+
| full_table_name    | column_name            | data_type | key_type  | clustering_order | indexed | ordinal_position |
+--------------------+------------------------+-----------+-----------+------------------+---------+------------------+
| customer.customers | before_credit_limit    | INT       | NULL      | NULL             |       0 |               16 |
| customer.customers | before_credit_total    | INT       | NULL      | NULL             |       0 |               17 |
| customer.customers | before_name            | TEXT      | NULL      | NULL             |       0 |               15 |
| customer.customers | before_tx_committed_at | BIGINT    | NULL      | NULL             |       0 |               14 |
| customer.customers | before_tx_id           | TEXT      | NULL      | NULL             |       0 |               10 |
| customer.customers | before_tx_prepared_at  | BIGINT    | NULL      | NULL             |       0 |               13 |
| customer.customers | before_tx_state        | INT       | NULL      | NULL             |       0 |               11 |
| customer.customers | before_tx_version      | INT       | NULL      | NULL             |       0 |               12 |
| customer.customers | credit_limit           | INT       | NULL      | NULL             |       0 |                3 |
| customer.customers | credit_total           | INT       | NULL      | NULL             |       0 |                4 |
| customer.customers | customer_id            | INT       | PARTITION | NULL             |       0 |                1 |
| customer.customers | name                   | TEXT      | NULL      | NULL             |       0 |                2 |
| customer.customers | tx_committed_at        | BIGINT    | NULL      | NULL             |       0 |                9 |
| customer.customers | tx_id                  | TEXT      | NULL      | NULL             |       0 |                5 |
| customer.customers | tx_prepared_at         | BIGINT    | NULL      | NULL             |       0 |                8 |
| customer.customers | tx_state               | INT       | NULL      | NULL             |       0 |                6 |
| customer.customers | tx_version             | INT       | NULL      | NULL             |       0 |                7 |
+--------------------+------------------------+-----------+-----------+------------------+---------+------------------+
17 rows in set (0.001 sec)
SHOW TABLES FROM customer;
+--------------------+
| Tables_in_customer |
+--------------------+
| customers          |
+--------------------+
1 row in set (0.003 sec)

現時点ではCustomerテーブルの中身はまだ空です。

図にするとこうなります。

データ投入

では次にgradlewというツールを用いてデータを投入してきます。

./gradlew run --args="LoadInitialData"
BUILD SUCCESSFUL in xxs

と表示されれば完了です。
まずcassandraのcoordinator.stateには以下の通りデータが1件投入されています。

select * from coordinator.state;

 tx_id                                | tx_created_at | tx_state
--------------------------------------+---------------+----------
 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719272 |        3

(1 rows)

items,のViewにもデータが投入されました。(まだorders,statementsにはデータが書き込まれていません)ではその中身を見ていきます。なおCassandraではorderは予約語になるためSQLで指定する場合はダブルクォーテーション(")を付ける必要があります。

SELECT * FROM "order".items;
 item_id | before_name | before_price | before_tx_committed_at | before_tx_id | before_tx_prepared_at | before_tx_state | before_tx_version | name   | price | tx_committed_at | tx_id                                | tx_prepared_at | tx_state | tx_version
---------+-------------+--------------+------------------------+--------------+-----------------------+-----------------+-------------------+--------+-------+-----------------+--------------------------------------+----------------+----------+------------
       5 |        null |         null |                   null |         null |                  null |            null |              null |  Melon |  3000 |   1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |  1722144719134 |        3 |          1
       1 |        null |         null |                   null |         null |                  null |            null |              null |  Apple |  1000 |   1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |  1722144719134 |        3 |          1
       2 |        null |         null |                   null |         null |                  null |            null |              null | Orange |  2000 |   1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |  1722144719134 |        3 |          1
       4 |        null |         null |                   null |         null |                  null |            null |              null |  Mango |  5000 |   1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |  1722144719134 |        3 |          1
       3 |        null |         null |                   null |         null |                  null |            null |              null |  Grape |  2500 |   1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |  1722144719134 |        3 |          1

tx_idの値がcoordonator.stateの値と一致していることに注目してください。gradlewがデータを投入したタイミングで付与されたものです。これによりトランザクションの順番がわかるようになっています。次にカラムのtx_prepared_atはデータが投入されたUNIX時間です。これによりトランザクションの順番がわかります。つまり、1回のトランザクションでデータが5件投入され、各Viewでは5のデータを保持していますが、coordonator.stateではそれをデータ単位ではなく1つのトランザクションとして管理しているためデータは1個になります。

同様にMySQLも見ておきます。

show databases;
+--------------------+
| Database           |
+--------------------+
| customer           |
<snip>| 
| scalardb           |
<snip>
6 rows in set (0.001 sec)
SHOW TABLES FROM customer;
+--------------------+
| Tables_in_customer |
+--------------------+
| customers          |
+--------------------+
1 row in set (0.001 sec)
select * from customer.customers;
+-------------+---------------+--------------+--------------+--------------------------------------+----------+------------+----------------+-----------------+--------------+-----------------+-------------------+-----------------------+------------------------+-------------+---------------------+---------------------+
| customer_id | name          | credit_limit | credit_total | tx_id                                | tx_state | tx_version | tx_prepared_at | tx_committed_at | before_tx_id | before_tx_state | before_tx_version | before_tx_prepared_at | before_tx_committed_at | before_name | before_credit_limit | before_credit_total |
+-------------+---------------+--------------+--------------+--------------------------------------+----------+------------+----------------+-----------------+--------------+-----------------+-------------------+-----------------------+------------------------+-------------+---------------------+---------------------+
|           1 | Yamada Taro   |        10000 |            0 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |        3 |          1 |  1722144719134 |   1722144719293 | NULL         |            NULL |              NULL |                  NULL |                   NULL | NULL        |                NULL |                NULL |
|           2 | Yamada Hanako |        10000 |            0 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |        3 |          1 |  1722144719134 |   1722144719293 | NULL         |            NULL |              NULL |                  NULL |                   NULL | NULL        |                NULL |                NULL |
|           3 | Suzuki Ichiro |        10000 |            0 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 |        3 |          1 |  1722144719134 |   1722144719293 | NULL         |            NULL |              NULL |                  NULL |                   NULL | NULL        |                NULL |                NULL |
+-------------+---------------+--------------+--------------+--------------------------------------+----------+------------+----------------+-----------------+--------------+-----------------+-------------------+-----------------------+------------------------+-------------+---------------------+---------------------+

tx_idの値が先ほどと同じことに注目してください。こうすることによってCassandra上のcoordinator.stateが全体を管理しています。tx_prepared_atがトランザクションが開始されたUNIX時間、tx_committed_atがトランザクションが完了した(コミットされた)UNIX時間です。

gradlew で投入されるデータ

データ投入で用いたgradlewはJavaのプログラムです。読み込んでいるデータはハードコーディングされています。

multi-storage-transaction-sample/src/main/java/sample/Sample.java
<snip>
 public void loadInitialData() throws TransactionException {
    DistributedTransaction transaction = null;
    try {
      transaction = manager.start();
      loadCustomerIfNotExists(transaction, 1, "Yamada Taro", 10000, 0);
      loadCustomerIfNotExists(transaction, 2, "Yamada Hanako", 10000, 0);
      loadCustomerIfNotExists(transaction, 3, "Suzuki Ichiro", 10000, 0);
      loadItemIfNotExists(transaction, 1, "Apple", 1000);
      loadItemIfNotExists(transaction, 2, "Orange", 2000);
      loadItemIfNotExists(transaction, 3, "Grape", 2500);
      loadItemIfNotExists(transaction, 4, "Mango", 5000);
      loadItemIfNotExists(transaction, 5, "Melon", 3000);
      transaction.commit();
    } catch (TransactionException e) {
<snip>

CustomerItemdatabase.propertiesによりどのデータベースを用いるかが制御されています。

database.properties
<snip>
scalar.db.multi_storage.namespace_mapping=customer:mysql,order:cassandra,coordinator:cassandra
<snip>

JavaプログラムのloadCustomerIfNotExistsの部分は以下の通りdatabase.propertiescustomer:mysqlと連携しています。

  private void loadCustomerIfNotExists(
      DistributedTransaction transaction,
      int customerId,
      String name,
      int creditLimit,
      int creditTotal)
      throws TransactionException {
    Optional<Result> customer =
        transaction.get(
            Get.newBuilder()
                .namespace("customer")
                .table("customers")
                .partitionKey(Key.ofInt("customer_id", customerId))
                .build());
<snip>

次の記事では
https://scalardb.scalar-labs.com/docs/latest/scalardb-samples/multi-storage-transaction-sample/
の後半戦を Step by Step でデータの流れを見ながら理解を深めていきたいと思います。

Discussion