✍️

Slick(MySQL)でBulk Upsertを実装する

2022/04/03に公開

一つのテーブルへ複数レコードのUpsertを行いたい場合にバルクで実行ができればいいのですが、現在のSlick 3.3.3では一度のSQL文でBulkUpsertを行ってくれる機能はありません。
今回は、MySQLへの書き込みでSlickでのBulk Upsertを実現する方法を説明します。

一レコード単位のupsertを並列で実行させる方法

Upsertingという機能で、複数のレコードのUpsertを行いたい場合は場合は以下のような書き方ができます。

  import Tables.profile.api._
  
    case class Book(id: Int, name: String, author: String)
    val books: Seq[Book] = Seq(
      Book(1, "MyBook1", "Yamada"),
      Book(2, "MyBook2", "Sato")
    )
    val actions = books.map(Tables.Book.insertOrUpdate)
    DBIO.sequence(actions)

Sequential Execution

DBIO.sequenceは各I/Oアクションの実行、結果を返すもので上記をI/Oアクションを実行するとコレクションのbulk uppsertが実現できます。

ただしshow processlistを見ると、Bulk Upsertされているわけではなく、一レコードづつSQLが並列で書き込みまれます。


mysql> select INFO from information_schema.PROCESSLIST where Info like '%insert ignore into%';

+---------------------------------------------------------------------------------------------------------------------------------------------------+
| INFO                                                                                                                                              |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
| select INFO from information_schema.PROCESSLIST where Info like '%insert ignore into%'                                                            |
| INSERT INTO book (`id`, `name`, `author`) VALUES  ('1', 'MyBook1', 'Yamada') ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`) |
| INSERT INTO book (`id`, `name`, `author`) VALUES  ('2', 'MyBook2', 'Sato') ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`)   |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.42 sec)

処理するデータ量が少ない場合は、このやり方で問題ないのですが、バルクで処理するデータ量が増えると一レコードづつの書き込みがボトルネックになってしまったりDB自体の負荷も高くなります。

ON DUPLICATE KEY UPDATE ステートメントを使う方法

MySQLでは、ON DUPLICATE KEY UPDATE ステートメントを使うことでBulk Upsertができます。

INSERT INTO book (`id`, `name`, `author`)
VALUES  ('1', 'MyBook1', 'Yamada'),
        ('2', 'MyBook2', 'Sato')
    ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`);

13.2.6.2 INSERT ... ON DUPLICATE KEY UPDATE ステートメント

ON DUPLICATE KEY UPDATE を指定したとき、UNIQUE インデックスまたは PRIMARY KEY に重複した値を発生させる行が挿入された場合は、MySQL によって古い行の UPDATE が実行されます

この方法であれば、一回のSQLの実行で複数レコードを更新してくれるので、DB書き込みのボトルネックも解決できますし、 DBへの書き込みの負荷も軽減されます。

ということで、まずはON DUPLICATE KEY UPDATE のDMLを生成してくれるBulk Upsertを自前で用意します。


  private val insertIntoSQL =
    "INSERT INTO book (id, name, author) VALUES (?, ?, ?)"

  private val addParam = ", (?, ?, ?)"

  private val ON_DUPLICATE_KEY_UPDATE = " ON DUPLICATE KEY UPDATE name=VALUES(name), author=VALUES(author);"

  def generateUpsertSql(rows: Seq[Book]): String = {
    val sqlStatement = new StringBuffer(insertIntoSQL)
    (0 until (rows.length - 1)) foreach (_ => sqlStatement.append(addParam))
    sqlStatement.append(ON_DUPLICATE_KEY_UPDATE)
    sqlStatement.toString
  }

JDBCでSQLにパラメーターを渡すため構文です。
コレクションの数によって「?」の数が変わるため「?」を動的に作ります。

以下のようにSeqの数に応じて動的に文字列のDMLを生成してくれます。

class GenerateUpsertSQLSpec extends AnyFunSpec with Matchers {

  describe("generateUpsertSqlでBookのサイズから動的にUpsertのDMLを生成できる。") {

    it("[1レコードの場合]BookからUpsertのDMLを生成できる。") {
      val rows: Seq[Book] = Seq(
        Book(
          id = "1",
          name = "mybook1",
          author = "Yamada"
        )
      )
      val result = generateUpsertSql(rows)
      val expect =
        "INSERT INTO book (id,name, author) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), author=VALUES(author);"
      result shouldBe expect
    }
    it("[3レコードの場合]BookからUpsertのDDLを生成できる。") {
      val now = LocalDateTime.now()

      val rows: Seq[Book] = Seq(
        BookRow(
          id = "1",
          name = "mybook1",
          author = "Yamada"
        ),
        BookRow(
          id = "2",
          name = "mybook2",
          author = "Yoneda"
        ),
        BookRow(
          id = "3",
          name = "mybook3",
          author = "Sato"
        )
      )
      val result = generateUpsertSql(rows)
      val expect =
        "INSERT INTO book (id,name, author) VALUES (?, ?, ?), (?, ?, ?), (?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), author=VALUES(author);"
      result shouldBe expect
    }
  }
}

[info] GenerateUpsertSQLSpec:
[info] generateUpsertSqlでBookのサイズから動的にUpsertのDMLを生成できる。
[info] - [1レコードの場合]BookからUpsertのDMLを生成できる。
[info] - [3レコードの場合]BookからUpsertのDDLを生成できる。
[info] Run completed in 559 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 1 s, completed 2022/04/03 0:33:55

実行するSQLができました。

Slickで生のSQLを実行するためにSimpleDBIOがあります。

SimpleDBIOはSlickで利用出来ない機能を使うためにJDBCのレベルのコードを実行するために利用できます。

JDBC Interoperability

SimpleDBIOを使うことで以下のように実装できます。

  def save(books: Seq[Book]): Future[Unit] = {
    val sqlStatement = generateUpsertSql(books)
    val query: SimpleDBIO[Int] = SimpleDBIO { session =>
      val statement = session.connection.prepareStatement(sqlStatement)
      books.zipWithIndex.foreach {
        case (row, i) =>
          val rowSize = 3
          val offset  = i * rowSize
          statement.setInt(1 + offset, row.id)
          statement.setString(2 + offset, row.name)
          statement.setString(3 + offset, row.author)
      }
      statement.executeUpdate()
    }
    database
      .run(query)
      .map(_ => ())
  }

generateUpsertSqlでbooksのコレクションの数に応じたパラメーターの文字列を生成し、Bookのid, name,authorをそれぞれprepareStatementの引数として設定します。

こうすることで、以下のSQLと同じSQLが発行されます。

INSERT INTO book (`id`, `name`, `author`)
VALUES  ('1', 'MyBook1', 'Yamada'),
        ('2', 'MyBook2', 'Sato')
    ON DUPLICATE KEY UPDATE name=VALUES(`name`), author=VALUES(`author`);

実際に使ってみた結果

今回の対応でスループットが 3,4倍に上がったので、パフォーマンスが欲しい時は良さそうです。

補足

Stack overflowで以下のような書き方も紹介されていたが結果は、1行つづの書き込みになり、冒頭の書き方と変わらなかったです。

Slick 3.0 bulk insert or update (upsert)

参考

Discussion