🔥

GoのAPIサーバーで非同期処理をGracefull Shutdownしたい

に公開

はじめに

Goではgoroutineの仕組みを使って並行処理をgo func() { ... }()goとつけるだけで簡単に実装できるのが大きな強みです。APIサーバーを実装しているうえでもレスポンスを先に返して裏で集計や外部通知といった重い処理を走らせたいシチュエーションでgoroutineを使って実装したいことは多いと思います。

一方で、実際に商用システムのアプリケーションをクラウド/コンテナ環境にデプロイすると Graceful Shutdown が避けて通れません。Kubernetes や Fargate は停止時にプロセスへシグナル (一般に SIGTERM) を送り、一定時間後に強制終了 (SIGKILL) します。HTTPサーバーやgRPCサーバーのライブラリでは、Graceful shutdownを行うためのメソッドが用意されている場合が多く、シグナルを受け取ってそのメソッドを呼び出す実装をしておけば問題ないでしょう。(標準パッケージであるnet/httpでもServer.ShutdownはGracefulにシャットダウンしてくれます)
しかしハンドラ内部でgo func() { doSomethingHeavy() }()のようにレスポンス返却後も走り続ける goroutine がある場合、サーバーライブラリのGraceful shutdown機能では その終了を待ちません。メインサーバーのシャットダウン処理とは関係なくバックグラウンドジョブはシグナルを知らずに動き続け、やがて途中終了してしまいます。
バックグラウンドの goroutine が処理途中で切られれば、データの欠損や重複、外部サービスへの通知漏れが発生しかねません。

この記事では、「クライアントへレスポンスを返したあとに回している非同期処理を、シグナル受信後も最後まで走らせ、安全にプロセスを終了させる方法」について、いくつかのパターンを解説したいと思います。

なおこの記事はGo言語の基本文法やgoroutine・channelといった概念を一通り理解している方に向けて書いています。

そもそもGraceful Shutdownって?

Graceful Shutdown とは「プロセス終了シグナルを受け取っても、現在処理中のリクエストやジョブを途中で切らずに、一定の猶予時間内にきれいに完了させてからプロセスを終了する」仕組みです。 Kubernetesのような環境ではアプリケーションプロセスはいつでも再起動やスケールインされる可能性があるため、そういった場合でもデータ欠損や重複処理が発生しないためにGraceful Shutdownの実装は必須となります。

何の工夫もしない場合

以下に簡単なHTTPサーバーを書いてみました。このHTTPサーバーでは/translate{"english": "hello"} を送ると {"japanese": "こんにちは"} を返し、その後50秒かかる非同期処理を実行します。

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// 翻訳関数 (ダミー処理として50秒スリープ)
func translateAsync(text string) {
	time.Sleep(50 * time.Second)
	fmt.Println("非同期処理完了: 翻訳結果を保存しました")
}

// ハンドラ
func translateHandler(w http.ResponseWriter, r *http.Request) {
	var req map[string]string
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request", http.StatusBadRequest)
		return
	}

	response := map[string]string{"japanese": "こんにちは"}
	json.NewEncoder(w).Encode(response)

	// 非同期処理を開始(SIGTERMが来ると強制終了される)
	go translateAsync(req["english"])
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/translate", translateHandler)

	server := &http.Server{Addr: ":8080", Handler: mux}

	// Graceful Shutdownの処理
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// サーバーの起動
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			fmt.Printf("サーバーエラー: %v\n", err)
		}
	}()

	fmt.Println("サーバー起動: http://localhost:8080")

	// SIGTERMを待機
	<-ctx.Done()
	fmt.Println("シャットダウン開始...")

	// HTTPサーバーの停止
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	server.Shutdown(shutdownCtx)

	// 完全に終了
	wg.Wait()
	fmt.Println("サーバー停止完了")
}

この実装では非同期処理の関数の前にただgoキーワードをつけてgoroutineを立てているだけで、シグナルを受け取った際の挙動について何の実装上の工夫もしていないため、リクエストを送った後非同期処理部分が完了する前にSIGTERMやSIGINTを送ると処理を待たずに終了してしまいます。

リクエストを送った後...

$ curl -X POST http://localhost:8080/translate \
>      -H "Content-Type: application/json" \
>      -d '{"english": "hello"}'
{"japanese":"こんにちは"}

cntl+cでSIGINTを送ると処理を待たずに終了してしまった("非同期処理完了: 翻訳結果を保存しました"が出力されていない)

$ go run main.go
サーバー起動: http://localhost:8080
^Cシャットダウン開始...
サーバー停止完了

なおこの実装でもserver.Shutdownメソッドはgracefullに終了してくれるので、クライアントからリクエストを受け取ってレスポンスを返すところはgracefull shutdownになっています。(https://pkg.go.dev/net/http#Server.Shutdown)

Gracefull Shutdownの実装パターン

では上記の実装を修正して、非同期処理部分も含めてgracefullにシャットダウンできるようにしてみましょう。

sync.WaitGroupを使う

まずはsync.WaitGroupを使う方法です。
先ほどと同じ仕様のAPIサーバーを、非同期部分もgracefullにシャットダウンできるよう、sync.WaitGroupを使って実装してみました。

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// API サーバーの構造体
type APIServer struct {
	server *http.Server
	wg     sync.WaitGroup
}

// 非同期処理の実行
func (s *APIServer) executeAsyncProcess(f func()) {
	s.wg.Add(1) // WaitGroupのカウントを増やす
	go func() {
		defer s.wg.Done() // goroutine終了時にカウントを減らす
		f()
	}()
}

// Graceful Shutdown
func (s *APIServer) gracefullStop() {
	fmt.Println("シャットダウン開始...")
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	s.server.Shutdown(shutdownCtx)

	// 非同期処理の完了を待つ
	fmt.Println("非同期処理の終了を待機中...")
	s.wg.Wait()

	fmt.Println("サーバー停止完了")
}

// ハンドラ (構造体のメソッドとして実装)
func (s *APIServer) translateHandler(w http.ResponseWriter, r *http.Request) {
	var req map[string]string
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request", http.StatusBadRequest)
		return
	}

	response := map[string]string{"japanese": "こんにちは"}
	json.NewEncoder(w).Encode(response)

	// 非同期処理の開始
	s.executeAsyncProcess(func() {
		time.Sleep(50 * time.Second)
		fmt.Println("非同期処理完了: 翻訳結果を保存しました")
	})
}

func main() {
	server := &APIServer{
		server: &http.Server{Addr: ":8080"},
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/translate", server.translateHandler)
	server.server.Handler = mux

	// Graceful Shutdownの処理
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// サーバーの起動
	var serverWg sync.WaitGroup
	serverWg.Add(1)
	go func() {
		defer serverWg.Done()
		if err := server.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			fmt.Printf("サーバーエラー: %v\n", err)
		}
	}()

	fmt.Println("サーバー起動: http://localhost:8080")

	// SIGTERMを待機
	<-ctx.Done()

	// Graceful Shutdownの実行
	server.gracefullStop()

	// 完全に終了
	serverWg.Wait()
}

この実装では構造体にsync.WaitGroupをフィールドとして持たせています。非同期処理を行う際には必ずexecuteAsyncProcessメソッドを経由して構造体のsync.WaitGroupのAddメソッドを呼び出し、処理が終わったらDoneメソッドを呼ぶようにしています。
こうしておくことで実行中の非同期処理を構造体のsync.WaitGroupで管理できるようになり、シグナルを受け取った際にはWaitメソッドを読んで全ての処理が終わるまで待機することでgracefullなシャットダウンが実現できています。(gracefullStopメソッド)

3回リクエスト送った後

$ curl -X POST http://localhost:8080/translate      -H "Content-Type: application/json"      -d '{"english": "hello"}'
{"japanese":"こんにちは"}
$ curl -X POST http://localhost:8080/translate      -H "Content-Type: application/json"      -d '{"english": "hello"}'
{"japanese":"こんにちは"}
$ curl -X POST http://localhost:8080/translate      -H "Content-Type: application/json"      -d '{"english": "hello"}'
{"japanese":"こんにちは"}

すぐcntl+cでSIGINTを送ると全ての処理を待ってから終了した

$ go run main2.go
サーバー起動: http://localhost:8080
^Cシャットダウン開始...
非同期処理の終了を待機中...
非同期処理完了: 翻訳結果を保存しました
非同期処理完了: 翻訳結果を保存しました
非同期処理完了: 翻訳結果を保存しました
サーバー停止完了

この方法はとても単純でわかりやすいので、手軽に実装することが可能です。
なお、この実装ではgoroutine の起動数を制御できていないので、実際に商用システムを実装する際には別途semaphoreなどを使って制御が必要になるかもしれません。

channelを使う

続いてchannelを使う方法です。
channelをつかって非同期部分もgracefullにシャットダウンできるよう実装してみました。

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

// API サーバーの構造体
type APIServer struct {
	server    *http.Server
	taskQueue chan struct{}
}

// 非同期処理の実行
func (s *APIServer) executeAsyncProcess(f func()) {
	go func() {
		s.taskQueue <- struct{}{}        // タスク開始通知
		defer func() { <-s.taskQueue }() // タスク終了通知
		f()
	}()
}

// Graceful Shutdown
func (s *APIServer) gracefulStop() {
	fmt.Println("シャットダウン開始...")
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	s.server.Shutdown(shutdownCtx)

	fmt.Println("非同期処理の終了を待機中...")

	// `taskQueue` の処理が終わるのを待つ
	for len(s.taskQueue) > 0 {
		time.Sleep(1 * time.Second) // 適切な間隔で待機
	}

	fmt.Println("サーバー停止完了")
}

// ハンドラ (構造体のメソッドとして実装)
func (s *APIServer) translateHandler(w http.ResponseWriter, r *http.Request) {
	var req map[string]string
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid request", http.StatusBadRequest)
		return
	}

	response := map[string]string{"japanese": "こんにちは"}
	json.NewEncoder(w).Encode(response)

	// 非同期処理の開始
	s.executeAsyncProcess(func() {
		time.Sleep(50 * time.Second)
		fmt.Println("非同期処理完了: 翻訳結果を保存しました")
	})
}

func main() {
	server := &APIServer{
		server:    &http.Server{Addr: ":8080"},
		taskQueue: make(chan struct{}, 10), // タスクキュー
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/translate", server.translateHandler)
	server.server.Handler = mux

	// Graceful Shutdownの処理
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// サーバーの起動
	go func() {
		if err := server.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			fmt.Printf("サーバーエラー: %v\n", err)
		}
	}()

	fmt.Println("サーバー起動: http://localhost:8080")

	// SIGTERMを待機
	<-ctx.Done()

	// Graceful Shutdownの実行
	server.gracefulStop()
}

この実装ではchannelをタスクキューとしてつかい、非同期処理の開始時に通知を投入し終了時に取り出すことで実行中のタスクがchannelに残るようになります。 あとはシャットダウン時にタスクキューの中身が0になるまで待ってから終了することでgracefull shutdownとなります。
こちらは先ほどのsync.WaitGroupを使った実装より複雑ですが、残タスクがいくつあるかが分かったり同時実行数制御を同時にできるなど、柔軟性があがります。
一方で同時実行数の見積もりを見誤ると同期処理側でchannelへの送信がブロックされ、クライアントへの応答が遅延してしまう、といったデメリットもあります。

比較

ここまで上げた二つの方法について比較してみましょう。

観点 sync.WaitGroup chan struct{}
使い方の意図 “いま何個ゴルーチンが動いているか” をただカウントして待つ “タスクをキューに入れ、バッファ長で残件数・同時数も制御”
待機方法 wg.Wait() 1 行でブロック。ポーリング不要 for len(queue) > 0 { … } など手動ポーリング or select
同時実行数の上限 単体では制御不能(無限並列) バッファ容量で “流量制御” ができる
可読性 / 慣習 Go の “王道” パターンで読みやすい キュー目的が混在すると意図が読みにくい
拡張性 途中経過を取得できない。進捗・キャンセルを載せるには別構造が必要 select でキャンセル・進捗メッセージなども送れる
  1. sync.WaitGroup を選ぶ場面
  • “非同期仕事が終わるまで待ちたい”だけのシンプル要件。
  • タスク数が不明・無制限でも OK。(別途semaphoreなどを使えば制御は可能ではある)
  • 実装を単純にしたい・可読性を重視したい。
  • チーム全員が Go の典型パターンに慣れている。

実例

  • HTTP ハンドラで生やしたゴルーチンを全て回収し、シャットダウン完了を保証するだけの API サーバ。
  1. chan struct{} を選ぶ場面
  • 「同時に走らせるタスクは 10 件まで」など流量制限(semaphore 的用途)が欲しい。
  • 残作業数をメトリクスとして len(queue) で即時取得したい。
  • 将来キューの前段・後段を足し、ワーカーを複数立てるパイプライン構成に発展しそう。
  • ポーリングによる 1 秒程度の待ち遅延が許容されるプロセス。

実例

  • 画像変換 API で CPU を守るために並列数を制限したい。
  • WebSocket/メッセージブローカーから大量に流入するジョブを内部キューに乗せてワーカーで処理、Graceful shutdown 時には「キューが空になる or タイムアウト」の二択で終了したい。

そのほか気を付けるべきこと

今回紹介したようなgoroutineによる非同期処理を実装するうえでgracefull shutdown以外に気を付けたほうが良いことを並べてみます。

contextの扱いについて

traceやloggerの運搬のためなどメインのハンドラから渡したcotenxtを非同期処理側でも使いたい場合があります。
そういった場合、何も工夫をしないと同期処理側でレスポンスを返した段階でhttpやgRPCのライブラリ側でcontextをcancelしてしまい、まだ続いている非同期処理部分でもDBアクセスや外部API呼び出しがcontext canceledエラーで終了してしまう場合があります。
そのためこのような場合にはgo1.21で追加されたWithoutCancelを使うなど、cancelを非同期処理部分に伝播させない工夫が必要です。

そもそもgoroutineを使った非同期処理を行うことが適切か

今回紹介したパターンを使えば簡単に非同期処理が書けますしgracefullに終了することも可能です。
しかし現実のKubernetesなどの環境での終了処理ではsingterm等を送って一定以上の時間プロセスが終了しなかった場合にはsigkillを送って強制的に終了させるような仕様が一般的です。
そのためいくらgracefull shutdownを実装したとしても、1時間かかるような重い処理を安全に終了することはできません。そのような場合にはkafkaやcloud pub/subといったメッセージキューサービスを介して別プロセスに処理を委譲し、処理を受け付けるワーカープロセス側は冪等に作っておくことでshutdownされても再度メッセージキューからメッセージを取得して安全に処理を再開できる、といった設計が必要になります。
今回紹介したようなパターンは、あくまで「決済完了後エンドユーザーにメールを一本送る」のようなそこまで重くない処理を裏で走らせることでユーザーへのレスポンス速度をあげたい、といったユースケースで使うべきだと思います。

おわりに

今回実装したパターンはあくまで一例です。より良いパターンがあればぜひコメントで教えてください!

Discussion