ApolloでSubscriptionを触ってみる
ApolloServer, ApolloClientでアプリを作成している。チャット系の機能でWebSocketを使おうと思ったところGraphQLにはSubscriptionという便利な仕組みがあると知ったので使ってみる。
WebSocketを使うにはLinkの設定が必要。
yarn add subscriptions-transport-ws
でインストール
Linkの初期化
import { WebSocketLink } from "@apollo/client/link/ws";
const wsLink = new WebSocketLink({
uri: "ws://localhost:4000/subscriptions",
options: {
reconnect: true,
},
});
`
queryとmutatonはwsLinkではなくhttpLinkでいいので使い分けができるようにする。
import { getMainDefinition } from "@apollo/client/utilities";
import { split } from '@apollo/client';
const uploadLink = createUploadLink({
uri: "http://localhost:4000/graphql",
});
const wsLink = new WebSocketLink({
uri: "ws://localhost:4000/subscriptions",
options: {
reconnect: true,
},
});
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === "OperationDefinition" &&
definition.operation === "subscription"
);
},
wsLink,
uploadLink
);
ApolloClientに設定
const client = new ApolloClient({
link: from([errorLink, authLink, splitLink]),
cache,
});
WebSocketでデータを受け取る際に認証をつけたい場合がある。その場合の仕組みは connectionParams で提供されている。
const wsLink = setContext(async () => {
const currentUser = auth().currentUser;
const idToken = await currentUser?.getIdToken();
return new WebSocketLink({
uri: "ws://localhost:4000/subscriptions",
options: {
reconnect: true,
connectionParams: {
authToken: idToken,
},
},
});
});
こんな感じだろうか?
この仕組みはあまりまだわかってない。
サーバー側の設定もしないとエラーでそう
ここまではクライアント側の設定なので実行できるようにサーバー側も設定していく。
とりあえず最小の定義
type Subscription {
messageCreated: Message
}
現在Expressでhttp通信用のサーバーしか作っていないので SubscriptionServer を作成する。
yarn add subscriptions-transport-ws
...
import { createServer } from "http";
import { SubscriptionServer } from "subscriptions-transport-ws";
const app = express();
const httpServer = createServer(app);
const subscriptionServer = SubscriptionServer.create(
{
schema,
execute,
subscribe,
},
{
server: httpServer,
path: "/graphql",
}
);
ApolloServerの更新
const server = new ApolloServer({
schema: schemaWithResolvers,
context,
// 以下追加
plugins: [
{
async serverWillStart() {
return {
async drainServer() {
subscriptionServer.close();
},
};
},
},
],
});
app.listen の部分を httpServer.listen に変更する。
httpServer.listen({ port: 4000 }, () =>
console.log(`🚀 Server ready at http://localhost:4000${server.graphqlPath}`)
);
これでhttp, wsどちらも使える(はず)
リゾルバを作成するための準備をする。
SubscriptionはQueryやMutationと異なりsubscribe関数が定義されたオブジェクトでないといけない。そしてその関数は AsyncIterator を返さなければいけない。
AsyncIteratorはpubsub.asyncIteratorから作成することができる。
そしてこの pubsub は graphql-subscriptions で提供されているクラスを使うと簡単に作成することができる。
yarn add graphql-subscriptions
import { PubSub } from 'graphql-subscriptions';
const pubsub = new PubSub();
ただこのクラスは本番では推奨されていなく、別のものを使うことが勧められている。
The PubSub class is not recommended for production environments, because it's an in-memory event system that only supports a single server instance. After you get subscriptions working in development, we strongly recommend switching it out for a different subclass of the abstract PubSubEngine class. Recommended subclasses are listed in Production PubSub libraries.
リゾルバ
// pubsub.asyncIteratorで発生するタイプエラーが解決できなかったので一旦nocheck
// @ts-nocheck
import { SubscriptionResolvers } from "~/generated/graphql";
import { pubsub } from "~/lib/pubsub";
export const Subscription: SubscriptionResolvers = {
messageCreated: {
subscribe: () => pubsub.asyncIterator("MESSAGE_CREATED"),
},
};
import { Mutation } from "./mutations";
import { Resolvers } from "~/generated/graphql";
import { Query } from "./queries";
import { Subscription } from "./subscriptoins";
export const resolvers: Resolvers = {
Mutation,
Query,
Subscription,
};
publishするコード
const message = await prisma.message.create({
data: {
text: input.text,
roomId: input.roomId,
senderId: user.id,
},
});
pubsub.publish("MESSAGE_CREATED", {createdMessage: message}); // publish
return message;
またクライアント側に戻ってsubscriptionを定義する。
subscription OnMessageCreated {
messageCreated {
id
text
createdAt
sender {
...UserParts
}
roomId
}
}
上記の
const wsLink = setContext(async () => {
const currentUser = auth().currentUser;
const idToken = await currentUser?.getIdToken();
return new WebSocketLink({
uri: "ws://localhost:4000/subscriptions",
options: {
reconnect: true,
connectionParams: {
authToken: idToken,
},
},
});
});
だとエラーになるので
const wsLink = new WebSocketLink({
uri: "ws://localhost:4000/subscriptions",
options: {
reconnect: true,
// connectionParams: {
// authToken: idToken,
// },
},
});
こうする
上記のuriの設定だとサーバーのエンドポイントと違っていたので修正
// uri: "ws://localhost:4000/subscriptions",
uri: "ws://localhost:4000/graphql",
ここまででクライアントからサブスクリプションしてcreateMessageを実行するmutationを実行するとサーバーがpublishしたデータをsubscribeすることができた
これだと全ての作成されたメッセージをサブスクライブしてしまうので絞る必要がある。
サーバー側でスキーマを変更し、withFilterを使用する。
これfilterの条件が変わる場合がある時ってどう実装すべきなのだろう。
例えば最初はroomId = 1のものしかsubscribeする必要なくても、トークルームを追加したらroomId = 2のデータもsubscribeする必要が出てくる。毎回新しく実行していたらコネクション数めちゃ増えることになるから、一回disconnectして、再度新しい条件(roomId = [1, 2]みたいな)でsubscriptionすることになるのか?
それかuserId = 1みたいな感じでuserIdを基準にして「作成されたメッセージのルームのメンバーにそのユーザーがいる場合」という処理をサーバー側で行うのがいいのか??
subscribeToMoreを使うことでunsubscribeできる関数が返されるから、Subscriptionのargを変更したいときに都度 unsbscribe -> 新しい条件([1, 2, newId]みたいな感じ)でsubscribe という処理を行うようにする。とりあえず。
Subscriptionによって実行されるリゾルバにはQueryとMutationとは別にcontextを設定する必要がある。
SubscriptionServer.create(
onConnect: () => {
return {
// contextで渡したいオブジェクト
};
},
)