🛳️

[capnp][kj]簡単なsubscribe request

に公開

概要

clint からリクエストを送って返してくる server を作成した.
実装内容と失敗したことをまとめる.

Client 側のコード

https://github.com/nagato0614/capnp/blob/main/src/notifier_client_example.cpp

Server 側のコード

https://github.com/nagato0614/capnp/blob/main/src/notifier_server_example.cpp

Schema

https://github.com/nagato0614/capnp/blob/main/schema/notification.capnp

シーケンス図

実装内容

capnp では schema で実装したinterface 一つに対して一つのクラスを作成する.
例えば

Notifier
# 通知を送る側(Notifier)
interface Notifier {
  subscribe @0 (params :SubscribeParams)
      -> (subscription :Subscription,
          stream :NotificationStream);
}

Notifier を定義すると対応するクラスを以下のように実装する

NotifierImpl
class NotifierImpl final : public Notifier::Server {
 public:
  NotifierImpl() = default;
  void setTimer(kj::Timer& t) { timer_ptr_ = &t; }

  // interface で定義した関数受け取る値は SubscribeContext 内部に格納されている.
  kj::Promise<void> subscribe(SubscribeContext ctx) override {
    KJ_REQUIRE(timer_ptr_ != nullptr, "Timer not set!");
    // params を表示
    auto params = ctx.getParams();
    LOG_COUT << "[Notifier] subscribe: filter="
             << params.getParams().getFilter().cStr() << std::endl;
    state_ = kj::heap<SharedState>();

    ctx.getResults().setStream(kj::heap<StreamImpl>(*state_, *timer_ptr_));
    ctx.getResults().setSubscription(kj::heap<SubscriptionImpl>(*state_));

    LOG_COUT << "[Notifier] new subscription\n";
    return kj::READY_NOW;
  }

 private:
  kj::Timer* timer_ptr_ = nullptr;
  kj::Own<SharedState> state_;
};

Notifier::Server が継承して subscribe 関数で実行する処理を定義する.
このとき interafce では 戻り値として2つを定義している

  • subscription :Subscription
  • stream :NotificationStream
    これを返す必要があるため引数として受けとったctx.setResultsに登録する.

client 側からは以下のようにsubscribe リクエストを送る.

subscribe request
    auto notifier = client.getMain<Notifier>();

    // ── Subscribe リクエスト送信 ──
    LOG_COUT << "Sending Subscribe request..." << std::endl;
    auto req = notifier.subscribeRequest();
    req.getParams().setFilter("Notifier");

client を生成して, リクエスト用のオブジェクトを notifier.subscribeRequest で生成する. そして引数を設定してあげてからリクエストを送信する

  // リクエストの送信してレスポンスが返ってくるまでまつ.
  auto resp = req.send().wait(ws); 

レスポンスには Server 側で登録したリクエスト先が返ってくる

    auto stream = resp.getStream();
    auto session = resp.getSubscription();

別のインターフェースへのリクエストはここからレスポンスから取得できるオブジェクト経由で行える.

この実装では subscribe を実行後 cancel と read へアクセスできるようになる.
しかし登録されていない段階でクライアント側からサーバー側にアクセスするとクライアント側が落ちてしまう.

失敗したこと

Server 作成時 mainInterface を登録していなかった.

main
int main() {
  try {
    // 1) NotifierImpl を heap で生成し、生ポインタを控える
    auto notifierOwn = kj::heap<NotifierImpl>();
    auto* notifierRaw = notifierOwn.get();  // Timer 注入用

    // 2) EzRpcServer を起動 (mainInterface, bindAddress, defaultPort)
    capnp::EzRpcServer server(kj::mv(notifierOwn), "localhost", 5923);

    // 3) サーバ作成後に Timer を取得し、NotifierImpl に注入
    auto& timer = server.getIoProvider().getTimer();
    notifierRaw->setTimer(timer);

    // 4) ログ & イベントループ
    auto& ws = server.getWaitScope();
    auto port = server.getPort().wait(ws);
    LOG_COUT << "Notifier server started on port " << port << '\n';

    kj::NEVER_DONE.wait(ws);
  } catch (kj::Exception& e) {
    LOG_COUT << "Server exception: " << e.getDescription().cStr() << '\n';
  }
}

main 関数で 1 で Subscriber 本体を作成しておく.
次に server に Subscriber を Server に登録している.

Subscriber 内部で kj::Timer を使いたいため, 以下のように実装していた.

capnp::EzRpcServer server("localhost", 5923);
auto& timer = server.getIoProvider().getTimer();
auto notifierOwn = kj::heap<NotifierImpl>(timer);
server.exportCap(kj::mv(notifierOwn);

server を作成して timer を生成し subscriber に登録してから server にsubscriber を登録してた. これだとビルドは通るが client からアクセスしたときに server が SEGV で落ちてしまう. EzRpcServer のコンストラクタに mainInterafce を登録しておかないとだめ.
EzRpcServer のコンストラクタ一覧は以下の通り

EzRpcServer
EzRpcServer::EzRpcServer(Capability::Client mainInterface, kj::StringPtr bindAddress,
                         uint defaultPort, ReaderOptions readerOpts)
    : impl(kj::heap<Impl>(kj::mv(mainInterface), bindAddress, defaultPort, readerOpts)) {}

EzRpcServer::EzRpcServer(Capability::Client mainInterface, struct sockaddr* bindAddress,
                         uint addrSize, ReaderOptions readerOpts)
    : impl(kj::heap<Impl>(kj::mv(mainInterface), bindAddress, addrSize, readerOpts)) {}

EzRpcServer::EzRpcServer(Capability::Client mainInterface, int socketFd, uint port,
                         ReaderOptions readerOpts)
    : impl(kj::heap<Impl>(kj::mv(mainInterface), socketFd, port, readerOpts)) {}

EzRpcServer::EzRpcServer(kj::StringPtr bindAddress, uint defaultPort,
                         ReaderOptions readerOpts)
    : EzRpcServer(nullptr, bindAddress, defaultPort, readerOpts) {}

EzRpcServer::EzRpcServer(struct sockaddr* bindAddress, uint addrSize,
                         ReaderOptions readerOpts)
    : EzRpcServer(nullptr, bindAddress, addrSize, readerOpts) {}

EzRpcServer::EzRpcServer(int socketFd, uint port, ReaderOptions readerOpts)
    : EzRpcServer(nullptr, socketFd, port, readerOpts) {}

EzRpcServer::~EzRpcServer() noexcept(false) {}

mainInterface の登録を行っているコンストラクタだけ impl が生成されており, それ以外はnullptr になるため SEGV が発生していた.

timer を使うときは
Server を立ち上げたあと Subscriber に追加した setTimer 関数からSubscriber にタイマーを作成すると良い.

Client 側で別スレッドでリクエストを発行してしまった.

キャンセルを実行するとき今は以下のようにしているが, std::thread を使ってリクエストを送るとクライアント側がエラーで落ちてしまう.

Timer
    // ── 5秒後にキャンセルを送信(Timer使用) ──
    auto timer_promise =
        timer.afterDelay(5 * kj::SECONDS).then([session]() mutable {
          LOG_COUT << "[Client] Cancelling subscription..." << std::endl;
          session.cancelRequest().send().ignoreResult();
        });
    task_set.add(kj::mv(timer_promise));

EventLoop はスレッド事に作る必要があると capnp のサイトに記載されている.

Event Loop Concurrency
KJ’s concurrency model is based on event loops. While multiple threads are allowed, each thread must have its own event loop. KJ discourages fine-grained interaction between threads as synchronization is expensive and error-prone. Instead, threads are encouraged to communicate through Cap’n Proto RPC.

KJ’s event loop model bears a lot of similarity to the JavaScript concurrency model. Experienced JavaScript hackers – especially node.js hackers – will feel right at home.

As of version 0.4, the only supported way to communicate between threads is over pipes or socketpairs. This will be improved in future versions. For now, just set up an RPC connection over that socketpair. :)

https://capnproto.org/cxxrpc.html?utm_source=chatgpt.com

Discussion