Multi Storage Transaction サンプルを試す
今日は以下のチュートリアルをStep by Stepで見ていきたいと思います。
環境構築
まずは Amazon Linux 2023 をt2.xlargeのEBS 50GBで起動します。
起動できたらこちらのBlogの内容をもとに必要なツール群をインストールしていきます。
- Java 実行環境
- Git ツール群
- Docker と Docker Compose
をインストールしてください。
全て上記のブログにコピペ用コマンドが載っています。
次にCassandraへアクセスするために以下のコマンドでツールをインストールします。
sudo yum install pip
pip install cqlsh
最後にMySQL用クライアントをインストールしますが、歴史的な背景からAWSではMySQLではなく互換のMariaDBを用いることを推奨していますのでそちらをインストールします。
sudo dnf -y install mariadb105
以上で環境構築は完了です。
ScalarDB Sample セットアップ
まずScalarDBの学習用に準備されているサンプルのレポジトリをクローンし、移動します。
git clone https://github.com/scalar-labs/scalardb-samples
cd scalardb-samples/multi-storage-transaction-sample
環境はコンテナ形式で準備されているのでコンテナを起動します。
sudo docker-compose up -d
sudo docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
21ce621184e7 mysql:8.0 "docker-entrypoint.s…" 38 seconds ago Up 31 seconds 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql-1
9676eb1252e9 cassandra:3.11 "docker-entrypoint.s…" 38 seconds ago Up 31 seconds 7000-7001/tcp, 7199/tcp, 9160/tcp, 0.0.0.0:9042->9042/tcp, :::9042->9042/tcp cassandra-1
mysql
とcassandra
が起動していることがわかります。
一旦ここでcassandra
の中身を見ておきます。
cqlsh
SELECT * FROM system_schema.keyspaces;
keyspace_name | durable_writes | replication
--------------------+----------------+-------------------------------------------------------------------------------------
system_auth | True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
system_schema | True | {'class': 'org.apache.cassandra.locator.LocalStrategy'}
system_distributed | True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '3'}
system | True | {'class': 'org.apache.cassandra.locator.LocalStrategy'}
system_traces | True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '2'}
(5 rows)
存在しているKeyspaceはシステム用のみでまだ何のスキーマやデータも投入されていません。
同様にMySQLもみておきます。
mariadb -u root -p -h 127.0.0.1
パスワードはmysql
です。
show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| sys |
+--------------------+
4 rows in set (0.002 sec)
同様にまだ何も含まれていません。
スキーマ投入
ではここにScalarDBの機能を用いてまずはスキーマを投入していきます。
wget https://github.com/scalar-labs/scalardb/releases/download/v3.13.0/scalardb-schema-loader-3.13.0.jar
java -jar scalardb-schema-loader-3.13.0.jar --config database.properties --schema-file schema.json --coordinator
[main] INFO com.scalar.db.schemaloader.command.SchemaLoaderCommand - Config path: database.properties
[main] INFO com.scalar.db.schemaloader.command.SchemaLoaderCommand - Schema path: schema.json
[main] INFO com.datastax.driver.core - DataStax Java driver 3.11.5 for Apache Cassandra
[main] INFO com.datastax.driver.core.GuavaCompatibility - Detected Guava >= 19 in the classpath, using modern compatibility layer
[main] INFO com.datastax.driver.core.ClockFactory - Using native clock to generate timestamps.
[main] INFO com.datastax.driver.core.NettyUtil - Found Netty's native epoll transport in the classpath, using it
[main] INFO com.datastax.driver.core.Cluster - Cannot connect with protocol version V5, trying with V4
[main] INFO com.datastax.driver.core.policies.DCAwareRoundRobinPolicy - Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
[main] INFO com.datastax.driver.core.Cluster - New Cassandra host localhost/127.0.0.1:9042 added
[main] INFO com.scalar.db.storage.cassandra.ClusterManager - Session to the cluster is created
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table customers in the namespace customer succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table orders in the namespace order succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table statements in the namespace order succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the table items in the namespace order succeeded
[main] INFO com.scalar.db.schemaloader.SchemaOperator - Creating the coordinator tables succeeded
基本的には前回と同様ですが、database.properties
で指定されたデータベース(今回でいうとCassandaraとMySQL)に接続を行い、schema.json
の内容に従いスキーマを作成します。
{
"customer.customers": {
"transaction": true,
"partition-key": [
"customer_id"
],
"columns": {
"customer_id": "INT",
"name": "TEXT",
"credit_limit": "INT",
"credit_total": "INT"
}
},
"order.orders": {
"transaction": true,
"partition-key": [
"customer_id"
],
"clustering-key": [
"timestamp"
],
"secondary-index": [
"order_id"
],
"columns": {
"order_id": "TEXT",
"customer_id": "INT",
"timestamp": "BIGINT"
}
},
"order.statements": {
"transaction": true,
"partition-key": [
"order_id"
],
"clustering-key": [
"item_id"
],
"columns": {
"order_id": "TEXT",
"customer_id": "INT",
"timestamp": "BIGINT"
}
},
"order.statements": {
"transaction": true,
"partition-key": [
"order_id"
],
"clustering-key": [
"item_id"
],
"columns": {
"order_id": "TEXT",
"item_id": "INT",
"count": "INT"
}
},
"order.items": {
"transaction": true,
"partition-key": [
"item_id"
],
"columns": {
"item_id": "INT",
"name": "TEXT",
"price": "INT"
}
}
}
ではこの状態でもう一度両方のDBを確認します。
cqlsh
SELECT * FROM system_schema.keyspaces;
keyspace_name | durable_writes | replication
--------------------+----------------+-------------------------------------------------------------------------------------
<snip>
coordinator | True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
order | True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}
<snip>
coordinator
とorder
が出来ていますね。
SELECT table_name FROM system_schema.tables WHERE keyspace_name = 'coordinator';
table_name
------------
state
(1 rows)
SELECT table_name FROM system_schema.tables WHERE keyspace_name = 'order';
table_name
------------
items
orders
statements
(3 rows)
それぞれのKeyspcaeにテーブルが出来ています。なおこの3つは実際のテーブルではなくViewとなっています。
SELECT * FROM system_schema.views WHERE keyspace_name = 'order' AND view_name = 'items';
keyspace_name | view_name | base_table_id | base_table_name | bloom_filter_fp_chance | caching | cdc | comment | compaction | compression | crc_check_chance | dclocal_read_repair_chance | default_time_to_live | extensions | gc_grace_seconds | id | include_all_columns | max_index_interval | memtable_flush_period_in_ms | min_index_interval | read_repair_chance | speculative_retry | where_clause
---------------+-----------+---------------+-----------------+------------------------+---------+-----+---------+------------+-------------+------------------+----------------------------+----------------------+------------+------------------+----+---------------------+--------------------+-----------------------------+--------------------+--------------------+-------------------+--------------
coordonator.state
の中身を見ておきます。
select * from coordinator.state;
tx_id | tx_created_at | tx_state
-------+---------------+----------
(0 rows)
このようにテーブルの中にまだデータは格納されていません。他3つのデーブルも同じです。
同様にMySQLもみておきます。
mariadb -u root -p -h 127.0.0.1
show databases;
+--------------------+
| Database |
+--------------------+
| customer|
<snip>
| scalardb |
<snip>
scalardb
とcustomer
が増えています。
SHOW TABLES FROM scalardb;
+--------------------+
| Tables_in_scalardb |
+--------------------+
| metadata |
+--------------------+
1 row in set (0.002 sec)
select * from scalardb.metadata;
+--------------------+------------------------+-----------+-----------+------------------+---------+------------------+
| full_table_name | column_name | data_type | key_type | clustering_order | indexed | ordinal_position |
+--------------------+------------------------+-----------+-----------+------------------+---------+------------------+
| customer.customers | before_credit_limit | INT | NULL | NULL | 0 | 16 |
| customer.customers | before_credit_total | INT | NULL | NULL | 0 | 17 |
| customer.customers | before_name | TEXT | NULL | NULL | 0 | 15 |
| customer.customers | before_tx_committed_at | BIGINT | NULL | NULL | 0 | 14 |
| customer.customers | before_tx_id | TEXT | NULL | NULL | 0 | 10 |
| customer.customers | before_tx_prepared_at | BIGINT | NULL | NULL | 0 | 13 |
| customer.customers | before_tx_state | INT | NULL | NULL | 0 | 11 |
| customer.customers | before_tx_version | INT | NULL | NULL | 0 | 12 |
| customer.customers | credit_limit | INT | NULL | NULL | 0 | 3 |
| customer.customers | credit_total | INT | NULL | NULL | 0 | 4 |
| customer.customers | customer_id | INT | PARTITION | NULL | 0 | 1 |
| customer.customers | name | TEXT | NULL | NULL | 0 | 2 |
| customer.customers | tx_committed_at | BIGINT | NULL | NULL | 0 | 9 |
| customer.customers | tx_id | TEXT | NULL | NULL | 0 | 5 |
| customer.customers | tx_prepared_at | BIGINT | NULL | NULL | 0 | 8 |
| customer.customers | tx_state | INT | NULL | NULL | 0 | 6 |
| customer.customers | tx_version | INT | NULL | NULL | 0 | 7 |
+--------------------+------------------------+-----------+-----------+------------------+---------+------------------+
17 rows in set (0.001 sec)
SHOW TABLES FROM customer;
+--------------------+
| Tables_in_customer |
+--------------------+
| customers |
+--------------------+
1 row in set (0.003 sec)
現時点ではCustomerテーブルの中身はまだ空です。
図にするとこうなります。
データ投入
では次にgradlew
というツールを用いてデータを投入してきます。
./gradlew run --args="LoadInitialData"
BUILD SUCCESSFUL in xxs
と表示されれば完了です。
まずcassandraのcoordinator.state
には以下の通りデータが1件投入されています。
select * from coordinator.state;
tx_id | tx_created_at | tx_state
--------------------------------------+---------------+----------
58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719272 | 3
(1 rows)
items
,のViewにもデータが投入されました。(まだorders
,statements
にはデータが書き込まれていません)ではその中身を見ていきます。なおCassandraではorder
は予約語になるためSQLで指定する場合はダブルクォーテーション(")を付ける必要があります。
SELECT * FROM "order".items;
item_id | before_name | before_price | before_tx_committed_at | before_tx_id | before_tx_prepared_at | before_tx_state | before_tx_version | name | price | tx_committed_at | tx_id | tx_prepared_at | tx_state | tx_version
---------+-------------+--------------+------------------------+--------------+-----------------------+-----------------+-------------------+--------+-------+-----------------+--------------------------------------+----------------+----------+------------
5 | null | null | null | null | null | null | null | Melon | 3000 | 1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719134 | 3 | 1
1 | null | null | null | null | null | null | null | Apple | 1000 | 1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719134 | 3 | 1
2 | null | null | null | null | null | null | null | Orange | 2000 | 1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719134 | 3 | 1
4 | null | null | null | null | null | null | null | Mango | 5000 | 1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719134 | 3 | 1
3 | null | null | null | null | null | null | null | Grape | 2500 | 1722144719293 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 1722144719134 | 3 | 1
tx_id
の値がcoordonator.state
の値と一致していることに注目してください。gradlew
がデータを投入したタイミングで付与されたものです。これによりトランザクションの順番がわかるようになっています。次にカラムのtx_prepared_at
はデータが投入されたUNIX時間です。これによりトランザクションの順番がわかります。つまり、1回のトランザクションでデータが5件投入され、各Viewでは5のデータを保持していますが、coordonator.state
ではそれをデータ単位ではなく1つのトランザクションとして管理しているためデータは1個になります。
同様にMySQLも見ておきます。
show databases;
+--------------------+
| Database |
+--------------------+
| customer |
<snip>|
| scalardb |
<snip>
6 rows in set (0.001 sec)
SHOW TABLES FROM customer;
+--------------------+
| Tables_in_customer |
+--------------------+
| customers |
+--------------------+
1 row in set (0.001 sec)
select * from customer.customers;
+-------------+---------------+--------------+--------------+--------------------------------------+----------+------------+----------------+-----------------+--------------+-----------------+-------------------+-----------------------+------------------------+-------------+---------------------+---------------------+
| customer_id | name | credit_limit | credit_total | tx_id | tx_state | tx_version | tx_prepared_at | tx_committed_at | before_tx_id | before_tx_state | before_tx_version | before_tx_prepared_at | before_tx_committed_at | before_name | before_credit_limit | before_credit_total |
+-------------+---------------+--------------+--------------+--------------------------------------+----------+------------+----------------+-----------------+--------------+-----------------+-------------------+-----------------------+------------------------+-------------+---------------------+---------------------+
| 1 | Yamada Taro | 10000 | 0 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 3 | 1 | 1722144719134 | 1722144719293 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
| 2 | Yamada Hanako | 10000 | 0 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 3 | 1 | 1722144719134 | 1722144719293 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
| 3 | Suzuki Ichiro | 10000 | 0 | 58f2ac0a-b1f1-470d-96c1-469babef89d4 | 3 | 1 | 1722144719134 | 1722144719293 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+-------------+---------------+--------------+--------------+--------------------------------------+----------+------------+----------------+-----------------+--------------+-----------------+-------------------+-----------------------+------------------------+-------------+---------------------+---------------------+
tx_id
の値が先ほどと同じことに注目してください。こうすることによってCassandra上のcoordinator.stateが全体を管理しています。tx_prepared_at
がトランザクションが開始されたUNIX時間、tx_committed_at
がトランザクションが完了した(コミットされた)UNIX時間です。
gradlew で投入されるデータ
データ投入で用いたgradlew
はJavaのプログラムです。読み込んでいるデータはハードコーディングされています。
<snip>
public void loadInitialData() throws TransactionException {
DistributedTransaction transaction = null;
try {
transaction = manager.start();
loadCustomerIfNotExists(transaction, 1, "Yamada Taro", 10000, 0);
loadCustomerIfNotExists(transaction, 2, "Yamada Hanako", 10000, 0);
loadCustomerIfNotExists(transaction, 3, "Suzuki Ichiro", 10000, 0);
loadItemIfNotExists(transaction, 1, "Apple", 1000);
loadItemIfNotExists(transaction, 2, "Orange", 2000);
loadItemIfNotExists(transaction, 3, "Grape", 2500);
loadItemIfNotExists(transaction, 4, "Mango", 5000);
loadItemIfNotExists(transaction, 5, "Melon", 3000);
transaction.commit();
} catch (TransactionException e) {
<snip>
Customer
やItem
はdatabase.properties
によりどのデータベースを用いるかが制御されています。
<snip>
scalar.db.multi_storage.namespace_mapping=customer:mysql,order:cassandra,coordinator:cassandra
<snip>
JavaプログラムのloadCustomerIfNotExists
の部分は以下の通りdatabase.properties
のcustomer:mysql
と連携しています。
private void loadCustomerIfNotExists(
DistributedTransaction transaction,
int customerId,
String name,
int creditLimit,
int creditTotal)
throws TransactionException {
Optional<Result> customer =
transaction.get(
Get.newBuilder()
.namespace("customer")
.table("customers")
.partitionKey(Key.ofInt("customer_id", customerId))
.build());
<snip>
次の記事では
の後半戦を Step by Step でデータの流れを見ながら理解を深めていきたいと思います。
Discussion