🌊

GCPでSagaパターン実装

2021/02/13に公開1

概要

メディアやコミュニティ系のアプリケーション開発を中心に行っていたが、最近会社で決済系のシステムを扱うようになったこともあり、複数のサービス間がある中でどう結果整合性を担保するかについて学んでいた。

そこで学んだ複数サービス間での整合性を保つための手法として分散Sagaパターンがあり、実際にCloud RunとCloud PubSubで実装をしてみた。

複数サービスでの整合性

複数サービスでの整合性の問題

一般的にシステムを複数サービスに分けることによって、サービスを効率的に利用することや開発チームを分けることや小さくデプロイが可能など多くのメリットが挙げられる。

しかし、複数サービスにしたときの1つの問題として、データの整合性を取ることが難しくなることが挙げられる。

例えば、1つのDBのシステムに対してデータのWriteをアトミックに行う場合はDBのトランザクション等を使うことによって実現することができる。しかし、サービスが複数あり各々のサービスがDBを持っている場合はアトミックにWriteを行うことは通常だとできない。


解決方針

この問題を解決するために考えられる1つの解決策は結果整合性を目指すということである。マイクロサービスでこのようなトランザクションを実現する場合は結果整合性が基本となる。

結果整合性とは最終的な状態として一貫性が保たれていれば良いという考えであり、プロセスの最中は一貫性が保たれていないことも許容する考え方である。このとき考えなければならないことは処理に成功するときよりも処理に失敗したときのことである。

整合性の一種であれば、処理に失敗したときはすべての状態をもとに戻す必要があり、この問題も解決し結果整合性を担保するようなアーキテクチャのパターンがいくつか存在する。

Sagaパターンによる解決策

この結果整合性を使ったアーキテクチャの1つとしてSagaパターンがある。これは結果整合性を担保したい範囲を1つの擬似的なトランザクションと考えて処理を行うアーキテクチャパターンである。

下記のように成功時はシンプルに各々のサービスで必要なアトミックな処理を行い全体として成功時の整合性を最終的に実現する。また、失敗時はそれまで行われてきたトランザクション処理を打ち消すような補償トランザクションを発行し、元の状態に戻るようになる。これにより結果整合性のトランザクション処理とロールバックを実現できる。


実装したアプリケーション

今回はチケット購入のアプリケーションを実際にこのSagaパターンを用いて実装してみた。

ユーザーはポイントを持っており、そのポイントとチケットを交換することでチケットを入手できるようなシンプルなアプリケーションになっている。

アーキテクチャ

今回はGCPをの「CloudRun」「PubSub」「Firestore」を利用してアプリケーションの実装を行った。

また、本来は各サービスでDBを分けたほうが良いが、Firestoreを利用したかったため1つのDBで行った。ただし、collectionは分けて内部的には分かれているような構成にはしている。PubSubは各サービスの実行結果イベントがpublishされるqueueとなっており、このイベントをsubscribeして各サービスが処理を実行するようになっている。

データフローとイベント定義

図の通りで、ユーザーからのリクエスト → レスポンスの全体処理としては同期的に処理を行い、各サービスはイベントをsubscribeして非同期的に処理を行うフローとなっている。そして、order serviceはこの非同期処理がすべて終わったイベントを検知したらユーザーへレスポンスを返却し、処理が終了したことを伝える。

また、赤い矢印で書かれたとおり補償トランザクションや処理に失敗した場合にもイベントを発行し、その後に必要な補償トランザクションについても各サービスが判断して実行するようになっている。

実装

実装に関してはgithubにあげているので、興味ある方はこちらを見ていただきたい。typescript x expressで各サービスを実装している。

また、ソフトウェアアーキテクチャはざっくりと下記のような感じになっている。

実装したときに気をつけた点についてのみ記述していく。

https://github.com/winor30/distributed-saga-payment-system

サービスの実装

PointServiceを例にあげると、メインの処理としてポイントを消費する処理が必要になる。その後ポイントを消費したことをpubsubでイベントとして伝える。

下記PointHandlerのconsumePoint内で一連の処理が行われる。PointConsumerでは名前の通りポイントの処理を行う。この処理が成功した場合は、成功したことをEventで伝えるために、PointEventPublisherでtopicへ成功したイベントを通知する。

また、もし失敗した場合はPointEventPublisherが失敗したイベントをpublishし、他のサービスへ伝えることができる。

class PointHandler {
 private readonly consumer: PointConsumer;
  private readonly publisher: PointEventPublisher;

  // ポイントを消費する処理
	consumePoint: RequestHandler<any, any, PaymentEvent> = async (req, res) => {
    const eventData = req.body;
    const { id, order } = eventData;
		...
    // 
    const consumedResult = await this.consumer.consume(order.userId, id, order.price).catch((err: Error) => err);
    if (consumedResult instanceof Error) {
			...
      await this.publisher.publishFailed(eventData);
      throw new HttpError(errorMsg, 500);
    }

    await this.publisher.publishConsumed(eventData, consumedResult.history);

    res.send({ msg: 'ok' });
  };
}

publisherについて

ここの実装は少し悩んだが、基本的にはすべてのサービスは同じ型のイベントを受け取ったデータにマージしてイベントとしてpublishするようにした。

そのため、過去にどんなやりとりがされたのかが基本的にはイベントを見れば分かるような形式になっている。

Point Serviceのpublisherに関しては下記のように、ポイント消費の履歴であるPointHistoryをマージしている

export class PubSubClient {
...
  publishEvent = async (eventType: Attributes['event_type'], receivedData: PaymentEvent, history?: PointHistory) => {
    const topic = this.pubsub.topic(TOPIC_NAME);
    const event = this.mergeHistory(receivedData, history);

    const attributes: Attributes = {
      event_type: eventType,
    };
    return topic.publishJSON(event, attributes);
  };

  private mergeHistory = (receivedData: PaymentEvent, history?: PointHistory): PaymentEvent => (history ? {
    ...receivedData,
    point: {
      userId: history.userId,
      historyId: history.historyId,
      value: history.value,
      createdAt: history.createdAt,
    },
  } : receivedData);
}

冪等性

pubsubを使うと必ず1回しか叩かれないという制約はない。そのため、冪等性を担保した実装にしておく必要がある。おそらくマイクロサービスな決済システムなどでは冪等性はできる限り担保しておくのが良いと思う。

export class TicketGranter {

  grant = async (
    orderId: string,
    userId: string,
    ticketId: string,
    totalPrice: number
  ): Promise<{ ticket: Ticket; inventory: TicketInventory; history: TicketHistory }> => {
    return this.transactionManager.runTransaction(async (tx) => {
			...

      const currentHistory = await historyRepository.get(orderId);
      const currentTicket = await ticketRepository.get(userId, ticketId);
      if (currentHistory && currentTicket) {
        console.log('already granted ticket');
        return { ticket: currentTicket, inventory, history: currentHistory };
      }

Subscriberの種類(PushとPull)

すべてのサービスがpublisherであり、Subscriberであるが今回はpush型のsubscriberはPointServiceとTicketServiceに、Pull型のsubscriberはOrderServiceにした。

理由としては、Cloud Runを使ったため、常に動かしながらSubscribeするのは難しいと考えたためである。(サーバーレスのアーキテクチャとは相性が悪いように感じた。もし問題なさそうな方法あれば教えていただきたい)ただし、OrderServiceに関してはイベントが終わって、オーダーを終了する必要があるのか?を判断するために、pull型のSubscriberになるようにした。

定義はコマンドベースになってしまうが、下記のようにしている

# Makefile内

create-subscriptions:
# チケットの付与が終わったイベントをorder serviceが検知するためのsubscription
	gcloud pubsub subscriptions create order-subscription --topic distributed-payment-system-topic \
		--message-filter='attributess.event_type = "granted-ticket"' && \
# オーダーが開始したイベントをpoint serviceが検知するためのsubscription
	gcloud pubsub subscriptions create point-subscription --topic distributed-payment-system-topic \
		--push-endpoint=$(POINT_SERVICE_URL)/consume \
		--push-auth-service-account=distributed-payment-system@$(GCP_PROJECT).iam.gserviceaccount.com \
		--message-filter='attributess.event_type = "started-order"' && \
# チケット付与が失敗 or キャンセルしたイベントをpoint serviceが検知するためのsubscription
	gcloud pubsub subscriptions create point-refund-subscription --topic distributed-payment-system-topic \
		--push-endpoint=$(POINT_SERVICE_URL)/refundPoint \
		--push-auth-service-account=distributed-payment-system@$(GCP_PROJECT).iam.gserviceaccount.com \
		--message-filter='attributess.event_type = "canceled-ticket" OR attributess.event_type = "failed-ticket"' && \
# ポイント消費したイベントをticket serviceが検知するためのsubscription
	gcloud pubsub subscriptions create ticket-subscription --topic distributed-payment-system-topic \
		--push-endpoint=$(TICKET_SERVICE_URL)/grant \
		--push-auth-service-account=distributed-payment-system@$(GCP_PROJECT).iam.gserviceaccount.com \
		--message-filter='attributess.event_type = "consumed-point"' && \
# orderがストップしたイベントをticket serviceが検知するためのsubscription
	gcloud pubsub subscriptions create ticket-cancel-subscription --topic distributed-payment-system-topic \
		--push-endpoint=$(TICKET_SERVICE_URL)/cancel \
		--push-auth-service-account=distributed-payment-system@$(GCP_PROJECT).iam.gserviceaccount.com \
		--message-filter='attributess.event_type = "stopped-order"'

まとめ

今回は複数サービス間で結果整合性を担保できるようなアプリケーションをSagaパターンというアーキテクチャを採用して、GCP+Typescriptで実装をした。

振り返りをすれば、各サービスの独立性を保つことができトランザクション処理が必要な割と難易度の高いアプリケーションを開発することができるので、とても可能性を感じた。

反省をすれば、push型のアーキテクチャよりはpull型のsubscriptionを使ったほうがよりシンプルに実装できそうだなと感じたので、GKEとかを使ってそっちのパターンも実装してみた。

参考資料

Discussion

hidexirhidexir

マイクロサービスにおけるsagaパターンの寄稿ありがとうございます。参考にさせていただきました。
質問なのですが
1.pubsubの順序指定は有効にしている前提でしょうか?
2. リクエストの増加に伴い、サブスクライブの不一致の量が増えた場合はレイテンシーが増加する認識であっておりますでしょうか?

図の通りで、ユーザーからのリクエスト → レスポンスの全体処理としては同期的に処理を行い、各サービスはイベントをsubscribeして非同期的に処理を行うフローとなっている。そして、order serviceはこの非同期処理がすべて終わったイベントを検知したらユーザーへレスポンスを返却し、処理が終了したことを伝える。
の部分で、図の一番左のエンドユーザーは同期的にレスポンスを受け取るとのことで、どうやって実現しているのかを気になって該当の実装を拝見しました。
distributed-saga-payment-system/order-service/src/client/pubsub.ts

        const orderId = event.id;
        if (order.orderId !== orderId) {
          console.log(`order.orderId: ${order.orderId}, orderId: ${orderId}`);
          return;
        }

これで対となるpubsubのメッセージサブスクライブの一致の判定をしているとおもうのですが、pubsubではfifoの設定を有効にしている前提なのではないかと思いました。
もしリクエスト量が膨大になってくると不一致するサブスクライブの量が増えてエンドユーザーへのレイテンシーが悪化する可能性もあるのかなとおもいました。