👑

Nimのウェブフレームワークを高速化するための断片的な知識

2022/08/22に公開

ほんとうはスクラップに書くような内容ですが、スクラップは前後の移動ができないのでこちらに書きます。
随時更新し続けます。


起動コマンド

はじめにコンパイルオプションを確認します

オプション 意味
--mm:orc GCにORCを使う
--mm:refc GCに参照カウントを使う
-d:ssl OpenSSLサポートを有効にする
-d:release リリースビルドを行う。速くなる
-d:danger ランタイムチェックを無効にし最適化を有効にする
-d:useMalloc Nimのメモリマネージャーの代わりにCのMallocを使う
--panics パニックが起きるとプロセスを終了にする
--threads マルチスレッドを有効にする
--stackTrace プログラムがクラッシュしたときに、適切なスタックトレースが与えられるためのコードが生成される
--excessiveStackTrace スタックトレースがファイルのフルパスを表示する
--lineTrace スタックトレースに行情報を含む

シングルスレッドで動かす時とマルチスレッドで動かす時ではコマンドが異なる。
https://github.com/nim-lang/Nim/issues/15661#issuecomment-713094010

シングルスレッド

nim c -r -d:release --threads:off -d:danger --mm:orc main

マルチスレッド

nim c -r -d:release --threads:on -d:danger --mm:orc -d:useMalloc --panics:on --stackTrace --excessiveStackTrace --lineTrace main

asynchttpserverのマルチスレッド化

asynchttpserver

これを

import
  std/asyncdispatch,
  std/asynchttpserver


proc asyncProc {.async.} =
  var server = newAsyncHttpServer()
  proc cb(req: Request) {.async.} =
    let headers = {"Content-type": "text/plain; charset=utf-8"}
    await req.respond(Http200, "Hello World", headers.newHttpHeaders())

  server.listen(Port(5000))
  while true:
    if server.shouldAcceptRequest():
      await server.acceptRequest(cb)
    else:
      await sleepAsync(500)

waitFor asyncProc()

こうじゃ

import
  std/asyncdispatch,
  std/asynchttpserver,
  std/osproc,  
  std/strformat


proc threadProc() {.thread.} =
  proc asyncProc() {.async.} =
    var server = newAsyncHttpServer(true, true)
    proc cb(req: Request) {.async, gcsafe.} =
      let headers = {"Content-type": "text/plain; charset=utf-8"}
      await req.respond(Http200, "Hello World", headers.newHttpHeaders())

    server.listen(Port(5000))
    while true:
      if server.shouldAcceptRequest():
        await server.acceptRequest(cb)
      else:
        await sleepAsync(500)

  while true:
    try:
      asyncCheck asyncProc()
      runForever()
    except:
      echo repr(getCurrentException())

proc serve() =
  when compileOption("threads"):
    let countThreads = countProcessors()
    echo(&"Starting {countThreads} threads")
    var thr = newSeq[Thread[void]](countThreads)
    for i in 1..countThreads:
      createThread(thr[i-1], threadProc)
    joinThreads(thr)
  else:
    threadProc()

serve()

実行時にセットされる別ファイルに定義してある環境変数をスレッド内から呼ぶ

environment.nim
import os

let AppEnv* = getEnv("APP_ENV")
main.nim
import
  os,
  osproc,
  strformat,
  ./environment

proc thread(nThread:int) {.thread.} =
  for i in 1..5:
    echo &"スレッド{nThread} {i}回目"
    echo AppEnv
    sleep(1000)

proc main() =
  let countThreads = countProcessors()
  var thr = newSeq[Thread[int]](countThreads)
  for i in 1..countThreads:
    createThread(thr[i-1], thread, i)
  joinThreads(thr)

main()

コンパイルエラーになる

APP_ENV="aaa" nim c -r --threads:on --mm:orc -d:useMalloc -d:useRealtimeGC main

> main.nim(6, 6) Error: 'thread' is not GC-safe as it accesses 'AppEnv' which is a global using GC'ed memory

環境変数にアクセスしているところを {.gcsafe.} のプラグマを付けてあげる

main.nim
import
  os,
  osproc,
  strformat,
  ./environment

proc thread(nThread:int) {.thread.} =
  for i in 1..3:
    echo &"スレッド…{nThread} {i}回目"
+   {.gcsafe.}:
+     echo AppEnv
    sleep(1000)

proc main() =
  let countThreads = countProcessors()
  var thr = newSeq[Thread[int]](countThreads)
  for i in 1..countThreads:
    createThread(thr[i-1], thread, i)
  joinThreads(thr)

main()
出力
スレッド…2 1回目
aaa
スレッド…4 1回目
aaa
スレッド…1 1回目
aaa
スレッド…3 1回目
aaa
スレッド…2 2回目
aaa
スレッド…3 2回目
aaa
スレッド…1 2回目
aaa
スレッド…4 2回目
aaa
スレッド…2 3回目
aaa
スレッド…3 3回目
aaa
スレッド…1 3回目
aaa
スレッド…4 3回目
aaa

スレッド内からRDBにアクセスする

クエリビルダには私が作ったalllographerを使います

environment.nim
import
  std/asyncdispatch,
  std/random,
  std/json,
  allographer/connection,
  allographer/schema_builder,
  allographer/query_builder

let rdb* = dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)

randomize()

# マイグレーション
rdb.create(
  table("hello", [
    Column.integer("id"),
    Column.integer("randomnumber")
  ])
)

# シーダー
seeder rdb, "hello":
  var data = newSeq[JsonNode]()
  for i in 1..10000:
    let randomNum = rand(10000)
    data.add(%*{"id": i, "randomnumber": randomNum})
  rdb.table("hello").insert(data).waitFor
main.nim
import
  std/asyncdispatch,
  std/json,
  std/osproc,
  allographer/query_builder,
  ./environment

proc thread(nThread:int) {.thread.} =
  (proc() {.async.} =
    for i in 1..3:
      let res = rdb.table("hello").find(i).await.get
      echo &"スレッド{nThread} {i}回目 {res}"
  )().waitFor


proc main() =
  when compileOption("threads"):
    let countThreads = countProcessors()
    var thr = newSeq[Thread[int]](countThreads)
    for i in 1..countThreads:
      createThread(thr[i-1], thread, i)
    joinThreads(thr)
  else:
    thread(1)

main()

コンパイルエラーになる

DB_POSTGRES=true nim c -r --threads:on --mm:orc -d:useMalloc -d:useRealtimeGC main

> main.nim(9, 13) Error: 'thread' is not GC-safe as it performs an indirect call here

{.gcsafe.}プラグマを付ける

main.nim
import
  std/asyncdispatch,
  std/json,
  std/locks,
  std/os,
  std/osproc,
  allographer/query_builder,
  ./environment

proc thread(nThread:int:int) {.thread.} =
  (proc() {.async.} =
    for i in 1..3:
+     {.gcsafe.}:
        let res = rdb.table("hello").find(i).await.get
        echo &"スレッド{nThread} {i}回目 {res}"
  )().waitFor


proc main() =
  when compileOption("threads"):
    let countThreads = countProcessors()
    var thr = newSeq[Thread[int]](countThreads)
    for i in 1..countThreads:
      createThread(thr[i-1], thread, i)
    joinThreads(thr)
  else:
    thread(1)

main()

セグフォが起きたり

SIGSEGV: Illegal storage access. (Attempt to read from nil?)
Segmentation fault (core dumped)

複数のクエリの実行が混ざったりしてエラーが起きる

ERROR:  syntax error at or near "FROM"
LINE 1: SELECT * FROM "hello" LIMIT 1 FROM "hello" LIMIT 1

Lockを使う

locks
threadsによるlockのサンプル

lockのサンプル
import std/locks

var
  thr: array[0..4, Thread[tuple[a,b: int]]]
  L: Lock

proc threadFunc(interval: tuple[a,b: int]) {.thread.} =
  for i in interval.a..interval.b:
    acquire(L) # lock stdout
    echo i
    release(L)

initLock(L)

for i in 0..high(thr):
  createThread(thr[i], threadFunc, (i*10, i*10+5))
joinThreads(thr)

deinitLock(L)

これにより複数のスレッドが同時にDBへのコネクションのインスタンスを扱わないようにする。

environment.nim
import
  std/asyncdispatch,
+ std/locks,
  std/random,
  std/json,
  allographer/connection,
  allographer/schema_builder,
  allographer/query_builder

+ var L*: Lock
+ initLock(L)
let rdb* = dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)

randomize()

rdb.create(
  table("hello", [
    Column.integer("id"),
    Column.integer("randomnumber")
  ])
)

seeder rdb, "hello":
  var data = newSeq[JsonNode]()
  for i in 1..10000:
    let randomNum = rand(10000)
    data.add(%*{"id": i, "randomnumber": randomNum})
  rdb.table("hello").insert(data).waitFor
main.nim
import
  std/asyncdispatch,
  std/json,
+ std/locks,
  std/options,
  std/osproc,
  std/strformat,
  allographer/query_builder,
  ./environment

proc thread(nThread:int) {.thread.} =
  (proc() {.async.} =
    for i in 1..3:
+     withLock(L):
        {.gcsafe.}:
          let res = rdb.table("hello").find(i).await.get
          echo &"スレッド{nThread} {i}回目 {res}"
  )().waitFor


proc main() =
  when compileOption("threads"):
    let countThreads = countProcessors()
    var thr = newSeq[Thread[int]](countThreads)
    for i in 1..countThreads:
      createThread(thr[i-1], thread, i)
    joinThreads(thr)
  else:
    thread(1)

main()
+ deinitLock(L)
出力
スレッド1 1回目 {"id":1,"randomnumber":2}
スレッド1 2回目 {"id":2,"randomnumber":36}
スレッド1 3回目 {"id":3,"randomnumber":4355}
スレッド3 1回目 {"id":1,"randomnumber":2}
スレッド4 1回目 {"id":1,"randomnumber":2}
スレッド4 2回目 {"id":2,"randomnumber":36}
スレッド4 3回目 {"id":3,"randomnumber":4355}
スレッド2 1回目 {"id":1,"randomnumber":2}
スレッド2 2回目 {"id":2,"randomnumber":36}
スレッド2 3回目 {"id":3,"randomnumber":4355}
スレッド3 2回目 {"id":2,"randomnumber":36}
スレッド3 3回目 {"id":3,"randomnumber":4355}

正常に全てのクエリが実行されるが、スレッドずつ順番に実行するため、速度が遅い。マルチスレッド化している意味がない

Lockは使わず、子スレッド内でRDBへのコネクションを作る

environment.nim
import
  std/asyncdispatch,
  std/random,
  std/json,
  allographer/connection,
  allographer/schema_builder,
  allographer/query_builder

+ proc initDb*():Rdb =
+   return dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)

randomize()

+ let rdb = initDb()
rdb.create(
  table("hello", [
      Column.integer("id"),
      Column.integer("randomnumber")
  ])
)

seeder rdb, "hello":
  var data = newSeq[JsonNode]()
  for i in 1..10000:
    let randomNum = rand(10000)
    data.add(%*{"id": i, "randomnumber": randomNum})
  rdb.table("hello").insert(data).waitFor
main.nim
import
  std/asyncdispatch,
  std/json,
  std/options,
  std/osproc,
  std/strformat,
  allographer/query_builder,
  ./environment

proc thread(nThread:int) {.thread.} =
+ let rdb = initDb()
  (proc() {.async.} =
    for i in 1..3:
      {.gcsafe.}:
        let res = rdb.table("hello").find(i).await.get
        echo &"スレッド{nThread} {i}回目 {res}"
  )().waitFor


proc main() =
  when compileOption("threads"):
    let countThreads = countProcessors()
    var thr = newSeq[Thread[int]](countThreads)
    for i in 1..countThreads:
      createThread(thr[i-1], thread, i)
    joinThreads(thr)
  else:
    thread(1)

main()
出力
スレッド3 1回目 {"id":1,"randomnumber":2}
スレッド3 2回目 {"id":2,"randomnumber":36}
スレッド2 1回目 {"id":1,"randomnumber":2}
スレッド3 3回目 {"id":3,"randomnumber":4355}
スレッド4 1回目 {"id":1,"randomnumber":2}
スレッド1 1回目 {"id":1,"randomnumber":2}
スレッド2 2回目 {"id":2,"randomnumber":36}
スレッド4 2回目 {"id":2,"randomnumber":36}
スレッド1 2回目 {"id":2,"randomnumber":36}
スレッド2 3回目 {"id":3,"randomnumber":4355}
スレッド4 3回目 {"id":3,"randomnumber":4355}
スレッド1 3回目 {"id":3,"randomnumber":4355}

スレッド同士が競合せずにDBアクセスできるようになったが、フルスタックWebフレームワークのデザイン上、どう子スレッドの中でコネクションを作り開発者が触るところへ出すかは難しい。

スレッド毎にPluginという概念を持ち、そこにコネクションを持たせる

environment.nim
import
  std/asyncdispatch,
  std/random,
  std/json,
  allographer/connection,
  allographer/schema_builder,
  allographer/query_builder

proc initDb*():Rdb =
  return dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)

+ type Plugin* = ref object
+   rdb*: Rdb

randomize()

var rdb = initDb()
rdb.create(
  table("World", [
    Column.integer("id"),
    Column.integer("randomnumber")
  ])
)

seeder rdb, "World":
  var data = newSeq[JsonNode]()
  for i in 1..10000:
    let randomNum = rand(10000)
    data.add(%*{"id": i, "randomnumber": randomNum})
  rdb.table("World").insert(data).waitFor
controller.nim
import
  std/asyncdispatch,
  std/json,
  std/options,
  std/strformat,
  allographer/query_builder,
  ./environment

+ proc controller*(param: (Plugin, int)):Future[void] {.async, gcsafe.} =
+   let (plugin, nThread) = param
    for i in 1..3:
      let res = plugin.rdb.table("hello").find(i).await.get
      echo &"スレッド{nThread} {i}回目 {res}"
main.nim
import
  std/asyncdispatch,
  std/json,
  std/osproc,
  ./environment,
  ./controller

+ proc thread(param:(Plugin, int)) {.thread.} =
    (proc() {.async.} =
      await controller.controller(param)
    )().waitFor


+ proc main(plugins:seq[Plugin]) =
    when compileOption("threads"):
      let countThreads = countProcessors()
+     var thr = newSeq[Thread[(Plugin, int)]](countThreads)
+     for i in 0..countThreads-1:
+       createThread(thr[i], thread, (plugins[i], i+1))
      joinThreads(thr)
    else:
+     thread((plugins[0], 1))

+ var plugins = newSeq[Plugin](countProcessors())
+ for i in 0..countProcessors()-1:
+   plugins[i] = Plugin(
+     rdb:initDb()
+   ) 
+ main(plugins)
出力
スレッド2 1回目 {"id":1,"randomnumber":2}
スレッド3 1回目 {"id":1,"randomnumber":2}
スレッド1 1回目 {"id":1,"randomnumber":2}
スレッド4 1回目 {"id":1,"randomnumber":2}
スレッド2 2回目 {"id":2,"randomnumber":36}
スレッド3 2回目 {"id":2,"randomnumber":36}
スレッド1 2回目 {"id":2,"randomnumber":36}
スレッド4 2回目 {"id":2,"randomnumber":36}
スレッド3 3回目 {"id":3,"randomnumber":4355}
スレッド1 3回目 {"id":3,"randomnumber":4355}
スレッド4 3回目 {"id":3,"randomnumber":4355}
スレッド2 3回目 {"id":3,"randomnumber":4355}

スレッドの数だけPluginの配列を作り、各スレッドにはPluginを渡す。
こうすることでスレッド同士が同じPluginを触るのを防ぐ。
メモリ管理はスレッド単位で行われる。
Pluginを通してコントローラーの中でrdbへのインスタンスを使う。

フレームワークを通してコントローラーにPluginを渡す

上のサンプルではmain.nimPluginの定義を知っている。

ディレクトリ構成
.
├── controller.nim
├── fw
│   └── lib.nim
├── main
├── main.nim
└── setting.nim

この時、ファイルの依存関係は以下のようになる

main ──> controller ──> fw/lib
      setting

Pluginがどんなフィールドを持つかは開発者側のsettingなどで定義したい。しかしfw/libはPluginの定義については知りえない。
もしPluginの定義をフレームワークの中ですると、開発者が任意のフィールドをそこに乗せることはできないことになる。

templateを使う

Nimでマクロの一種であるtemplateを使えば、Pluginを呼び出しているasynchttpserverの処理ロジックを開発者側のmainに持ってくることができる。

setting.nim
import
  std/asyncdispatch,
  allographer/connection,
  allographer/query_builder


proc initDb*():Rdb =
  return dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 20, 30, true, false)

# ============================================================
type Plugin* = ref object
  rdb*:Rdb

proc new*(_:type Plugin):Plugin =
  return Plugin(
    rdb: initDb()
  )

# ============================================================
type Controller* = ref object
  action*: proc(plugin:Plugin):Future[string] {.async, gcsafe.}

proc new*(_:type Controller, action: proc(plugin:Plugin):Future[string] {.async, gcsafe.}):Controller =
  return Controller(action:action)

Pluginの定義、そしてコントローラーがどんな引数を受け取るかを開発者側で定義する。

fw/lib.nim
import
  std/httpcore


type Route* = ref object
  path*:string
  httpMethod*:HttpMethod

template serveWithPlugin*(routes, createPlugin:untyped):untyped =
  proc threadProc(params:(seq[(Route, Controller)], Plugin)) {.thread.} =
    proc asyncProc(params:(seq[(Route, Controller)], Plugin)) {.async.} =
      var server = newAsyncHttpServer(true, true)
      proc cb(req: Request) {.async, gcsafe.} =
        let headers = {"Content-type": "text/plain; charset=utf-8"}
        let (routes, plugin) = params
        for route in routes:
          if req.url.path == route[0].path and req.reqMethod == route[0].httpMethod:
            let resp = route[1].action(plugin).await
            await req.respond(Http200, resp, headers.newHttpHeaders())
            break
        await req.respond(Http404, "", headers.newHttpHeaders())

      server.listen(Port(5000))
      while true:
        if server.shouldAcceptRequest():
          await server.acceptRequest(cb)
        else:
          await sleepAsync(500)

    while true:
      try:
        asyncCheck asyncProc(params)
        runForever()
      except:
        echo repr(getCurrentException())

  proc serve(routes:seq[(Route, Controller)], `createPlugin`:proc():Plugin) =
    let countThreads = countProcessors()
    var params:seq[(seq[(Route, Controller)], Plugin)]
    for i in 1..countThreads:
      params.add((routes, `createPlugin`()))
    when compileOption("threads"):
      echo("Starting ", countThreads, " threads")
      var thr = newSeq[Thread[(seq[(Route, Controller)], Plugin)]](countThreads)
      for i in 0..countThreads-1:
        createThread(thr[i], threadProc, params[i])
      joinThreads(thr)
    else:
      threadProc(params[0])

  serve(`routes`, `createPlugin`)
controller.nim
import 
  std/asyncdispatch,
  std/options,
  std/json,
  allographer/query_builder,
  ./setting

proc index*(plugin:Plugin):Future[string] {.async, gcsafe.} =
  for i in 1..10:
    echo plugin.rdb.table("num_table").find(i).await.get()
  return "index"

proc show*(plugin:Plugin):Future[string] {.async, gcsafe.} =
  return "show"
main.nim
import
  std/asyncdispatch,
  std/asynchttpserver,
  std/osproc,
  ./fw/lib,
  ./setting,
  ./controller


let routes = @[
  (Route(path:"/", httpMethod:HttpGet), Controller.new(index)),
  (Route(path:"/show", httpMethod:HttpGet), Controller.new(show))
]

proc createPlugin*():Plugin =
  return Plugin.new()

serveWithPlugin(routes, createPlugin)

動作する

Starting 4 threads
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["1"]
{"id":1,"randomnumber":7985}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["2"]
{"id":2,"randomnumber":6529}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["3"]
{"id":3,"randomnumber":6613}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["4"]
{"id":4,"randomnumber":1471}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["5"]
{"id":5,"randomnumber":8057}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["6"]
{"id":6,"randomnumber":944}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["7"]
{"id":7,"randomnumber":1194}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["8"]
{"id":8,"randomnumber":5665}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["9"]
{"id":9,"randomnumber":1812}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["10"]
{"id":10,"randomnumber":6917}

Pluginが持つフィールドを標準ライブラリのdb_postgresDbConnに差し替えてみる

setting.nim
import
  std/asyncdispatch,
+ std/db_postgres


+proc initDb*():DbConn =
+  open("db", "user", "pass", "database")


# ============================================================
type Plugin* = ref object
+ rdb*:DbConn

proc new*(_:type Plugin):Plugin =
  return Plugin(
    rdb: initDb()
  )

# ============================================================
type Controller* = ref object
  action*: proc(plugin:Plugin):Future[string] {.async, gcsafe.}

proc new*(_:type Controller, action: proc(plugin:Plugin):Future[string] {.async, gcsafe.}):Controller =
  return Controller(action:action)
controller.nim
import 
  std/asyncdispatch,
  std/db_postgres,
  std/options,
  std/json,
  allographer/query_builder,
  ./setting

proc index*(plugin:Plugin):Future[string] {.async, gcsafe.} =
  for i in 1..10:
+   echo plugin.rdb.getRow(sql"SELECT * FROM num_table WHERE id = ?", i)
  return "index"

proc show*(plugin:Plugin):Future[string] {.async, gcsafe.} =
  return "show"

動作する

Starting 4 threads
@["1", "7985"]
@["2", "6529"]
@["3", "6613"]
@["4", "1471"]
@["5", "8057"]
@["6", "944"]
@["7", "1194"]
@["8", "5665"]
@["9", "1812"]
@["10", "6917"]

ベンチマーク測定

controller.nim
import 
  std/asyncdispatch,
  std/options,
  std/json,
  std/strutils,
  std/sequtils,
  allographer/query_builder,
  ./setting


proc index*(plugin:Plugin):Future[string] {.async, gcsafe.} =
  let nThreads = 500
  var futures = newSeq[Future[Option[JsonNode]]](nThreads)
  for i in 1..nThreads:
    futures[i-1] = plugin.rdb.table("num_table").find(i)
  let res = all(futures).await
  let response = res.map(
    proc(x:Option[JsonNode]):JsonNode =
      x.get()
  )
  return $(%response)

proc show*(plugin:Plugin):Future[string] {.async, gcsafe.} =
  return "show"

コンパイル

DB_POSTGRES=true nim c -r --threads:{on|off} --mm:{refc|arc|orc} -d:useMalloc -d:useRealtimeGC -d:release main

測定

wrk -c4 -t4 -d10s http://localhost:5000 # DB access
wrk -c4 -t4 -d10s http://localhost:5000/show # return string
Memory monagement Threads DB access return string
refc on 33 233624
refc off 151 234453
arc on 88 219813
arc off 166 228683
orc on 81 clash
orc off 137 182754

なぜかシングルスレッドの方がパフォーマンスが良い(´・ω・`)

Nimの非同期とマルチスレッドに関する議論

どうやらIOバウンドな処理はマルチスレッドで動かすよりも、シングルスレッドで非同期を使ったほうがいいようです。

--gc:orc ; Multi-threaded async slower than single-threaded
Criticism of Parallel Nim
Channel / Actors based parallelism? Are there such Web Servers?
Threading in Nim

コネクションプール VS プリペアドステートメント

Nim製クエリビルダのallographerはコネクションプールの仕組みを持っている。短時間に大量のDBアクセスをする時に、あるクエリがaのコネクションを使っていると、他のクエリではbのコネクションを使ってDBへアクセスする。aのコネクションの使用中の状態が解除されたら別のクエリがまたaのコネクションを使う。PostgreSQLではデフォルトで100コネクションを張ることができるので、95コネクションくらいなら同時に使って大量のDBアクセスを捌くことができる。
一方でコネクションプールを使ったクエリ実行は、毎回クエリをパースし構文を解釈し値をバインドしている。プリペアドステートメントを使えばクエリのパースは1回で済み、値のバインドだけを行えばいいので、同じ構文のクエリを何回も実行する場合はプリペアドステートメントの方が速いらしい。しかしプリペアドステートメントはコネクション単位で作られるため、1つのプリペアドステートメントでは1つのコネクションを使い回さなければいけない。
今回は毎回クエリのパースをしても複数のコネクションを使ったほうが速いのか、プリペアドステートメントを使ったほうが速いのか実験してみる。

DB_POSTGRES=true nim c -r --threads:off --mm:arc -d:useMalloc -d:useRealtimeGC -d:release main
main.nim
import 
  std/asyncdispatch,
  std/json,
  std/options,
  std/random,
  std/times,
  allographer/connection,
  allographer/schema_builder,
  allographer/query_builder,
  allographer/async/async_db

randomize()

let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 96, 30, false, false)

proc init() =
  rdb.create(
    table("num_threads", [
        Column.integer("id"),
        Column.integer("randomnumber")
    ])
  )

  seeder rdb, "num_threads":
    var data = newSeq[JsonNode]()
    for i in 1..10000:
      let randomNum = rand(10000)
      data.add(%*{"id": i, "randomnumber": randomNum})
    rdb.table("num_threads").insert(data).waitFor


proc main(){.async.} =
  init()
  let n = 500
  # プリペアドステートメント
  block:
    let start = cpuTime()
    var resp = newJArray()
    let conn = rdb.conn
    let prepare = conn.prepare(PostgreSQL, "select * from num_table where id = $1 LIMIT 1", "12345").await
    var futures = newSeq[Future[(seq[Row], DbRows)]](n)
    for i in 1..n:
      futures[i-1] = prepare.query(PostgreSQL, @[$i])
    
    let resArr = all(futures).await
    for res in resArr:
      let rows = res[0]
      for row in rows:
        let dbInfo = res[1]
        resp.add(%*{dbInfo[0][0].name: row[0], dbInfo[0][1].name: row[1]})
    # echo resp
    echo cpuTime() - start

  # コネクションプール
  block:
    let start = cpuTime()
    var resp = newJArray()
    var futures = newSeq[Future[Option[JsonNode]]](n)
    for i in 1..n:
      futures[i-1] = rdb.table("num_table").find(i)
    let resArr = all(futures).await
    for rowOpt in resArr:
      resp.add(rowOpt.get)
    # echo resp
    echo cpuTime() - start


main().waitFor

echo respの出力結果

[{"id":1,"randomnumber":7985},{"id":2,"randomnumber":6529},{"id":3,"randomnumber":6613}...{"id":500,"randomnumber":9237}]

結果

プリペアドステートメント コネクションプール
0.5787295100000001秒 0.05459516299999989秒

なんと10倍以上の差だった。プリペアドステートメントよりもコネクションプールを使った方が圧倒的に速かった。

asyncdispatch VS chronos

Nimは非同期の仕組みを提供する標準ライブラリのasyncdispatchと、Status社が開発した3rdパーティのchronosがある。
これのパフォーマンスを比較してみる。

main.nim
import 
  std/asyncdispatch,
  std/json,
  std/options,
  std/random,
  std/times,
  allographer/connection,
  allographer/schema_builder,
  allographer/query_builder,
  allographer/async/async_db

randomize()

let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 96, 30, false, false)

proc init() =
  rdb.create(
    table("num_threads", [
        Column.integer("id"),
        Column.integer("randomnumber")
    ])
  )

  seeder rdb, "num_threads":
    var data = newSeq[JsonNode]()
    for i in 1..10000:
      let randomNum = rand(10000)
      data.add(%*{"id": i, "randomnumber": randomNum})
    rdb.table("num_threads").insert(data).waitFor


proc main(){.async.} =
  init()
  let n = 10000
  let start = cpuTime()
  var resp = newJArray()
  var futures = newSeq[Future[Option[JsonNode]]](n)
  for i in 1..n:
    futures[i-1] = rdb.table("num_table").find(i)
  let resArr = all(futures).await
  for rowOpt in resArr:
    resp.add(rowOpt.get)
  # echo resp
  echo cpuTime() - start


main().waitFor
chronos_main.nim
import 
  std/json,
  std/options,
  std/random,
  std/times,
  chronos,
  # chronos_dbはallographerのasyncdispatchを全てchronosに置換したもの
  ./chronos_db/connection,
  ./chronos_db/schema_builder,
  ./chronos_db/query_builder,
  ./chronos_db/async/async_db

randomize()

let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 96, 30, false, false)

proc init() =
  rdb.create(
    table("num_threads", [
        Column.integer("id"),
        Column.integer("randomnumber")
    ])
  )

  seeder rdb, "num_threads":
    var data = newSeq[JsonNode]()
    for i in 1..10000:
      let randomNum = rand(10000)
      data.add(%*{"id": i, "randomnumber": randomNum})
    rdb.table("num_threads").insert(data).waitFor


proc main(){.async.} =
  init()
  let n = 10000
  let start = cpuTime()
  var resp = newJArray()
  var futures = newSeq[Future[Option[JsonNode]]](n)
  for i in 1..n:
    futures[i-1] = rdb.table("num_table").find(i)
  let resArr = all(futures).await
  for rowOpt in resArr:
    resp.add(rowOpt.get)
  # echo resp
  echo cpuTime() - start


main().waitFor
DB_POSTGRES=true nim c -r --mm:orc -d:useMalloc -d:useRealtimeGC -d:release main
DB_POSTGRES=true nim c -r --mm:arc -d:useMalloc -d:useRealtimeGC -d:release main
DB_POSTGRES=true nim c -r --mm:orc -d:useMalloc -d:useRealtimeGC -d:release -d:asyncBackend=chronos chronos_main
DB_POSTGRES=true nim c -r --mm:arc -d:useMalloc -d:useRealtimeGC -d:release -d:asyncBackend=chronos chronos_main
async lib memory management time
asyncdispatch orc 2.272063811
asyncdispatch arc 1.904029235
chronos orc 3.682025834
chronos arc 2.167253522

標準ライブラリの方が速かった

asynchttpserver VS httpbeast

標準ライブラリのasynchttpserver、Nim forumで使われているマイクロWebフレームワークのJesterがベースにしているhttpbeastでパフォーマンスを比較してみる。

HTMLを返すシナリオ

ライブラリ GC スレッド req/10s
httpbeast ORC on double free or corruption (fasttop)
httpbeast ORC off 289313
httpbeast ARC on 280547
httpbeast ARC off 310421
asynchttpserver ORC on double free or corruption (fasttop)
asynchttpserver ORC off 179902
asynchttpserver ARC on 229477
asynchttpserver ARC off 219878

リクエスト毎にDBに500回アクセスするシナリオ

ライブラリ GC スレッド req/10s
httpbeast ORC on 73
httpbeast ORC off 96
httpbeast ARC on 75
httpbeast ARC off 95
asynchttpserver ORC on double free or corruption (!prev)
asynchttpserver ORC off 124
asynchttpserver ARC on 59
asynchttpserver ARC off 139

非同期を使った時にGCがARCだと、開放されない値があるのか何回も動かしているうちにメモリリークが起きたので、現実的にはORCか現行のrefcしか選択肢はないことになる。
しかしORCもマルチスレッドで動かした時にはセグフォが頻繁に起きる。
httpbeast・ORC・シングルスレッドの組み合わせが安定していてパフォーマンスが良さそうである。

GitHubで編集を提案

Discussion