Open3

Yuniframe: 非同期FFI呼出しの準備

okuokuokuoku

前回異なるイベントキューを1つに混ぜる方法を検討したけど、今後もイベントキューを持つライブラリは増えて行きそうなので一旦一般化したものを1つ用意することにした。

少くとも、

  • ファイルシステム(Mini-pkg)
  • ネットワーク(miniio)
  • SDL

の3つのイベントキューを独立して処理することになる。

prev

https://zenn.dev/okuoku/scraps/c1c0510db351bb

okuokuokuoku

プリミティブの準備

一般的な非同期FFIを作るのはちょっと後回しにして、まずは非同期処理キューだけ用意することにした。

これらのプリミティブに加えて、キューそれぞれにジョブのサブミッションインターフェースが付くことになる。

(先頭の数字は序数; いわゆる interface に相当する機能が今のyuniには無いので、この辺は手動で実装する。SDLやminiioのようなキューのプロバイダからはこれらの関数ポインタを配列で受けるので、スクリプト側で名前を付けて使うことになる。)

chime、(async) と オーナーシップ

プリミティブに "(async)" が付いているものはキューの処理を行っていない他のスレッドから呼出せることを示す。

asyncなプリミティブは失敗しない。このため、 errstring はasyncである必要は無い。

スレッド間で更新する手段としては chime を提供する。 chimeproc ctx data で構成されるデータで、C関数:

void proc(uintptr_t ctx, uintptr_t data);

として呼出すことでキューをwakeupできる。libuvでは uv_async_t 、 SDLであれば SDL_PushEvent で実現される。

キューは、それを処理する主体としてオーナーがあり、通常はサブスレッドがオーナーとなっている。 suspend を発行すると、サブスレットは処理を止め、 chime を発火してメインスレッドに処理停止を知らせる。この chime の発火を持ってキューのオーナーはメインスレッドに移り、メインスレッドはジョブの発行等を行える。

async queue

キューには:

  1. メインスレッドでなければ処理できないもの(SDL)
  2. サブスレッドでも処理できるが、メインスレッドで fetch する際は suspend していなければならないもの(libuv)
  3. サブスレッドで処理でき、任意のタイミングで fetch できるもの(Mini-Pkg)

の3種類がある。SDLのようなキューはかなり特殊なので(macOS/iOSを除くと現代的なOSでは存在しない)、基本的には 2 と 3 の識別が必要ということになる。これは init の結果に含まれる needs_suspend? で区別できる。

各プリミティブ

[0] init

init(name) => [ctx, needs_suspend?]

キューを初期化する。キューには名前を付けられ、キューを待ち合わせるスレッドのスレッド名やキュー自体の名前に使用される(ことがある)。

[1] terminate

terminate(ctx) => []

キューを停止、解放する。 キューにジョブが残っているとクラッシュする

[2] errstring (async)

errstring(ctx, err, aux, buf, buflen) => [result_len, current_len] 

キューで発生したエラーをC文字列にする。デバッグ用。多くのカーネルやSDLは "カレントエラー" の概念があり、ジョブのサブミッション直後でなければ意味のあるエラーコードが得られないケースがある。

[3] fetch

fetch(ctx, buf, len) => [err, aux, result_len, current_len, reserved_timebase]

イベントを取得する。イベントキューは uintptr_t の配列で、特定のフォーマットを取る。

エラーは "これ以上このイベントキューはイベントを発行しない" という意味となる。このため、fetch がエラーを返却したqueueは即 terminate を発行することが期待される。割り込みにより起床された場合で、イベントが存在しない場合はゼロ長を返却し正常終了する。

キューに溜まっているイベントを全て取り出すために必要なイベント長(バイト単位)が current_len に返却される。

[4] fetch_nowait

fetch_nowait(ctx, buf, len) => [err, aux, result_len, current_len]

fetchの待ち合わせを行わないバージョン。 イベントが無い場合はゼロ長を返却し正常終了する

[5] chime_create

chime_create(ctx, user_data) => [err, aux, chime_proc, chime_ctx, chime_data]

chimeを確保する。 chimeが残っているとキューをterminateできないことに注意

[6] chime_destroy

chime_destroy(ctx, chime_ctx, chime_data) => []

確保したchimeを捨てる。chimeがin-flight -- suspend中またはsubscirbe中 -- である場合にdestroyを発行すると事故が起こる。特にsuspend要求は中断できないことに注意。

[7] suspend (async)

suspend(ctx, chime_proc, chime_ctx, chime_data) => []

キュー処理スレッドのサスペンドを要求する。サスペンドが完了するとchimeが発火して通知される。 キュー自体のサスペンドではないことに注意fetch によって新規イベントが発生する可能性がある。

[8] resume

resume(ctx) => []

サスペンドしたキュー処理スレッドを再開する。

[9] subscribe

subscribe(ctx, chime_proc, chime_ctx, chime_data) => [subscription]

空の キューにイベントが追加されるたびに chime を発火させる。chimeを再度発火させるためには fetchfetch_nowait でキューが空になる(result_len == current_len でリターンする)までキューを消費する必要がある。

[10] unsubscribe

unsubscribe(ctx, subscription) => []

subscriptionを中止する。

okuokuokuoku

スレッドAPI

うーん。。どう作ったもんか。。一旦NCCC形式にして、各ライブラリに直接渡す方式にしてみる。

[0] errstring

errstring(err, aux, buf, buflen) => [result_len, current_len] 

[1] thr_create

thr_create(name, class, callback, param) => [err, aux, thr]

スレッドを生成する。名前付き(大抵のOSはスレッドに名前を付けられる)。

[2] thr_detach

thr_detach(thr) => []

スレッドが終了していない場合は、スレッド関数の完了時に自動的に結果を捨てるようにスレッドをマークする。スレッドが終了している場合は結果を捨てる。

thr が呼出し元スレッド自体であった場合の挙動は未定義。

[3] thr_join

thr_join(thr) => [err, aux, result]

スレッドの終了を待つ。detach済のスレッドを待とうとするとエラーが返却される。 thr が自分自身であった場合の挙動は未定義(クラッシュさせた方が良いと思う)。

スレッドを強制終了させる方法は無い。OS側の呼出しを中断するための信頼性の高い方法が存在しないため。

callback

callback(thr, param) => [result]

thr_create で渡すコールバック。値を返却可能。

[4] mtx_create

mtx_create(name, class) => [err, aux, mtx]

mutexを作成する。classは予約でゼロのみ、recursiveでpriority inheritする普通のmutexを作成する。

nameはmutexのクラス名として扱われる。 トレースAPIによっては無限に名前を生成できないことがあるので注意

[5] mtx_destroy

mtx_destroy(mtx) => []

mutexを破棄する。ロックされたmutexを破棄した場合の結果は未定義(たぶんクラッシュする)。

[6] mtx_trylock

mtx_trylock(mtx) => [locked?]

mutexをロックする。他のスレッドがロックを持っている場合はロックせず、 locked? は偽を返却する。

[7] mtx_lock

mtx_lock(mtx) => []

mutexをロックする。他のスレッドがロックを持っている場合はそのスレッドがロックを解放するまで待つ。

時限待ちは提供しない。常識的なシチュエーションでは時限待ちではなくadaptive mutexで解決されるべき。

[8] mtx_unlock

mtx_unlock(mtx) => []

mutexをアンロックする。mutexがロックされていない場合の挙動は未定義(たぶんクラッシュする)。

[9] cnd_create

cnd_create(name) => [err, aux, cnd]

条件変数を作成する。nameは条件変数のクラス名として扱われる。 トレースAPIによっては無限に名前を生成できないことがあるので注意

[10] cnd_destroy

cnd_destroy(cnd) => []

条件変数を破棄する。条件変数を待っているスレッドが居た場合の挙動は未定義(たぶんクラッシュする)。

[11] cnd_broadcast

cnd_broadcast(cnd) => []

条件変数を待っているスレッドを全て起床する。

[12] cnd_signal

cnd_signal(cnd) => []

条件変数を待っているスレッドを1つ起床する。

[13] cnd_wait

cnd_wait(cnd, mtx, timeout_ms) => []

条件変数を待つ。 mtx がロックされていない場合の挙動は未定義(たぶんクラッシュする)。