Slick(MySQL)でBulk Upsertを実装する
一つのテーブルへ複数レコードのUpsertを行いたい場合にバルクで実行ができればいいのですが、現在のSlick 3.3.3では一度のSQL文でBulkUpsertを行ってくれる機能はありません。
今回は、MySQLへの書き込みでSlickでのBulk Upsertを実現する方法を説明します。
一レコード単位のupsertを並列で実行させる方法
Upsertingという機能で、複数のレコードのUpsertを行いたい場合は場合は以下のような書き方ができます。
import Tables.profile.api._
case class Book(id: Int, name: String, author: String)
val books: Seq[Book] = Seq(
Book(1, "MyBook1", "Yamada"),
Book(2, "MyBook2", "Sato")
)
val actions = books.map(Tables.Book.insertOrUpdate)
DBIO.sequence(actions)
DBIO.sequence
は各I/Oアクションの実行、結果を返すもので上記をI/Oアクションを実行するとコレクションのbulk uppsertが実現できます。
ただしshow processlistを見ると、Bulk Upsertされているわけではなく、一レコードづつSQLが並列で書き込みまれます。
mysql> select INFO from information_schema.PROCESSLIST where Info like '%insert ignore into%';
+---------------------------------------------------------------------------------------------------------------------------------------------------+
| INFO |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
| select INFO from information_schema.PROCESSLIST where Info like '%insert ignore into%' |
| INSERT INTO book (`id`, `name`, `author`) VALUES ('1', 'MyBook1', 'Yamada') ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`) |
| INSERT INTO book (`id`, `name`, `author`) VALUES ('2', 'MyBook2', 'Sato') ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`) |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.42 sec)
処理するデータ量が少ない場合は、このやり方で問題ないのですが、バルクで処理するデータ量が増えると一レコードづつの書き込みがボトルネックになってしまったりDB自体の負荷も高くなります。
ON DUPLICATE KEY UPDATE ステートメントを使う方法
MySQLでは、ON DUPLICATE KEY UPDATE ステートメントを使うことでBulk Upsertができます。
INSERT INTO book (`id`, `name`, `author`)
VALUES ('1', 'MyBook1', 'Yamada'),
('2', 'MyBook2', 'Sato')
ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`);
13.2.6.2 INSERT ... ON DUPLICATE KEY UPDATE ステートメント
ON DUPLICATE KEY UPDATE を指定したとき、UNIQUE インデックスまたは PRIMARY KEY に重複した値を発生させる行が挿入された場合は、MySQL によって古い行の UPDATE が実行されます
この方法であれば、一回のSQLの実行で複数レコードを更新してくれるので、DB書き込みのボトルネックも解決できますし、 DBへの書き込みの負荷も軽減されます。
ということで、まずはON DUPLICATE KEY UPDATE のDMLを生成してくれるBulk Upsertを自前で用意します。
private val insertIntoSQL =
"INSERT INTO book (id, name, author) VALUES (?, ?, ?)"
private val addParam = ", (?, ?, ?)"
private val ON_DUPLICATE_KEY_UPDATE = " ON DUPLICATE KEY UPDATE name=VALUES(name), author=VALUES(author);"
def generateUpsertSql(rows: Seq[Book]): String = {
val sqlStatement = new StringBuffer(insertIntoSQL)
(0 until (rows.length - 1)) foreach (_ => sqlStatement.append(addParam))
sqlStatement.append(ON_DUPLICATE_KEY_UPDATE)
sqlStatement.toString
}
JDBCでSQLにパラメーターを渡すため構文です。
コレクションの数によって「?」の数が変わるため「?」を動的に作ります。
以下のようにSeqの数に応じて動的に文字列のDMLを生成してくれます。
class GenerateUpsertSQLSpec extends AnyFunSpec with Matchers {
describe("generateUpsertSqlでBookのサイズから動的にUpsertのDMLを生成できる。") {
it("[1レコードの場合]BookからUpsertのDMLを生成できる。") {
val rows: Seq[Book] = Seq(
Book(
id = "1",
name = "mybook1",
author = "Yamada"
)
)
val result = generateUpsertSql(rows)
val expect =
"INSERT INTO book (id,name, author) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), author=VALUES(author);"
result shouldBe expect
}
it("[3レコードの場合]BookからUpsertのDDLを生成できる。") {
val now = LocalDateTime.now()
val rows: Seq[Book] = Seq(
BookRow(
id = "1",
name = "mybook1",
author = "Yamada"
),
BookRow(
id = "2",
name = "mybook2",
author = "Yoneda"
),
BookRow(
id = "3",
name = "mybook3",
author = "Sato"
)
)
val result = generateUpsertSql(rows)
val expect =
"INSERT INTO book (id,name, author) VALUES (?, ?, ?), (?, ?, ?), (?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), author=VALUES(author);"
result shouldBe expect
}
}
}
[info] GenerateUpsertSQLSpec:
[info] generateUpsertSqlでBookのサイズから動的にUpsertのDMLを生成できる。
[info] - [1レコードの場合]BookからUpsertのDMLを生成できる。
[info] - [3レコードの場合]BookからUpsertのDDLを生成できる。
[info] Run completed in 559 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 1 s, completed 2022/04/03 0:33:55
実行するSQLができました。
Slickで生のSQLを実行するためにSimpleDBIOがあります。
SimpleDBIOはSlickで利用出来ない機能を使うためにJDBCのレベルのコードを実行するために利用できます。
SimpleDBIOを使うことで以下のように実装できます。
def save(books: Seq[Book]): Future[Unit] = {
val sqlStatement = generateUpsertSql(books)
val query: SimpleDBIO[Int] = SimpleDBIO { session =>
val statement = session.connection.prepareStatement(sqlStatement)
books.zipWithIndex.foreach {
case (row, i) =>
val rowSize = 3
val offset = i * rowSize
statement.setInt(1 + offset, row.id)
statement.setString(2 + offset, row.name)
statement.setString(3 + offset, row.author)
}
statement.executeUpdate()
}
database
.run(query)
.map(_ => ())
}
generateUpsertSqlでbooksのコレクションの数に応じたパラメーターの文字列を生成し、Bookのid, name,authorをそれぞれprepareStatementの引数として設定します。
こうすることで、以下のSQLと同じSQLが発行されます。
INSERT INTO book (`id`, `name`, `author`)
VALUES ('1', 'MyBook1', 'Yamada'),
('2', 'MyBook2', 'Sato')
ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`);
実際に使ってみた結果
今回の対応でスループットが 3,4倍に上がったので、パフォーマンスが欲しい時は良さそうです。
補足
Stack overflowで以下のような書き方も紹介されていたが結果は、1行つづの書き込みになり、冒頭の書き方と変わらなかったです。
Slick 3.0 bulk insert or update (upsert)
Discussion