Nimのウェブフレームワークを高速化するための断片的な知識
ほんとうはスクラップに書くような内容ですが、スクラップは前後の移動ができないのでこちらに書きます。
随時更新し続けます。
起動コマンド
はじめにコンパイルオプションを確認します
オプション | 意味 |
---|---|
--mm:orc | GCにORCを使う |
--mm:refc | GCに参照カウントを使う |
-d:ssl | OpenSSLサポートを有効にする |
-d:release | リリースビルドを行う。速くなる |
-d:danger | ランタイムチェックを無効にし最適化を有効にする |
-d:useMalloc | Nimのメモリマネージャーの代わりにCのMallocを使う |
--panics | パニックが起きるとプロセスを終了にする |
--threads | マルチスレッドを有効にする |
--stackTrace | プログラムがクラッシュしたときに、適切なスタックトレースが与えられるためのコードが生成される |
--excessiveStackTrace | スタックトレースがファイルのフルパスを表示する |
--lineTrace | スタックトレースに行情報を含む |
シングルスレッドで動かす時とマルチスレッドで動かす時ではコマンドが異なる。
シングルスレッド
nim c -r -d:release --threads:off -d:danger --mm:orc main
マルチスレッド
nim c -r -d:release --threads:on -d:danger --mm:orc -d:useMalloc --panics:on --stackTrace --excessiveStackTrace --lineTrace main
asynchttpserverのマルチスレッド化
これを
import
std/asyncdispatch,
std/asynchttpserver
proc asyncProc {.async.} =
var server = newAsyncHttpServer()
proc cb(req: Request) {.async.} =
let headers = {"Content-type": "text/plain; charset=utf-8"}
await req.respond(Http200, "Hello World", headers.newHttpHeaders())
server.listen(Port(5000))
while true:
if server.shouldAcceptRequest():
await server.acceptRequest(cb)
else:
await sleepAsync(500)
waitFor asyncProc()
こうじゃ
import
std/asyncdispatch,
std/asynchttpserver,
std/osproc,
std/strformat
proc threadProc() {.thread.} =
proc asyncProc() {.async.} =
var server = newAsyncHttpServer(true, true)
proc cb(req: Request) {.async, gcsafe.} =
let headers = {"Content-type": "text/plain; charset=utf-8"}
await req.respond(Http200, "Hello World", headers.newHttpHeaders())
server.listen(Port(5000))
while true:
if server.shouldAcceptRequest():
await server.acceptRequest(cb)
else:
await sleepAsync(500)
while true:
try:
asyncCheck asyncProc()
runForever()
except:
echo repr(getCurrentException())
proc serve() =
when compileOption("threads"):
let countThreads = countProcessors()
echo(&"Starting {countThreads} threads")
var thr = newSeq[Thread[void]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], threadProc)
joinThreads(thr)
else:
threadProc()
serve()
実行時にセットされる別ファイルに定義してある環境変数をスレッド内から呼ぶ
import os
let AppEnv* = getEnv("APP_ENV")
import
os,
osproc,
strformat,
./environment
proc thread(nThread:int) {.thread.} =
for i in 1..5:
echo &"スレッド{nThread} {i}回目"
echo AppEnv
sleep(1000)
proc main() =
let countThreads = countProcessors()
var thr = newSeq[Thread[int]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], thread, i)
joinThreads(thr)
main()
コンパイルエラーになる
APP_ENV="aaa" nim c -r --threads:on --mm:orc -d:useMalloc -d:useRealtimeGC main
> main.nim(6, 6) Error: 'thread' is not GC-safe as it accesses 'AppEnv' which is a global using GC'ed memory
環境変数にアクセスしているところを {.gcsafe.}
のプラグマを付けてあげる
import
os,
osproc,
strformat,
./environment
proc thread(nThread:int) {.thread.} =
for i in 1..3:
echo &"スレッド…{nThread} {i}回目"
+ {.gcsafe.}:
+ echo AppEnv
sleep(1000)
proc main() =
let countThreads = countProcessors()
var thr = newSeq[Thread[int]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], thread, i)
joinThreads(thr)
main()
スレッド…2 1回目
aaa
スレッド…4 1回目
aaa
スレッド…1 1回目
aaa
スレッド…3 1回目
aaa
スレッド…2 2回目
aaa
スレッド…3 2回目
aaa
スレッド…1 2回目
aaa
スレッド…4 2回目
aaa
スレッド…2 3回目
aaa
スレッド…3 3回目
aaa
スレッド…1 3回目
aaa
スレッド…4 3回目
aaa
スレッド内からRDBにアクセスする
クエリビルダには私が作ったalllographerを使います
import
std/asyncdispatch,
std/random,
std/json,
allographer/connection,
allographer/schema_builder,
allographer/query_builder
let rdb* = dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)
randomize()
# マイグレーション
rdb.create(
table("hello", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
# シーダー
seeder rdb, "hello":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("hello").insert(data).waitFor
import
std/asyncdispatch,
std/json,
std/osproc,
allographer/query_builder,
./environment
proc thread(nThread:int) {.thread.} =
(proc() {.async.} =
for i in 1..3:
let res = rdb.table("hello").find(i).await.get
echo &"スレッド{nThread} {i}回目 {res}"
)().waitFor
proc main() =
when compileOption("threads"):
let countThreads = countProcessors()
var thr = newSeq[Thread[int]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], thread, i)
joinThreads(thr)
else:
thread(1)
main()
コンパイルエラーになる
DB_POSTGRES=true nim c -r --threads:on --mm:orc -d:useMalloc -d:useRealtimeGC main
> main.nim(9, 13) Error: 'thread' is not GC-safe as it performs an indirect call here
{.gcsafe.}
プラグマを付ける
import
std/asyncdispatch,
std/json,
std/locks,
std/os,
std/osproc,
allographer/query_builder,
./environment
proc thread(nThread:int:int) {.thread.} =
(proc() {.async.} =
for i in 1..3:
+ {.gcsafe.}:
let res = rdb.table("hello").find(i).await.get
echo &"スレッド{nThread} {i}回目 {res}"
)().waitFor
proc main() =
when compileOption("threads"):
let countThreads = countProcessors()
var thr = newSeq[Thread[int]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], thread, i)
joinThreads(thr)
else:
thread(1)
main()
セグフォが起きたり
SIGSEGV: Illegal storage access. (Attempt to read from nil?)
Segmentation fault (core dumped)
複数のクエリの実行が混ざったりしてエラーが起きる
ERROR: syntax error at or near "FROM"
LINE 1: SELECT * FROM "hello" LIMIT 1 FROM "hello" LIMIT 1
Lockを使う
import std/locks
var
thr: array[0..4, Thread[tuple[a,b: int]]]
L: Lock
proc threadFunc(interval: tuple[a,b: int]) {.thread.} =
for i in interval.a..interval.b:
acquire(L) # lock stdout
echo i
release(L)
initLock(L)
for i in 0..high(thr):
createThread(thr[i], threadFunc, (i*10, i*10+5))
joinThreads(thr)
deinitLock(L)
これにより複数のスレッドが同時にDBへのコネクションのインスタンスを扱わないようにする。
import
std/asyncdispatch,
+ std/locks,
std/random,
std/json,
allographer/connection,
allographer/schema_builder,
allographer/query_builder
+ var L*: Lock
+ initLock(L)
let rdb* = dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)
randomize()
rdb.create(
table("hello", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
seeder rdb, "hello":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("hello").insert(data).waitFor
import
std/asyncdispatch,
std/json,
+ std/locks,
std/options,
std/osproc,
std/strformat,
allographer/query_builder,
./environment
proc thread(nThread:int) {.thread.} =
(proc() {.async.} =
for i in 1..3:
+ withLock(L):
{.gcsafe.}:
let res = rdb.table("hello").find(i).await.get
echo &"スレッド{nThread} {i}回目 {res}"
)().waitFor
proc main() =
when compileOption("threads"):
let countThreads = countProcessors()
var thr = newSeq[Thread[int]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], thread, i)
joinThreads(thr)
else:
thread(1)
main()
+ deinitLock(L)
スレッド1 1回目 {"id":1,"randomnumber":2}
スレッド1 2回目 {"id":2,"randomnumber":36}
スレッド1 3回目 {"id":3,"randomnumber":4355}
スレッド3 1回目 {"id":1,"randomnumber":2}
スレッド4 1回目 {"id":1,"randomnumber":2}
スレッド4 2回目 {"id":2,"randomnumber":36}
スレッド4 3回目 {"id":3,"randomnumber":4355}
スレッド2 1回目 {"id":1,"randomnumber":2}
スレッド2 2回目 {"id":2,"randomnumber":36}
スレッド2 3回目 {"id":3,"randomnumber":4355}
スレッド3 2回目 {"id":2,"randomnumber":36}
スレッド3 3回目 {"id":3,"randomnumber":4355}
正常に全てのクエリが実行されるが、スレッドずつ順番に実行するため、速度が遅い。マルチスレッド化している意味がない
Lockは使わず、子スレッド内でRDBへのコネクションを作る
import
std/asyncdispatch,
std/random,
std/json,
allographer/connection,
allographer/schema_builder,
allographer/query_builder
+ proc initDb*():Rdb =
+ return dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)
randomize()
+ let rdb = initDb()
rdb.create(
table("hello", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
seeder rdb, "hello":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("hello").insert(data).waitFor
import
std/asyncdispatch,
std/json,
std/options,
std/osproc,
std/strformat,
allographer/query_builder,
./environment
proc thread(nThread:int) {.thread.} =
+ let rdb = initDb()
(proc() {.async.} =
for i in 1..3:
{.gcsafe.}:
let res = rdb.table("hello").find(i).await.get
echo &"スレッド{nThread} {i}回目 {res}"
)().waitFor
proc main() =
when compileOption("threads"):
let countThreads = countProcessors()
var thr = newSeq[Thread[int]](countThreads)
for i in 1..countThreads:
createThread(thr[i-1], thread, i)
joinThreads(thr)
else:
thread(1)
main()
スレッド3 1回目 {"id":1,"randomnumber":2}
スレッド3 2回目 {"id":2,"randomnumber":36}
スレッド2 1回目 {"id":1,"randomnumber":2}
スレッド3 3回目 {"id":3,"randomnumber":4355}
スレッド4 1回目 {"id":1,"randomnumber":2}
スレッド1 1回目 {"id":1,"randomnumber":2}
スレッド2 2回目 {"id":2,"randomnumber":36}
スレッド4 2回目 {"id":2,"randomnumber":36}
スレッド1 2回目 {"id":2,"randomnumber":36}
スレッド2 3回目 {"id":3,"randomnumber":4355}
スレッド4 3回目 {"id":3,"randomnumber":4355}
スレッド1 3回目 {"id":3,"randomnumber":4355}
スレッド同士が競合せずにDBアクセスできるようになったが、フルスタックWebフレームワークのデザイン上、どう子スレッドの中でコネクションを作り開発者が触るところへ出すかは難しい。
スレッド毎にPluginという概念を持ち、そこにコネクションを持たせる
import
std/asyncdispatch,
std/random,
std/json,
allographer/connection,
allographer/schema_builder,
allographer/query_builder
proc initDb*():Rdb =
return dbOpen(PostgreSQL, "db_name", "user", "pass", "db_host", 5432, 20, 30, true, false)
+ type Plugin* = ref object
+ rdb*: Rdb
randomize()
var rdb = initDb()
rdb.create(
table("World", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
seeder rdb, "World":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("World").insert(data).waitFor
import
std/asyncdispatch,
std/json,
std/options,
std/strformat,
allographer/query_builder,
./environment
+ proc controller*(param: (Plugin, int)):Future[void] {.async, gcsafe.} =
+ let (plugin, nThread) = param
for i in 1..3:
let res = plugin.rdb.table("hello").find(i).await.get
echo &"スレッド{nThread} {i}回目 {res}"
import
std/asyncdispatch,
std/json,
std/osproc,
./environment,
./controller
+ proc thread(param:(Plugin, int)) {.thread.} =
(proc() {.async.} =
await controller.controller(param)
)().waitFor
+ proc main(plugins:seq[Plugin]) =
when compileOption("threads"):
let countThreads = countProcessors()
+ var thr = newSeq[Thread[(Plugin, int)]](countThreads)
+ for i in 0..countThreads-1:
+ createThread(thr[i], thread, (plugins[i], i+1))
joinThreads(thr)
else:
+ thread((plugins[0], 1))
+ var plugins = newSeq[Plugin](countProcessors())
+ for i in 0..countProcessors()-1:
+ plugins[i] = Plugin(
+ rdb:initDb()
+ )
+ main(plugins)
スレッド2 1回目 {"id":1,"randomnumber":2}
スレッド3 1回目 {"id":1,"randomnumber":2}
スレッド1 1回目 {"id":1,"randomnumber":2}
スレッド4 1回目 {"id":1,"randomnumber":2}
スレッド2 2回目 {"id":2,"randomnumber":36}
スレッド3 2回目 {"id":2,"randomnumber":36}
スレッド1 2回目 {"id":2,"randomnumber":36}
スレッド4 2回目 {"id":2,"randomnumber":36}
スレッド3 3回目 {"id":3,"randomnumber":4355}
スレッド1 3回目 {"id":3,"randomnumber":4355}
スレッド4 3回目 {"id":3,"randomnumber":4355}
スレッド2 3回目 {"id":3,"randomnumber":4355}
スレッドの数だけPluginの配列を作り、各スレッドにはPluginを渡す。
こうすることでスレッド同士が同じPluginを触るのを防ぐ。
メモリ管理はスレッド単位で行われる。
Pluginを通してコントローラーの中でrdbへのインスタンスを使う。
フレームワークを通してコントローラーにPluginを渡す
上のサンプルではmain.nim
がPlugin
の定義を知っている。
.
├── controller.nim
├── fw
│ └── lib.nim
├── main
├── main.nim
└── setting.nim
この時、ファイルの依存関係は以下のようになる
main ──> controller ──> fw/lib
setting
Pluginがどんなフィールドを持つかは開発者側のsettingなどで定義したい。しかしfw/libはPluginの定義については知りえない。
もしPluginの定義をフレームワークの中ですると、開発者が任意のフィールドをそこに乗せることはできないことになる。
templateを使う
Nimでマクロの一種であるtemplateを使えば、Pluginを呼び出しているasynchttpserverの処理ロジックを開発者側のmainに持ってくることができる。
import
std/asyncdispatch,
allographer/connection,
allographer/query_builder
proc initDb*():Rdb =
return dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 20, 30, true, false)
# ============================================================
type Plugin* = ref object
rdb*:Rdb
proc new*(_:type Plugin):Plugin =
return Plugin(
rdb: initDb()
)
# ============================================================
type Controller* = ref object
action*: proc(plugin:Plugin):Future[string] {.async, gcsafe.}
proc new*(_:type Controller, action: proc(plugin:Plugin):Future[string] {.async, gcsafe.}):Controller =
return Controller(action:action)
Pluginの定義、そしてコントローラーがどんな引数を受け取るかを開発者側で定義する。
import
std/httpcore
type Route* = ref object
path*:string
httpMethod*:HttpMethod
template serveWithPlugin*(routes, createPlugin:untyped):untyped =
proc threadProc(params:(seq[(Route, Controller)], Plugin)) {.thread.} =
proc asyncProc(params:(seq[(Route, Controller)], Plugin)) {.async.} =
var server = newAsyncHttpServer(true, true)
proc cb(req: Request) {.async, gcsafe.} =
let headers = {"Content-type": "text/plain; charset=utf-8"}
let (routes, plugin) = params
for route in routes:
if req.url.path == route[0].path and req.reqMethod == route[0].httpMethod:
let resp = route[1].action(plugin).await
await req.respond(Http200, resp, headers.newHttpHeaders())
break
await req.respond(Http404, "", headers.newHttpHeaders())
server.listen(Port(5000))
while true:
if server.shouldAcceptRequest():
await server.acceptRequest(cb)
else:
await sleepAsync(500)
while true:
try:
asyncCheck asyncProc(params)
runForever()
except:
echo repr(getCurrentException())
proc serve(routes:seq[(Route, Controller)], `createPlugin`:proc():Plugin) =
let countThreads = countProcessors()
var params:seq[(seq[(Route, Controller)], Plugin)]
for i in 1..countThreads:
params.add((routes, `createPlugin`()))
when compileOption("threads"):
echo("Starting ", countThreads, " threads")
var thr = newSeq[Thread[(seq[(Route, Controller)], Plugin)]](countThreads)
for i in 0..countThreads-1:
createThread(thr[i], threadProc, params[i])
joinThreads(thr)
else:
threadProc(params[0])
serve(`routes`, `createPlugin`)
import
std/asyncdispatch,
std/options,
std/json,
allographer/query_builder,
./setting
proc index*(plugin:Plugin):Future[string] {.async, gcsafe.} =
for i in 1..10:
echo plugin.rdb.table("num_table").find(i).await.get()
return "index"
proc show*(plugin:Plugin):Future[string] {.async, gcsafe.} =
return "show"
import
std/asyncdispatch,
std/asynchttpserver,
std/osproc,
./fw/lib,
./setting,
./controller
let routes = @[
(Route(path:"/", httpMethod:HttpGet), Controller.new(index)),
(Route(path:"/show", httpMethod:HttpGet), Controller.new(show))
]
proc createPlugin*():Plugin =
return Plugin.new()
serveWithPlugin(routes, createPlugin)
動作する
Starting 4 threads
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["1"]
{"id":1,"randomnumber":7985}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["2"]
{"id":2,"randomnumber":6529}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["3"]
{"id":3,"randomnumber":6613}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["4"]
{"id":4,"randomnumber":1471}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["5"]
{"id":5,"randomnumber":8057}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["6"]
{"id":6,"randomnumber":944}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["7"]
{"id":7,"randomnumber":1194}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["8"]
{"id":8,"randomnumber":5665}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["9"]
{"id":9,"randomnumber":1812}
DEBUG SELECT * FROM num_table WHERE id = ? LIMIT 1 ["10"]
{"id":10,"randomnumber":6917}
Pluginが持つフィールドを標準ライブラリのdb_postgres
のDbConn
に差し替えてみる
import
std/asyncdispatch,
+ std/db_postgres
+proc initDb*():DbConn =
+ open("db", "user", "pass", "database")
# ============================================================
type Plugin* = ref object
+ rdb*:DbConn
proc new*(_:type Plugin):Plugin =
return Plugin(
rdb: initDb()
)
# ============================================================
type Controller* = ref object
action*: proc(plugin:Plugin):Future[string] {.async, gcsafe.}
proc new*(_:type Controller, action: proc(plugin:Plugin):Future[string] {.async, gcsafe.}):Controller =
return Controller(action:action)
import
std/asyncdispatch,
std/db_postgres,
std/options,
std/json,
allographer/query_builder,
./setting
proc index*(plugin:Plugin):Future[string] {.async, gcsafe.} =
for i in 1..10:
+ echo plugin.rdb.getRow(sql"SELECT * FROM num_table WHERE id = ?", i)
return "index"
proc show*(plugin:Plugin):Future[string] {.async, gcsafe.} =
return "show"
動作する
Starting 4 threads
@["1", "7985"]
@["2", "6529"]
@["3", "6613"]
@["4", "1471"]
@["5", "8057"]
@["6", "944"]
@["7", "1194"]
@["8", "5665"]
@["9", "1812"]
@["10", "6917"]
ベンチマーク測定
import
std/asyncdispatch,
std/options,
std/json,
std/strutils,
std/sequtils,
allographer/query_builder,
./setting
proc index*(plugin:Plugin):Future[string] {.async, gcsafe.} =
let nThreads = 500
var futures = newSeq[Future[Option[JsonNode]]](nThreads)
for i in 1..nThreads:
futures[i-1] = plugin.rdb.table("num_table").find(i)
let res = all(futures).await
let response = res.map(
proc(x:Option[JsonNode]):JsonNode =
x.get()
)
return $(%response)
proc show*(plugin:Plugin):Future[string] {.async, gcsafe.} =
return "show"
コンパイル
DB_POSTGRES=true nim c -r --threads:{on|off} --mm:{refc|arc|orc} -d:useMalloc -d:useRealtimeGC -d:release main
測定
wrk -c4 -t4 -d10s http://localhost:5000 # DB access
wrk -c4 -t4 -d10s http://localhost:5000/show # return string
Memory monagement | Threads | DB access | return string |
---|---|---|---|
refc | on | 33 | 233624 |
refc | off | 151 | 234453 |
arc | on | 88 | 219813 |
arc | off | 166 | 228683 |
orc | on | 81 | clash |
orc | off | 137 | 182754 |
なぜかシングルスレッドの方がパフォーマンスが良い(´・ω・`)
Nimの非同期とマルチスレッドに関する議論
どうやらIOバウンドな処理はマルチスレッドで動かすよりも、シングルスレッドで非同期を使ったほうがいいようです。
--gc:orc ; Multi-threaded async slower than single-threaded
Criticism of Parallel Nim
Channel / Actors based parallelism? Are there such Web Servers?
Threading in Nim
コネクションプール VS プリペアドステートメント
Nim製クエリビルダのallographerはコネクションプールの仕組みを持っている。短時間に大量のDBアクセスをする時に、あるクエリがaのコネクションを使っていると、他のクエリではbのコネクションを使ってDBへアクセスする。aのコネクションの使用中の状態が解除されたら別のクエリがまたaのコネクションを使う。PostgreSQLではデフォルトで100コネクションを張ることができるので、95コネクションくらいなら同時に使って大量のDBアクセスを捌くことができる。
一方でコネクションプールを使ったクエリ実行は、毎回クエリをパースし構文を解釈し値をバインドしている。プリペアドステートメントを使えばクエリのパースは1回で済み、値のバインドだけを行えばいいので、同じ構文のクエリを何回も実行する場合はプリペアドステートメントの方が速いらしい。しかしプリペアドステートメントはコネクション単位で作られるため、1つのプリペアドステートメントでは1つのコネクションを使い回さなければいけない。
今回は毎回クエリのパースをしても複数のコネクションを使ったほうが速いのか、プリペアドステートメントを使ったほうが速いのか実験してみる。
DB_POSTGRES=true nim c -r --threads:off --mm:arc -d:useMalloc -d:useRealtimeGC -d:release main
import
std/asyncdispatch,
std/json,
std/options,
std/random,
std/times,
allographer/connection,
allographer/schema_builder,
allographer/query_builder,
allographer/async/async_db
randomize()
let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 96, 30, false, false)
proc init() =
rdb.create(
table("num_threads", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
seeder rdb, "num_threads":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("num_threads").insert(data).waitFor
proc main(){.async.} =
init()
let n = 500
# プリペアドステートメント
block:
let start = cpuTime()
var resp = newJArray()
let conn = rdb.conn
let prepare = conn.prepare(PostgreSQL, "select * from num_table where id = $1 LIMIT 1", "12345").await
var futures = newSeq[Future[(seq[Row], DbRows)]](n)
for i in 1..n:
futures[i-1] = prepare.query(PostgreSQL, @[$i])
let resArr = all(futures).await
for res in resArr:
let rows = res[0]
for row in rows:
let dbInfo = res[1]
resp.add(%*{dbInfo[0][0].name: row[0], dbInfo[0][1].name: row[1]})
# echo resp
echo cpuTime() - start
# コネクションプール
block:
let start = cpuTime()
var resp = newJArray()
var futures = newSeq[Future[Option[JsonNode]]](n)
for i in 1..n:
futures[i-1] = rdb.table("num_table").find(i)
let resArr = all(futures).await
for rowOpt in resArr:
resp.add(rowOpt.get)
# echo resp
echo cpuTime() - start
main().waitFor
echo resp
の出力結果
[{"id":1,"randomnumber":7985},{"id":2,"randomnumber":6529},{"id":3,"randomnumber":6613}...{"id":500,"randomnumber":9237}]
結果
プリペアドステートメント | コネクションプール |
---|---|
0.5787295100000001秒 | 0.05459516299999989秒 |
なんと10倍以上の差だった。プリペアドステートメントよりもコネクションプールを使った方が圧倒的に速かった。
asyncdispatch VS chronos
Nimは非同期の仕組みを提供する標準ライブラリのasyncdispatchと、Status社が開発した3rdパーティのchronosがある。
これのパフォーマンスを比較してみる。
import
std/asyncdispatch,
std/json,
std/options,
std/random,
std/times,
allographer/connection,
allographer/schema_builder,
allographer/query_builder,
allographer/async/async_db
randomize()
let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 96, 30, false, false)
proc init() =
rdb.create(
table("num_threads", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
seeder rdb, "num_threads":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("num_threads").insert(data).waitFor
proc main(){.async.} =
init()
let n = 10000
let start = cpuTime()
var resp = newJArray()
var futures = newSeq[Future[Option[JsonNode]]](n)
for i in 1..n:
futures[i-1] = rdb.table("num_table").find(i)
let resArr = all(futures).await
for rowOpt in resArr:
resp.add(rowOpt.get)
# echo resp
echo cpuTime() - start
main().waitFor
import
std/json,
std/options,
std/random,
std/times,
chronos,
# chronos_dbはallographerのasyncdispatchを全てchronosに置換したもの
./chronos_db/connection,
./chronos_db/schema_builder,
./chronos_db/query_builder,
./chronos_db/async/async_db
randomize()
let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "db", 5432, 96, 30, false, false)
proc init() =
rdb.create(
table("num_threads", [
Column.integer("id"),
Column.integer("randomnumber")
])
)
seeder rdb, "num_threads":
var data = newSeq[JsonNode]()
for i in 1..10000:
let randomNum = rand(10000)
data.add(%*{"id": i, "randomnumber": randomNum})
rdb.table("num_threads").insert(data).waitFor
proc main(){.async.} =
init()
let n = 10000
let start = cpuTime()
var resp = newJArray()
var futures = newSeq[Future[Option[JsonNode]]](n)
for i in 1..n:
futures[i-1] = rdb.table("num_table").find(i)
let resArr = all(futures).await
for rowOpt in resArr:
resp.add(rowOpt.get)
# echo resp
echo cpuTime() - start
main().waitFor
DB_POSTGRES=true nim c -r --mm:orc -d:useMalloc -d:useRealtimeGC -d:release main
DB_POSTGRES=true nim c -r --mm:arc -d:useMalloc -d:useRealtimeGC -d:release main
DB_POSTGRES=true nim c -r --mm:orc -d:useMalloc -d:useRealtimeGC -d:release -d:asyncBackend=chronos chronos_main
DB_POSTGRES=true nim c -r --mm:arc -d:useMalloc -d:useRealtimeGC -d:release -d:asyncBackend=chronos chronos_main
async lib | memory management | time |
---|---|---|
asyncdispatch | orc | 2.272063811 |
asyncdispatch | arc | 1.904029235 |
chronos | orc | 3.682025834 |
chronos | arc | 2.167253522 |
標準ライブラリの方が速かった
asynchttpserver VS httpbeast
標準ライブラリのasynchttpserver、Nim forumで使われているマイクロWebフレームワークのJesterがベースにしているhttpbeastでパフォーマンスを比較してみる。
HTMLを返すシナリオ
ライブラリ | GC | スレッド | req/10s |
---|---|---|---|
httpbeast | ORC | on | double free or corruption (fasttop) |
httpbeast | ORC | off | 289313 |
httpbeast | ARC | on | 280547 |
httpbeast | ARC | off | 310421 |
asynchttpserver | ORC | on | double free or corruption (fasttop) |
asynchttpserver | ORC | off | 179902 |
asynchttpserver | ARC | on | 229477 |
asynchttpserver | ARC | off | 219878 |
リクエスト毎にDBに500回アクセスするシナリオ
ライブラリ | GC | スレッド | req/10s |
---|---|---|---|
httpbeast | ORC | on | 73 |
httpbeast | ORC | off | 96 |
httpbeast | ARC | on | 75 |
httpbeast | ARC | off | 95 |
asynchttpserver | ORC | on | double free or corruption (!prev) |
asynchttpserver | ORC | off | 124 |
asynchttpserver | ARC | on | 59 |
asynchttpserver | ARC | off | 139 |
非同期を使った時にGCがARCだと、開放されない値があるのか何回も動かしているうちにメモリリークが起きたので、現実的にはORCか現行のrefcしか選択肢はないことになる。
しかしORCもマルチスレッドで動かした時にはセグフォが頻繁に起きる。
httpbeast・ORC・シングルスレッドの組み合わせが安定していてパフォーマンスが良さそうである。
Discussion