🔔

プッシュ通知の実行環境(Rpush)を独自にFargate対応した話

2024/10/30に公開

ourly株式会社でバックエンドエンジニアをしているjakeです。

弊社ではプッシュ通知の送信にRpush Gemを使用しているのですが、これをより安定的に稼働させるための取り組みについて共有いたします。

前提

Rpushについて

RpushはAppleプッシュ通知サービス(APNs), Firebase Cloud Messaging(FCM), Amazon Device Messagingといった複数の通知送信基盤に対応した通知作成&送信のためのGemです。

基本的にプッシュ通知1通ごとに対応するレコードをDBに作成します。

Rpushの実行方法には次のような方法が用意されています。

(詳しくはREADMEを参照)

  • 独立プロセス

    • 基本的にこちらが推奨されており、 bundle exec rpush start といった形でRpushを起動します。起動したあとは設定したポーリング間隔で常にDBを探索し、各プッシュ通知サービスへのリクエストを行います。
  • ジョブ

    • 非同期処理として実行されるジョブとして実行します。 Rpush.push をジョブのコード中で呼び出すことでプッシュ通知を行います。
  • 埋め込み

    • pumaなど既存のプロセスに埋め込む形で起動します。そのあとの挙動は独立プロセスの場合と同じと思われます。

施策実施前の実行方法

従来はSidekiqを使用した定期実行ジョブとしてRpushを起動し、運用しておりました。当時のourlyの規模からして、独立したプロセスで起動するまでではないという判断でした。

現実としてこれまで大きな問題なく稼働しておりましたが、ありがたいことに利用企業様が増えていること、弊社で利用しているFCMのAPI移行によってプッシュ通知の送信にかかる時間が増加してしまったことから、より安定的な稼働を目的として独立したプロセスとして起動するよう変更することとしました。

(埋め込み方式については未検証ですが、既存のプロセスと同居することから安定稼働の面での不利が大きいと考えて不採用としました)

移行にあたって行う必要のある対応

ourlyはAmazon ECS on Fargateで稼働しておりますが、Fargateで独立したプロセスとして起動するにあたって考慮すべきことが二点ありました。

一点目はECSのヘルスチェックです。

正常に稼働しているかどうかを監視するためには必要ですが、Rpushにはデフォルトでその仕組みはないようでした。

二点目はGraceful Shutdownです。

デプロイ時には新しいコンテナ(タスク)を起動すると同時に、それまで起動していたコンテナを安全に終了(Graceful Shutdown)させる必要があります。

Rpushを起動するコンテナにおいては、着手したプッシュ通知の送信処理が終わるまで待ってから終了することを担保する、ということを意味します。

対応1 - ヘルスチェック

Rpushが正常に稼働していることを確認して適切な終了ステータスを返す仕組みが必要です。

ありがたいことに、Rpushは起動したメインのプロセス(以下、サーバープロセス)とRPCで通信する仕組みをあらかじめ用意してくれていました。

またこちらstatusメソッドを実行すると、実行中のサーバープロセスからヘルスチェックに使用できる情報を取得できることもわかりました。

こちらを利用してECSのヘルスチェックを行うためのコードは次のとおりです。(記事化にあたり実際のコードから簡略化しています。以下同じ)

class RpushDaemonRpcCommand
  class << self
    def healthcheck
      rpc = Rpush::Daemon::Rpc::Client.new(pid)
      status = rpc.status
      rpc.close
      raise 'No running apps' if status['app_runners'].blank?
      exit 0
    rescue StandardError => e
      exit 1
    end
  end
end

まずサーバープロセスに対してRPC通信を行ってステータスを確認し、その正常/異常を終了ステータスとします。

これをECSのhealthcheckから実行します。

bin/rpush_healthcheck
#!/usr/bin/env ruby

RpushDaemonRpcCommand.healthcheck
タスク定義
"healthCheck": {
  "command": [
    "CMD-SHELL",
    "bundle exec ruby bin/rpush_healthcheck"
  ]
},

これでECSのヘルスステータスが正常になりました。

対応2 - Graceful Shutdown

ポーリング停止の実現方法

Graceful Shutdownについてはもう少しごにょごにょする必要がありました。

というのもRpushには今回の要件である「現在の処理は正常に終了させて、次の処理は行わない」という仕組みはなかったからです。

Rpushのコードを読み、使えそうな処理を見つけました。

Rpush::Daemon::Feeder という、送信対象のプッシュ通知レコードをDBから取得してキューに入れることを役割とするクラスの feed_forever メソッドです。(Ref.)

def self.feed_forever
  loop do
    enqueue_notifications
    interruptible_sleeper.sleep(Rpush.config.push_poll)
    return if should_stop
  end
end

実装は単純で、

  • 対象のプッシュ通知を探索してキューに入れる
  • ポーリング間隔のぶんsleepする
  • 必要であれば停止する

という処理を無限ループしています。

このsleepの時間はあらかじめ設定された Rpush.config.push_poll で、プッシュ通知をどれくらいの間隔でDBにポーリングするか、という設定値です。

この間隔をとても長くしてしまえば擬似的に頭書の目標が達成できるのではと考え、試したところ(少々強引なのは理解しつつ)実現できてしまいました。

Rpush側の実装が少々複雑なため詳細な解説は省きますが、この Feeder は実際に各プッシュ通知サービスへのリクエストを行う処理とは別スレッドに分離されて実行されており、このような優れたGem側の設計も今回の対応がうまくいった要因であったと考えています。

サーバプロセスへポーリング停止を伝える

実現方法にあたりが付いたので、ヘルスチェックと同じような方法でサーバプロセスにポーリング停止を伝える実装を行います。

ただし今回はサーバプロセス側にそもそもこの処理を実行する機構がありませんので自作します。

まずサーバプロセスで処理停止のための quiet_for_deployment メソッドを定義し、RPC通信を受け付けられるようにします。

module RpushPlugins
  # patch for Rpush::Daemon::Rpc::Server
  module DaemonRpcServerPatch
    private
    # see Rpush::Daemon::Rpc::Server#process
    def process(cmd, args)
      case cmd
      when 'quiet_for_deployment'
        quiet_for_deployment(args['sleeping_seconds'])
      else
        super
      end
    end
    # デプロイ時の旧コンテナ停止用
    # Rpush::Daemon::Feeder.feed_forever での待ち時間を伸ばすことで実質的にキューイングを停止する
    # @param [Integer] sleeping_seconds
    def quiet_for_deployment(sleeping_seconds)
      Rpush.config.push_poll = sleeping_seconds
      Rpush::Daemon.store.release_connection
      EcsTaskCommand.enable_quiet_protection(namespace: :rpush)
    end
  end
end

( EcsTaskCommand.enable_quiet_protection(namespace: :rpush)ECSのタスクスケールイン保護を有効にする独自実装です)

このmoduleをサーバ起動時にRpushのプラグインという仕組みを利用しつつprependし、 quiet_for_deployment 以外の既存の挙動に変化がないようにします。

Rpush.plugin(:patch).init do
  require 'rpush_plugins/daemon_rpc_server_patch'
  Rpush::Daemon::Rpc::Server.prepend(RpushPlugins::DaemonRpcServerPatch)
end

その処理を実行するためにまず Rpush::Daemon::Rpc::Client を継承した独自のクライアントクラスを定義し、

class Rpush::Daemon::Rpc::Client::Ourly < Rpush::Daemon::Rpc::Client
  def quiet_for_deployment(**kwargs)
    call(:quiet_for_deployment, kwargs)
  end
end

ヘルスチェック処理のために作った RpushDaemonRpcCommand クラスに次のメソッドを追加しました。

def quiet_for_deployment
  rpc = Rpush::Daemon::Rpc::Client::Ourly.new(pid)
  rpc.quiet_for_deployment(sleeping_seconds: SLEEPING_SECONDS_FOR_DEPLOYMENT)
  rpc.close
  exit 0
rescue StandardError => e
  exit 1
end

このとき、SLEEPING_SECONDS_FOR_DEPLOYMENTは既存のプッシュ通知が終了するのを待つために充分長い値を指定します。

bin/rpush_quiet_for_deployment
#!/usr/bin/env ruby

RpushDaemonRpcCommand.quiet_for_deployment

あとはヘルスチェックと同じようにデプロイ時にこの処理を実行すれば、Graceful Shutdownが実現できるというわけです。

実運用では上の方でさらっと書いたECSのタスクスケールイン保護を併用してコンテナが意図せず終了しないようにしているわけですが、これについてはまた別の記事で書きたいと思います。

ourly tech blog

Discussion