プッシュ通知の実行環境(Rpush)を独自にFargate対応した話
ourly株式会社でバックエンドエンジニアをしているjakeです。
弊社ではプッシュ通知の送信にRpush Gemを使用しているのですが、これをより安定的に稼働させるための取り組みについて共有いたします。
前提
Rpushについて
RpushはAppleプッシュ通知サービス(APNs), Firebase Cloud Messaging(FCM), Amazon Device Messagingといった複数の通知送信基盤に対応した通知作成&送信のためのGemです。
基本的にプッシュ通知1通ごとに対応するレコードをDBに作成します。
Rpushの実行方法には次のような方法が用意されています。
(詳しくはREADMEを参照)
-
独立プロセス
- 基本的にこちらが推奨されており、
bundle exec rpush start
といった形でRpushを起動します。起動したあとは設定したポーリング間隔で常にDBを探索し、各プッシュ通知サービスへのリクエストを行います。
- 基本的にこちらが推奨されており、
-
ジョブ
- 非同期処理として実行されるジョブとして実行します。
Rpush.push
をジョブのコード中で呼び出すことでプッシュ通知を行います。
- 非同期処理として実行されるジョブとして実行します。
-
埋め込み
- pumaなど既存のプロセスに埋め込む形で起動します。そのあとの挙動は独立プロセスの場合と同じと思われます。
施策実施前の実行方法
従来はSidekiqを使用した定期実行ジョブとしてRpushを起動し、運用しておりました。当時のourlyの規模からして、独立したプロセスで起動するまでではないという判断でした。
現実としてこれまで大きな問題なく稼働しておりましたが、ありがたいことに利用企業様が増えていること、弊社で利用しているFCMのAPI移行によってプッシュ通知の送信にかかる時間が増加してしまったことから、より安定的な稼働を目的として独立したプロセスとして起動するよう変更することとしました。
(埋め込み方式については未検証ですが、既存のプロセスと同居することから安定稼働の面での不利が大きいと考えて不採用としました)
移行にあたって行う必要のある対応
ourlyはAmazon ECS on Fargateで稼働しておりますが、Fargateで独立したプロセスとして起動するにあたって考慮すべきことが二点ありました。
一点目はECSのヘルスチェックです。
正常に稼働しているかどうかを監視するためには必要ですが、Rpushにはデフォルトでその仕組みはないようでした。
二点目はGraceful Shutdownです。
デプロイ時には新しいコンテナ(タスク)を起動すると同時に、それまで起動していたコンテナを安全に終了(Graceful Shutdown)させる必要があります。
Rpushを起動するコンテナにおいては、着手したプッシュ通知の送信処理が終わるまで待ってから終了することを担保する、ということを意味します。
対応1 - ヘルスチェック
Rpushが正常に稼働していることを確認して適切な終了ステータスを返す仕組みが必要です。
ありがたいことに、Rpushは起動したメインのプロセス(以下、サーバープロセス)とRPCで通信する仕組みをあらかじめ用意してくれていました。
またこちらの status
メソッドを実行すると、実行中のサーバープロセスからヘルスチェックに使用できる情報を取得できることもわかりました。
こちらを利用してECSのヘルスチェックを行うためのコードは次のとおりです。(記事化にあたり実際のコードから簡略化しています。以下同じ)
class RpushDaemonRpcCommand
class << self
def healthcheck
rpc = Rpush::Daemon::Rpc::Client.new(pid)
status = rpc.status
rpc.close
raise 'No running apps' if status['app_runners'].blank?
exit 0
rescue StandardError => e
exit 1
end
end
end
まずサーバープロセスに対してRPC通信を行ってステータスを確認し、その正常/異常を終了ステータスとします。
これをECSのhealthcheckから実行します。
#!/usr/bin/env ruby
RpushDaemonRpcCommand.healthcheck
"healthCheck": {
"command": [
"CMD-SHELL",
"bundle exec ruby bin/rpush_healthcheck"
]
},
これでECSのヘルスステータスが正常になりました。
対応2 - Graceful Shutdown
ポーリング停止の実現方法
Graceful Shutdownについてはもう少しごにょごにょする必要がありました。
というのもRpushには今回の要件である「現在の処理は正常に終了させて、次の処理は行わない」という仕組みはなかったからです。
Rpushのコードを読み、使えそうな処理を見つけました。
Rpush::Daemon::Feeder
という、送信対象のプッシュ通知レコードをDBから取得してキューに入れることを役割とするクラスの feed_forever
メソッドです。(Ref.)
def self.feed_forever
loop do
enqueue_notifications
interruptible_sleeper.sleep(Rpush.config.push_poll)
return if should_stop
end
end
実装は単純で、
- 対象のプッシュ通知を探索してキューに入れる
- ポーリング間隔のぶんsleepする
- 必要であれば停止する
という処理を無限ループしています。
このsleepの時間はあらかじめ設定された Rpush.config.push_poll
で、プッシュ通知をどれくらいの間隔でDBにポーリングするか、という設定値です。
この間隔をとても長くしてしまえば擬似的に頭書の目標が達成できるのではと考え、試したところ(少々強引なのは理解しつつ)実現できてしまいました。
Rpush側の実装が少々複雑なため詳細な解説は省きますが、この Feeder
は実際に各プッシュ通知サービスへのリクエストを行う処理とは別スレッドに分離されて実行されており、このような優れたGem側の設計も今回の対応がうまくいった要因であったと考えています。
サーバプロセスへポーリング停止を伝える
実現方法にあたりが付いたので、ヘルスチェックと同じような方法でサーバプロセスにポーリング停止を伝える実装を行います。
ただし今回はサーバプロセス側にそもそもこの処理を実行する機構がありませんので自作します。
まずサーバプロセスで処理停止のための quiet_for_deployment
メソッドを定義し、RPC通信を受け付けられるようにします。
module RpushPlugins
# patch for Rpush::Daemon::Rpc::Server
module DaemonRpcServerPatch
private
# see Rpush::Daemon::Rpc::Server#process
def process(cmd, args)
case cmd
when 'quiet_for_deployment'
quiet_for_deployment(args['sleeping_seconds'])
else
super
end
end
# デプロイ時の旧コンテナ停止用
# Rpush::Daemon::Feeder.feed_forever での待ち時間を伸ばすことで実質的にキューイングを停止する
# @param [Integer] sleeping_seconds
def quiet_for_deployment(sleeping_seconds)
Rpush.config.push_poll = sleeping_seconds
Rpush::Daemon.store.release_connection
EcsTaskCommand.enable_quiet_protection(namespace: :rpush)
end
end
end
( EcsTaskCommand.enable_quiet_protection(namespace: :rpush)
はECSのタスクスケールイン保護を有効にする独自実装です)
このmoduleをサーバ起動時にRpushのプラグインという仕組みを利用しつつprependし、 quiet_for_deployment
以外の既存の挙動に変化がないようにします。
Rpush.plugin(:patch).init do
require 'rpush_plugins/daemon_rpc_server_patch'
Rpush::Daemon::Rpc::Server.prepend(RpushPlugins::DaemonRpcServerPatch)
end
その処理を実行するためにまず Rpush::Daemon::Rpc::Client
を継承した独自のクライアントクラスを定義し、
class Rpush::Daemon::Rpc::Client::Ourly < Rpush::Daemon::Rpc::Client
def quiet_for_deployment(**kwargs)
call(:quiet_for_deployment, kwargs)
end
end
ヘルスチェック処理のために作った RpushDaemonRpcCommand
クラスに次のメソッドを追加しました。
def quiet_for_deployment
rpc = Rpush::Daemon::Rpc::Client::Ourly.new(pid)
rpc.quiet_for_deployment(sleeping_seconds: SLEEPING_SECONDS_FOR_DEPLOYMENT)
rpc.close
exit 0
rescue StandardError => e
exit 1
end
このとき、SLEEPING_SECONDS_FOR_DEPLOYMENT
は既存のプッシュ通知が終了するのを待つために充分長い値を指定します。
#!/usr/bin/env ruby
RpushDaemonRpcCommand.quiet_for_deployment
あとはヘルスチェックと同じようにデプロイ時にこの処理を実行すれば、Graceful Shutdownが実現できるというわけです。
実運用では上の方でさらっと書いたECSのタスクスケールイン保護を併用してコンテナが意図せず終了しないようにしているわけですが、これについてはまた別の記事で書きたいと思います。
Discussion