Open21

ApolloでSubscriptionを触ってみる

RikuRiku

ApolloServer, ApolloClientでアプリを作成している。チャット系の機能でWebSocketを使おうと思ったところGraphQLにはSubscriptionという便利な仕組みがあると知ったので使ってみる。

RikuRiku

WebSocketを使うにはLinkの設定が必要。
yarn add subscriptions-transport-ws
でインストール

RikuRiku

Linkの初期化

import { WebSocketLink } from "@apollo/client/link/ws";

const wsLink = new WebSocketLink({
  uri: "ws://localhost:4000/subscriptions",
  options: {
    reconnect: true,
  },
});
`
RikuRiku

queryとmutatonはwsLinkではなくhttpLinkでいいので使い分けができるようにする。

import { getMainDefinition } from "@apollo/client/utilities";
import { split } from '@apollo/client';

const uploadLink = createUploadLink({
  uri: "http://localhost:4000/graphql",
});

const wsLink = new WebSocketLink({
  uri: "ws://localhost:4000/subscriptions",
  options: {
    reconnect: true,
  },
});

const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === "OperationDefinition" &&
      definition.operation === "subscription"
    );
  },
  wsLink,
  uploadLink
);
RikuRiku

ApolloClientに設定

const client = new ApolloClient({
    link: from([errorLink, authLink, splitLink]),
    cache,
  });
RikuRiku

WebSocketでデータを受け取る際に認証をつけたい場合がある。その場合の仕組みは connectionParams で提供されている。

const wsLink = setContext(async () => {
  const currentUser = auth().currentUser;
  const idToken = await currentUser?.getIdToken();

  return new WebSocketLink({
    uri: "ws://localhost:4000/subscriptions",
    options: {
      reconnect: true,
      connectionParams: {
        authToken: idToken,
      },
    },
  });
});

こんな感じだろうか?
この仕組みはあまりまだわかってない。

https://stackoverflow.com/questions/63253201/how-does-websocketclient-pass-authentication-token-in-apollo-client-3-0
サーバー側の設定もしないとエラーでそう

RikuRiku

現在Expressでhttp通信用のサーバーしか作っていないので SubscriptionServer を作成する。

yarn add subscriptions-transport-ws
...
import { createServer } from "http";
import { SubscriptionServer } from "subscriptions-transport-ws";

const app = express();
const httpServer = createServer(app);

const subscriptionServer = SubscriptionServer.create(
    {
      schema,
      execute,
      subscribe,
    },
    {
      server: httpServer,
      path: "/graphql",
    }
  );
RikuRiku

ApolloServerの更新

const server = new ApolloServer({
    schema: schemaWithResolvers,
    context,
   // 以下追加
    plugins: [
      {
        async serverWillStart() {
          return {
            async drainServer() {
              subscriptionServer.close();
            },
          };
        },
      },
    ],
  });
RikuRiku

app.listen の部分を httpServer.listen に変更する。

httpServer.listen({ port: 4000 }, () =>
    console.log(`🚀 Server ready at http://localhost:4000${server.graphqlPath}`)
  );

これでhttp, wsどちらも使える(はず)

RikuRiku

リゾルバを作成するための準備をする。

SubscriptionはQueryやMutationと異なりsubscribe関数が定義されたオブジェクトでないといけない。そしてその関数は AsyncIterator を返さなければいけない。

AsyncIteratorはpubsub.asyncIteratorから作成することができる。

そしてこの pubsub は graphql-subscriptions で提供されているクラスを使うと簡単に作成することができる。

yarn add graphql-subscriptions
import { PubSub } from 'graphql-subscriptions';

const pubsub = new PubSub();

ただこのクラスは本番では推奨されていなく、別のものを使うことが勧められている。

The PubSub class is not recommended for production environments, because it's an in-memory event system that only supports a single server instance. After you get subscriptions working in development, we strongly recommend switching it out for a different subclass of the abstract PubSubEngine class. Recommended subclasses are listed in Production PubSub libraries.

RikuRiku

リゾルバ

subscriptions
// pubsub.asyncIteratorで発生するタイプエラーが解決できなかったので一旦nocheck
// @ts-nocheck

import { SubscriptionResolvers } from "~/generated/graphql";
import { pubsub } from "~/lib/pubsub";

export const Subscription: SubscriptionResolvers = {
  messageCreated: {
    subscribe: () => pubsub.asyncIterator("MESSAGE_CREATED"),
  },
};
index.ts
import { Mutation } from "./mutations";
import { Resolvers } from "~/generated/graphql";
import { Query } from "./queries";
import { Subscription } from "./subscriptoins";

export const resolvers: Resolvers = {
  Mutation,
  Query,
  Subscription,
};

RikuRiku

publishするコード

createMessage.ts
const message = await prisma.message.create({
    data: {
      text: input.text,
      roomId: input.roomId,
      senderId: user.id,
    },
  });

pubsub.publish("MESSAGE_CREATED", {createdMessage: message}); // publish

return message;
RikuRiku

またクライアント側に戻ってsubscriptionを定義する。

subscription OnMessageCreated {
  messageCreated {
    id
    text
    createdAt
    sender {
      ...UserParts
    }
    roomId
  }
}
RikuRiku

上記の

const wsLink = setContext(async () => {
  const currentUser = auth().currentUser;
  const idToken = await currentUser?.getIdToken();

  return new WebSocketLink({
    uri: "ws://localhost:4000/subscriptions",
    options: {
      reconnect: true,
      connectionParams: {
        authToken: idToken,
      },
    },
  });
});

だとエラーになるので

const wsLink = new WebSocketLink({
  uri: "ws://localhost:4000/subscriptions",
  options: {
    reconnect: true,
    // connectionParams: {
    //   authToken: idToken,
    // },
  },
});

こうする

RikuRiku

上記のuriの設定だとサーバーのエンドポイントと違っていたので修正

// uri: "ws://localhost:4000/subscriptions",
uri: "ws://localhost:4000/graphql",
RikuRiku

ここまででクライアントからサブスクリプションしてcreateMessageを実行するmutationを実行するとサーバーがpublishしたデータをsubscribeすることができた

RikuRiku

これだと全ての作成されたメッセージをサブスクライブしてしまうので絞る必要がある。
サーバー側でスキーマを変更し、withFilterを使用する。
これfilterの条件が変わる場合がある時ってどう実装すべきなのだろう。
例えば最初はroomId = 1のものしかsubscribeする必要なくても、トークルームを追加したらroomId = 2のデータもsubscribeする必要が出てくる。毎回新しく実行していたらコネクション数めちゃ増えることになるから、一回disconnectして、再度新しい条件(roomId = [1, 2]みたいな)でsubscriptionすることになるのか?

RikuRiku

それかuserId = 1みたいな感じでuserIdを基準にして「作成されたメッセージのルームのメンバーにそのユーザーがいる場合」という処理をサーバー側で行うのがいいのか??

RikuRiku

Subscriptionによって実行されるリゾルバにはQueryとMutationとは別にcontextを設定する必要がある。

SubscriptionServer.create(
  onConnect: () => {
        return {
          // contextで渡したいオブジェクト
        };
      },
)