🤔

gRPCのインターセプター処理の流れ(Go言語)

2023/09/10に公開

業務でgRPCを使用しているのですが、インターセプターの処理の流れについてどうなっているか気になって調べたことがあったので記事にしたいと思います。

はじめに

gRPCについての基本的な説明については割愛させて頂きます。
↓基本的な説明については、この方の記事が非常に参考になりますので是非読んでみてください。
さき(H.Saki)
https://zenn.dev/hsaki/books/golang-grpc-starting

gRPCのインターセプター

gRPCのインターセプターはハンドラ処理の前後に挟むミドルウェアで、追加することで、ロギングや認証処理などの処理を行うことができます。
※今回の記事では、Unary RPCのみを対象としております。

それでは、実際にコードで見ていきたいと思います。

単一のインターセプターを追加する場合

まずは単一のインターセプターを追加する場合を見ていきます。
はじめに、以下のコードでgRPCサーバーを作成します。

main.go
// gRPCサーバーを作成する
s := grpc.NewServer()

このメソッドは、

server.go
// NewServer creates a gRPC server which has no service registered and has 
// not started to accept requests yet.
func grpc.NewServer(opt ...grpc.ServerOption) *grpc.Server

のように定義されており、optにインターセプターを追加することで、インターセプターを利用することができます。

grpcのインターセプターは、

// UnaryServerInterceptor provides a hook to intercept the execution of a 
// unary RPC on the server. info contains all the information of this RPC the 
// interceptor can operate on. And handler is the wrapper of the service 
// method implementation. It is the responsibility of the interceptor to 
// invoke handler to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (resp any, err error)

このような関数型で定義されており、以下のメソッドによってServerOption型に追加することができます。

// UnaryInterceptor returns a ServerOption that sets the 
// UnaryServerInterceptor for the server. Only one unary interceptor can be 
// installed. The construction of multiple interceptors (e.g., chaining) can 
// be implemented at the caller.
func grpc.UnaryInterceptor(i grpc.UnaryServerInterceptor) grpc.ServerOption

それでは、インターセプターを追加していきます。

main.go
// gRPCサーバーを作成して、インターセプターをサーバーに追加する
s := grpc.NewServer(grpc.UnaryInterceptor(myIntercepter()))
intercepter.go
// インターセプターを定義する
func myIntercepter() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		log.Println("MyIntercepter start")
		resp, err := handler(ctx, req)
		log.Println("MyIntercepter end")
		return resp, err
	}
}

これでサーバーにインターセプターを追加することができました。実際にサーバーをうごかしてみると、以下のようなログが確認できます。

2023/09/10 02:29:27 MyIntercepter start
2023/09/10 02:29:27 MyIntercepter end

サーバーの処理の流れは、以下のようになっています。

  1. MyIntercepter startのログが出力される。
  2. handlerの呼び出しで、grpcに登録されているサービスが呼び出される。
  3. MyIntercepter endのログが出力される。

単一のインターセプターでは処理の流れを追いやすいですね。

複数インターセプターを追加する場合

次に、複数のインターセプターを追加する場合を見ていきたいと思います。
複数登録する場合は、ChainUnaryInterceptorを利用することで登録することができます。

// ChainUnaryInterceptor returns a ServerOption that specifies the chained 
// interceptor for unary RPCs. The first interceptor will be the outer most, 
// while the last interceptor will be the inner most wrapper around the real 
// call. All unary interceptors added by this method will be chained.
func grpc.ChainUnaryInterceptor(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption

それでは、実際に追加してみたいと思います。

main.go
// gRPCサーバーを作成して、インターセプターをサーバーに追加する
s := grpc.NewServer(grpc.ChainUnaryInterceptor(
	myIntercepter1(),
	myIntercepter2(),
	myIntercepter3(),
))
intercepter.go
// インターセプターを定義する
func myIntercepter1() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		log.Println("MyIntercepter1 start")
		resp, err := handler(ctx, req)
		log.Println("MyIntercepter1 end")
		return resp, err
	}
}

func myIntercepter2() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		log.Println("MyIntercepter2 start")
		resp, err := handler(ctx, req)
		log.Println("MyIntercepter2 end")
		return resp, err
	}
}

func myIntercepter3() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		log.Println("MyIntercepter3 start")
		resp, err := handler(ctx, req)
		log.Println("MyIntercepter3 end")
		return resp, err
	}
}

ログは以下のようになります。

2023/09/10 03:44:03 MyIntercepter1 start
2023/09/10 03:44:03 MyIntercepter2 start
2023/09/10 03:44:03 MyIntercepter3 start
2023/09/10 03:44:03 MyIntercepter3 end
2023/09/10 03:44:03 MyIntercepter2 end
2023/09/10 03:44:03 MyIntercepter1 end

処理の流れとしては、

  1. MyIntercepter1 startのログが出力される。
  2. MyIntercepter2 startのログが出力される。
  3. MyIntercepter3 startのログが出力される。
  4. handlerの呼び出しで、grpcに登録されているサービスが呼び出される。
  5. MyIntercepter3 endのログが出力される。
  6. MyIntercepter2 endのログが出力される。
  7. MyIntercepter1 endのログが出力される。

となります。処理の流れとしては、何ら難しいことはないと思いますが、
私はここで。「え、handlerが3回も呼ばれてる、なんでだろう?」となりました。

gRPCのパッケージ内を覗いてみる

handlerが行なっている処理は、gRPCパッケージのserver.goのファイルを見ることで処理の流れを理解することができます。

まずはじめに、ChainUnaryInterceptorによって、インターセプターがchainUnaryIntsの中に追加されます。

server.go
// ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
// for unary RPCs. The first interceptor will be the outer most,
// while the last interceptor will be the inner most wrapper around the real call.
// All unary interceptors added by this method will be chained.
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
	})
}

ここで追加されたchainUnaryIntsはchainUnaryServerInterceptorsによって以下のように処理が呼び出されます。

server.go
// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	interceptors := s.opts.chainUnaryInts
	if s.opts.unaryInt != nil {
		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
	}
	
	var chainedInt UnaryServerInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = chainUnaryInterceptors(interceptors)
	}

	s.opts.unaryInt = chainedInt
}

この処理では複数あるインターセプターを、UnaryServerInterceptor型の一つの関数にまとめています。
インターセプターが複数ある場合は、chainUnaryInterceptors関数が呼び出されており、この関数の中身は以下のようになっています。

server.go
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
	}
}

func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
	if curr == len(interceptors)-1 {
		return finalHandler
	}
	return func(ctx context.Context, req any) (any, error) {
		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
	}
}

実はこのchainUnaryInterceptors関数の中で、getChainUnaryHandler関数が再帰処理で呼ばれており、これによって複数あるインターセプターが一つにまとめられているんですね。

ここまで見ると段々わかってくるかと思いますが、先程の複数インターセプターのある処理の流れを細かく見ると、

  1. MyIntercepter1 startのログが出力される。

  2. MyIntercepter1のhandlerの呼び出しによって、MyIntercepter2に処理が移る。

  3. MyIntercepter2 startのログが出力される。

  4. MyIntercepter2のhandlerの呼び出しによって、MyIntercepter3に処理が移る。

  5. MyIntercepter3 startのログが出力される。

  6. handlerの呼び出しで、grpcに登録されているサービスが呼び出される。

  7. MyIntercepter3 endのログが出力される。

  8. MyIntercepter2に処理がもどる。

  9. MyIntercepter2 endのログが出力される。

  10. MyIntercepter1 に処理がもどる。

  11. MyIntercepter1 endのログが出力される。

このように、インターセプターのhandler関数は、自身が登録された順番より後にインターセプターが登録されている場合、handler関数で次のインターセプターを呼び出していることがわかりました。

図にすると以下のようになるかと思います。

このように見てみると、インターセプターに書かれていた出力の順番にも納得が行きますね。
goは外部パッケージも同じ言語で書かれているため、何か疑問に思ったことがあってもパッケージの中身を除くことで処理の流れが追えるので便利ですね。

gRPCの公式ドキュメントはこちらから

https://grpc.io/
https://github.com/grpc/grpc-go
https://pkg.go.dev/google.golang.org/grpc

今回の記事は以上になります。最後まで読んでいただきありがとうございました。

Discussion