Goで、10分で30万回APIリクエストをし結果を元に30万件のデータを更新する
こんにちは、バックエンドエンジニアの永田です。
本日は表題のような大量の API リクエストを伴うバッチ処理を Go 言語で実装したお話をしようと思います。
バッチの概要
CastingONE は主に派遣会社(以降テナントと呼称します)に使っていただいている SaaS なのですが、テナントが LINE 公式アカウントを運用している場合があります。
今回のバッチはテナントが抱えている各求職者について、「テナントの LINE 公式アカウントと友達であるかどうか(= LINE 友達ステータス)」の情報を毎日更新するためのものになります。
求職者が CastingONE の求職者向けサイト と LINE ログイン連携(OAuth 連携)をしている場合、その LINE ID が DB に保存されています。それを元に LINE API を一人づつ叩いて確認し、結果を元に LINE 友達ステータスを DB に反映しています。(複数人の友達ステータスをまとめて確認できる API は現在のところ存在しないようでした。)
※ 実は LINE Developer Console 上では Webhook URL を登録することができ、アカウントに対して友達追加やブロックといったイベントが発生したときに外部システムに通知を飛ばすこともできます。しかし今回は諸事情により、 API を毎日定期バッチから叩くことでステータス更新を行う方式にしました。(諸事情については割愛します。)
非機能要件
「LINE 公式アカウントを運用しているテナントに紐づく、 LINE ログイン連携をしている求職者」の数は、合計で大体 30 万人ほどでした。
これらの求職者の LINE 友達ステータス更新するにあたって以下の二つの非機能要件があり、これらに気を使い実装する必要がありました。
- バッチ実行は数十分以内に終わらせる
- LINE API のリクエストレートリミット(チャネルごとに 2000 リクエスト/秒)を超えないようにする
コード
いきなりですが、実際のコードは以下のような形になりました。クリーンアーキテクチャにおける UseCase の実装部分について、簡素化したものを載せています。
※ デバグのためのログ出力やエッジケースのハンドリング、private 関数のコードなどが省略されています。
package application
import (
"context"
"sync"
"time"
"github.com/CastingONE/castingone/go/jobs/batch-update-job-seeker-line-friend-status/domain"
"github.com/CastingONE/castingone/go/pkg/logger"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
)
// LINE友達ステータス取得の並列実行数
const getConcurrencyLimit = 200
// LINE友達ステータス取得のリクエストレートリミットのための間隔
const getInterval = time.Second / maxRequestPerSec
const maxRequestPerSec = 2000
type UseCase struct {
JobSeekerService domain.JobSeekerService
LineOfficialAccountService domain.LineOfficialAccountService
LineAPI domain.LineAPI
}
func (uc *UseCase) BatchUpdateJobSeekerLineFriendStatus(ctx context.Context) (err error) {
// テナントごとのLINEチャネルアクセストークンを取得する(APIリクエストに必要)
tenantIDChannelAccessTokenMap, err := uc.LineOfficialAccountService.GetTenantIDLineChannelAccessTokenMap(ctx)
if err != nil {
return err
}
// CastingONEが現在保持している求職者のLINE友達ステータスを取得する
jobSeekerLineFriendStatusList, err := uc.JobSeekerService.GetJobSeekerLineFriendStatusList(ctx)
if err != nil {
return err
}
// テナントごとに処理を行う
for tenantID, jobSeekerLineFriendStatusList := range jobSeekerLineFriendStatusList.ToTenantIDMap() {
cctx := context.WithoutCancel(ctx)
channelAccessToken, ok := tenantIDChannelAccessTokenMap[tenantID]
if !ok {
logger.WarnfWithContext(cctx, "channelAccessToken not found for tenantID: %d", tenantID)
continue
}
var wg sync.WaitGroup
getSem := semaphore.NewWeighted(int64(getConcurrencyLimit))
getBuff := rate.NewLimiter(rate.Every(0), 1)
if len(jobSeekerLineFriendStatusList) > maxRequestPerSec {
getBuff = rate.NewLimiter(rate.Every(getInterval), 1)
}
getResults := make(chan *domain.JobSeekerLineFriendStatus, len(jobSeekerLineFriendStatusList))
// 求職者の現在のLINE友達ステータスを、並列でLINE APIから取得する
for _, jobSeekerLineFriendStatus := range jobSeekerLineFriendStatusList {
if err := getSem.Acquire(cctx, 1); err != nil {
logger.ErrorWithContext(cctx, err)
continue
}
if err := getBuff.Wait(cctx); err != nil {
getSem.Release(1)
logger.ErrorWithContext(cctx, err)
continue
}
wg.Add(1)
go func() {
defer getSem.Release(1)
defer wg.Done()
defer func() {
if r := recover(); r != nil {
logger.ErrorfWithContext(cctx, "panic occurred for tenantID: %d, jobSeekerID: %d", tenantID, jobSeekerLineFriendStatus.ID)
}
}()
// 求職者のLINE友達ステータスを取得し、結果をチャネルに送信する
uc.getLineFriendStatus(cctx, channelAccessToken, jobSeekerLineFriendStatus, getResults)
}()
}
// テナントの全ての求職者のLINE友達ステータス取得が完了するまで待機し、結果を更新用のスライスに入れる
wg.Wait()
close(getResults)
updateSlice := make(domain.JobSeekerLineFriendStatusList, 0, len(getResults))
for jobSeekerLineFriendStatus := range getResults {
updateSlice = append(updateSlice, jobSeekerLineFriendStatus)
}
// テナントの求職者のLINE友達ステータスを一括更新する
if err := uc.JobSeekerService.BatchUpdateJobSeekerLineFriendStatus(cctx, tenantID, updateSlice); err != nil {
logger.ErrorWithContext(cctx, err)
}
}
return nil
}
ポイント
30 万回もの API リクエストを数十分以内に完了しないといけないため、どこかで並列処理を行う必要がありました。以下の3つの方針が考えられると思います。
- 各テナントごとの処理を並列化する
- 各 API リクエストに関する処理を並列化する
- 1 と 2 の両方を並列化する
方針 3 については、Goroutine をネストさせると可読性や保守性が下がるため、最初に除外しました。そして当初は、方針 1 による実装を考えていました。API へのリクエストレートリミットによって各リクエスト間でバッファを持たなければならないため、そこを並列化をしても処理速度が早くならないと思ったからでです。
しかし最終的には方針 2 を選択しました。実際の API のレスポンス速度を計測したところ、平均 0.06 秒でした。レートリミットを守るためのバッファは 1 / 2000 = 0.0005 秒であるため、大量の API リクエストを直列で行った場合、速度のボトルネックになるのはレートリミットではなくレスポンス待ちの時間だったわけです。つまり、並列化による高速化の恩恵は多分にあることになります。実際に、今のところ毎日の定期実行は 十数分で完了しています。(テナントごとに並列化を行う場合と比較して、エラーが起きた際のデバグがし易いというメリットもあります。)
Goroutine の最大並列数は、レスポンス速度とバッファの時間をもとに設定しました。バッファ分スリープしつつ Goroutine を作成していった場合 120 個まで Goroutine の数が増えていきます。121 回目のリクエストを投げる前に 1 回目のリクエストへのレスポンスが返ってくるため、それ以上 Goroutine の数は増えないはずです。このことから、最大並列数は余裕を持って 200 個に設定しました。
学び
Goroutine はとっても軽量!
当初各テナントごとの処理の方を並列化しようと考えた理由として、自分が Goroutine についてせいぜい数十個程度しか作成できないという勘違いをしていたこともありました。既存のコードでは、その程度の数の Goroutine を作成しているものしかなかったからです。
しかし調べてみると Goroutine 自体が使うメモリは数 KB 程度だったので、並列数は Goroutine 内の処理で使う CPU やメモリの大きさに規定されることになります。Goroutine 内の処理が軽量であれば、(サーバーのスペックが許す範囲で)並列数は数百でも数千でも増やすことができます。
大量の API リクエストの処理時間は、レートリミットのためのバッファではなくレスポンス速度に規定される
(もちろんレートリミットとレスポンス速度の値によりますが、往々にして後者の方が遅いのではないかと思います。)
前段のポイントに記載していたことですが自分は最初これに気づいていなかったので、指摘していただいた同じチームの hiro さんに感謝です。
感想
今回のバッチ実装は Goroutine や Channel、WaitGroup、Semaphore、Buffer などをふんだんに活用したものになっています。個人的に前職ではずっと PHP を書いており、こういったものを実装してみたくて CastingONE に転職した背景があります。作っていて非常に楽しく、Go での並列処理の書きやすさに感動しておりました。
またこの記事では触れていませんが、spf13/cobraを使ったコマンドの実装や、Skaffold を使った Cloud Run Jobs へのバッチのデプロイなど、自分にとっては経験のない技術を扱うことができたのも僥倖でした。
Discussion