Closed11

gRPCの基礎

Kumamoto-HamachiKumamoto-Hamachi

gRPCとは

a client application can directly call a method on a server application on a different machine as if it were a local object, making it easier for you to create distributed applications and services.
gRPC is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types.

リモートプロシージャコール(RPC)システムの1つ。RPCとは遠隔手続き呼出し。

stub[1]とは

On the client side, the client has a stub (referred to as just a client in some languages) that provides the same methods as the server.
クライアントはサーバー側と同じメソッドを提供してくれるスタブを持っている。

脚注
  1. ITの分野では、本物が用意できないときに動作に支障が無いようにとりあえず置いておく代用品という意味で使われることが多い ↩︎

Kumamoto-HamachiKumamoto-Hamachi

Protocol Buffersとは

gRPCはProtocol BuffersをIDL(Interface Definition Language)としてもメッセージのインタフェースフォーマットとしても使う。

serializing structured data (although it can be used with other data formats such as JSON).

デフォでprotocol buffer使うというだけで他のデータフォーマットも使える、JSONとか。

messgge

protocによって**._pb2.py**ファイルに

message Person {
  string name = 1;
  int32 id = 2;
  bool has_ponycopter = 3;
}

service

protocによって**._pb2_grpc.py**ファイルに

// The greeter service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
Kumamoto-HamachiKumamoto-Hamachi

RPC life cycle

unaryとstreamingの2種類がありそれをclientとserverそれぞれがどちらを選択するか、詰まり2*2=4通りの方式がある。

1. Unary RPC

2. Server streaming RPC

3. Client streaming RPC

4. Bidirectional streaming RPC

Kumamoto-HamachiKumamoto-Hamachi

1. 単方向 Unary RPC

client-server共にunary

1. clientがstubメソッドを呼び出し、serverはclientのmetadataとメソッド名、deadlineが届く

2. (1)serverはmetadataを送るか(2)クライアントのリクエストメッセージを待つかする。

(1)と(2)どちらを先にやるかはアプリケーションによって決まる。

3. serverはclent側のリクエストメッセージを受け取ると、レスポンシブを作成と送付を行う。clientにはレスポンスと共にステータスの詳細(ステータスコードやオプションのステータスメッセージ)や終了時のmetadataが送られる

4. レスポンスのステータスに問題なければclientはレスポンスを受け取り終了する

Kumamoto-HamachiKumamoto-Hamachi

2. Server streaming RPC

After sending all its messages, the server’s status details (status code and optional status message) and optional trailing metadata are sent to the client.

Kumamoto-HamachiKumamoto-Hamachi

3. Client streaming RPC

The server responds with a single message (along with its status details and optional trailing metadata), typically but not necessarily after it has received all the client’s messages.

Kumamoto-HamachiKumamoto-Hamachi

4.Bidirectional streaming RPC

Client- and server-side stream processing is application specific. Since the two streams are independent, the client and server can read and write messages in any order. For example, a server can wait until it has received all of a client’s messages before writing its messages, or the server and client can play “ping-pong” – the server gets a request, then sends back a response, then the client sends another request based on the response, and so on.

clientとserverのストリームはそれぞれ独立しているので、clientとserverは任意の順序でメッセージの読み取りと書き込みを行うことが出来る。
例えば、serverはメッセージ書き込み前にclientのメッセージを全て受け取るのを待つことも出来るし、server-client間でピンポンのようにリクエストとレスポンスを交互に行うことも出来る。

Kumamoto-HamachiKumamoto-Hamachi

RPC terminaiton

Cancelling an RPC

両方共、server-client間は独立ってことだけ意識しておけばよい

Kumamoto-HamachiKumamoto-Hamachi

Metadata TODO

Metadata is information about a particular RPC call (such as authentication details) in the form of a list of key-value pairs, where the keys are strings and the values are typically strings, but can be binary data.

Channel

A gRPC channel provides a connection to a gRPC server on a specified host and port. It is used when creating a client stub. Clients can specify channel arguments to modify gRPC’s default behavior, such as switching message compression on or off. A channel has state, including connected and idle.
How gRPC deals with closing a channel is language dependent. Some languages also permit querying channel state.

Kumamoto-HamachiKumamoto-Hamachi

gRPCの4つの通信方式をPythonでやってみる

gRPCの4つの通信方式の理解を深めるため実際に実装を行っていきます。
既にGo言語で書かれた記事が多くあったので今回はPythonで進めていきます。

コードはGitHubのKumamoto-Hamachi/rpc_practicezenn-articleブランチ側に今回の記事のコードを上げています。またmainブランチには発展的な内容として複数Servicerがある場合のコードも上げてあります。

本記事ではクライアント側の実装も行っていきますが、実際の開発でgRPCの動作確認をする際にはevansBloomRPC等のツールを利用するのが楽なのでオススメです。
※evansやBloomRPCの解説はgRPCの動作確認にはBloomRPCとevansが便利!という話で行っているのでそちらを参考にしてみてください。

またgRPCについて全く聞いたこともないというレベルの方はまずさくらインターネットさんの出されているサービス間通信のための新技術「gRPC」入門が非常に分かりやすいのでオススメです。(本記事のUnary RPCの実装でも参考にさせていただいています。)

前提

gRPCとはなにか

a client application can directly call a method on a server application on a different machine as if it were a local object, making it easier for you to create distributed applications and services.
gRPC is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types.

Introduction to gRPC | gRPCより

gRPCはリモートプロシージャコール(RPC)システムの1つ。RPCとは遠隔手続き呼出しのことで簡単に言えば「違うサーバーにある関数を実行させる仕組み」のことで、呼び出す側(クライアント)と呼び出される側(サーバー)に別れているクライアント-サーバーモデル方式を取っています。

On the client side, the client has a stub (referred to as just a client in some languages) that provides the same methods as the server.
クライアントはサーバー側と同じメソッドを提供してくれるスタブを持っている。

クライアントはサーバー側の関数を呼び出す際に使うスタブ[1]というメソッドを持っています。

Protocol Buffersとはなにか

gRPCはProtocol Buffersをデフォルトのメッセージのインタフェースフォーマットとしています。[2]

Protocol buffers provide a language-neutral, platform-neutral, extensible mechanism for serializing structured data in a forward-compatible and backward-compatible way. It’s like JSON, except it's smaller and faster, and it generates native language bindings.
Protocol buffers are a combination of the definition language (created in .proto files), the code that the proto compiler generates to interface with data, language-specific runtime libraries, and the serialization format for data that is written to a file (or sent across a network connection).

Protocol BuffersとはIDL(Interface Definition Language/インターフェース定義言語)でデータの構造を定義するシリアライズのためのフォーマットで、データを永続的に保存したりネットワーク通信でデータをやり取りする際に使われます。

難しく書きましたが要はデータがどんな型を持っているか、どういう構造をしているのか簡単に書き表せる言語だということです。

後々具体例でProtocol Buffersを書いていきますがイメージとしては下記のようにやり取りするメッセージやメソッドの引数・返り値の型を定義していくのに用います。

// メッセージの型構造のイメージ(enumとかmapとか便利なのもあるよ)
message <定義するメッセージ型の名前> {
  <型> <フィールド名1> = <そのフィールドに紐づけるフィールド番号>;
  <型> <フィールド名2> = <そのフィールドに紐づけるフィールド番号>;
  <型> <フィールド名3> = <そのフィールドに紐づけるフィールド番号>;
  :
  :
}

4つの通信方式

unaryとstreamingの2種類がありそれをclientとserverそれぞれがどちらを選択するか、つまり2*2=4通りの通信方式があります。

0. 前提となる構成と実装

今回は下記のようなディレクトリ・ファイル構成で進めていきます。

$ cd ~
$ tree 
├── Dockerfile
├── docker-compose.yml
├── grpc
│   ├── client
│   │   └── user.py
│   ├── grpc_client.py
│   ├── grpc_manager.py
│   ├── json_data
│   │   └── users.json
│   ├── main.py
│   ├── proto.bash
│   ├── protos
│   │   ├── __init__.py
│   │   └── user.proto
│   ├── servicers
│   │   ├── __init__.py
│   │   └── user.py
│   ├── user_pb2.py
│   └── user_pb2_grpc.py
├── poetry.lock
└── pyproject.toml

またDBの簡易的な代替としてjsonファイルを用意することにします。

~/json_data/users.json
{
	"1": {
		"id": 1,
		"nickname": "admin",
		"mail_address": "admin@example.com",
		"user_type": "ADMINISTRATOR"
	},
	"2": {
		"id": 2,
		"nickname": "guest",
		"mail_address": "guest@example.com",
		"user_type": "GUEST"
	},
	"3": {
		"id": 3,
		"nickname": "foo",
		"mail_address": "foo@example.com",
		"user_type": "NORMAL"
	},
	"4": {
		"id": 4,
		"nickname": "troll",
		"mail_address": "troll@example.com",
		"user_type": "DISABLED"
	},
	"5": {
		"id": 5,
		"nickname": "Zaru",
		"mail_address": "zaru@example.com",
		"user_type": "NORMAL"
	},
	"6": {
		"id": 6,
		"nickname": "Saru",
		"mail_address": "saru@example.com",
		"user_type": "NORMAL"
	}
}

サービサーを登録・動作させる統御用のスクリプトを実装します。

~/grpc/grpc_manager.py
# gRPC serverに登録するservicer
from servicers.user import UserManager
# gRPCのサーバー実装ではThreadPoolを利用する
from concurrent.futures import ThreadPoolExecutor

# 「grpc」パッケージと、grpc_tools.protocによって生成したパッケージをimportする
import grpc
import user_pb2
import user_pb2_grpc

# grpc reflection用の追加ライブラリ
from grpc_reflection.v1alpha import reflection


def manager():
    # Serverオブジェクトを作成する
    server = grpc.server(ThreadPoolExecutor(max_workers=2))

    # Serverオブジェクトに定義したServicerクラスを登録する
    user_pb2_grpc.add_UserManagerServicer_to_server(UserManager(), server)

    # [追記] リフレクション登録
    SERVICE_NAMES = (
        reflection.SERVICE_NAME,
    )
    SERVICE_NAMES += (user_pb2.DESCRIPTOR.services_by_name[UserManager.__name__].full_name,)
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # 1234番ポートで待ち受けするよう指定する
    server.add_insecure_port("[::]:1234")

    # 待ち受けを開始する
    server.start()

    # 待ち受け終了後の後処理を実行する
    server.wait_for_termination()

if __name__ == "__main__":
    manager()

サービサーを叩くクライアント側の下準備も行っていきます。
※実際のメソッドは次のセクションで実装してきます。

~/grpc/grpc_client.py
from typing import List
from pprint import pprint
import sys

# 「grpc」パッケージと、protocによって生成したパッケージをimportする
import grpc
import user_pb2
import user_pb2_grpc


def main():
    # サーバーに接続する
    channel = grpc.insecure_channel("localhost:1234")
    # 送信先の「stub」を作成する
    stub = user_pb2_grpc.UserManagerStub(channel)
    # それぞれのリクエストを送信する
    if sys.argv[1] == "GetUser":
        res = get_user(stub=stub, user_id=int(sys.argv[2]))
    elif sys.argv[1] == "AddUser":
        res = add_user(
            stub=stub,
            user_id=int(sys.argv[2]),
            nickname=sys.argv[3],
            mail_address=sys.argv[4],
            user_type=int(sys.argv[5]),
        )
    elif sys.argv[1] == "CountAlreadyUsers":
        user_id_list = [int(sys.argv[2]), int(sys.argv[3])]
        res = count_already_users(stub=stub, user_id_list=user_id_list)
    elif sys.argv[1] == "GetUsersByType":
        res = get_users_by_type(stub=stub, user_type=int(sys.argv[2]))
    elif sys.argv[1] == "GetUsersByIds":
        user_id_list = [int(sys.argv[2]), int(sys.argv[3])]
        res = get_users_by_ids(stub=stub, user_id_list=user_id_list)
    return res


if __name__ == "__main__":
    main()

protoファイル内でmessageやserviceの定義も行います。

~/grpc/protos/user.proto
syntax = "proto3";

// ユーザー情報を表すメッセージ型
message User {
  uint32 id = 1;
  string nickname = 2;
  string mail_address = 3;
  enum UserType {
    NONE = 0;
    NORMAL = 1;
    ADMINISTRATOR = 2;
    GUEST = 3;
    DISABLED = 4;
  }
  UserType user_type = 4;
}

// ユーザーの情報リクエストに使用するメッセージ型
message UserRequest {
  uint32 id = 1;
}

// ユーザーの情報リクエストに使用するメッセージ型
message UserTypeRequest {
  enum UserType {
    NONE = 0;
    NORMAL = 1;
    ADMINISTRATOR = 2;
    GUEST = 3;
    DISABLED = 4;
  }
  UserType user_type = 4;
}

// ユーザー情報を返す際に使用するメッセージ型
message UserResponse {
  bool error = 1;
  string message = 2;
  User user = 3;
}

// ユーザー人数を返す際に使用するメッセージ型
message UserCntResponse {
  bool error = 1;
  uint32 user_cnt = 2;
}

// ユーザー管理を行うサービス
service UserManager {
  // ユーザー情報を取得する
  rpc GetUser (UserRequest) returns (UserResponse) {}
  // 新規のユーザー情報を追加する
  rpc AddUser (User) returns (UserResponse) {}
  // 与えられたユーザーのうち存在する人数を取得する
  rpc CountAlreadyUsers (stream UserRequest) returns (UserCntResponse) {}
  // 与えられたユーザータイプと同種のユーザーを取得する
  rpc GetUsersByType (UserTypeRequest) returns (stream UserResponse) {}
  rpc GetUsersByIds (stream UserRequest) returns (stream UserResponse) {}
}

上記のprotoファイルをprotocでコンパイルすることでmessageが記述されるuser_pb2.py、serviceが記述されるuser._pb2_grpc.pyを生成する。

$ python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/user.proto

最後の下準備としてサービサーのメソッドを入れるクラスを用意します。

from google.protobuf import json_format
from typing import List
from collections.abc import Iterable
import json


# 「grpc」パッケージと、grpc_tools.protocによって生成したパッケージをimportする
import user_pb2
import user_pb2_grpc


# ユーザー情報の読み込み
USER_DB = "./json_data/users.json"
with open(USER_DB, mode="r") as fp:
    users = json.load(fp)


class UserManager(user_pb2_grpc.UserManagerServicer):
    """
    サービス定義から生成されたクラスを継承して、
    定義したリモートプロシージャに対応するメソッドを実装する。
    クライアントが引数として与えたメッセージに対応するオブジェクト
    context引数にはRPCに関する情報を含むオブジェクトが渡される
    """

    #この下にメソッドを実装していく

1. Unary RPC(Simple RPC)


client、server共にunary(単一)の通信方式。

ここではuserの取得を行うGetUserとuserの追加を行うAddUserを実装していきます。

~/grpc/servicers/user.pyのGetUser
class UserManager(user_pb2_grpc.UserManagerServicer):

    def GetUser(self, request: user_pb2.UserRequest, context):
        """
        Unary RPC(Simple RPC)
        ユーザー情報を取得する
        """
        # クライアントが送信した引数はrequest引数に格納され、
        # このオブジェクトに対しては一般的なPythonオブジェクトと
        # 同様の形でプロパティにアクセスできる
        user_id = request.id

        # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
        if str(user_id) not in users:
            # 該当するユーザーが存在しない場合エラーを返す
            return user_pb2.UserResponse(error=True, message="not found")
        user = users[str(user_id)]

        # 戻り値として返すUserオブジェクトを作成する
        result = user_pb2.User(
            id=user["id"],
            nickname=user["nickname"],
            mail_address=user["mail_address"],
            user_type=user_pb2.User.UserType.Value(user["user_type"]),
        )
        print("result.user_type", result.user_type, type(result.user_type))  # debug

        # UserResponseオブジェクトを返す
        return user_pb2.UserResponse(error=False, user=result)
~/grpc/servicers/user.pyのAddUser
class UserManager(user_pb2_grpc.UserManagerServicer):

    def AddUser(self, request: user_pb2.User, context):
        """
        Unary RPC(Simple RPC)
        新規にユーザー情報を登録する
        """
        # クライアントが送信した引数はrequest引数に格納され、
        # このオブジェクトに対しては一般的なPythonオブジェクトと
        # 同様の形でプロパティにアクセスできる
        user_id = request.id

        # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
        # なので、適宜文字列型に変換して使用している
        if str(user_id) in users:
            # 該当するユーザーが既に存在している場合エラーを返す
            return user_pb2.UserResponse(error=True, message="already exist")

        # 新規登録用及び戻り値として返すUserオブジェクトを作成する
        result = user_pb2.User(
            id=request.id,
            nickname=request.nickname,
            mail_address=request.mail_address,
            user_type=request.user_type,
        )
        # preserving_proto_field_nameでcamelCaseがsnake_caseに変換される
        users[str(request.id)] = json_format.MessageToDict(
            result, preserving_proto_field_name=True
        )
        with open(USER_DB, mode="w") as f:
            json.dump(users, f)

        # UserResponseオブジェクトを返す
        return user_pb2.UserResponse(error=False, user=result)

クライアント側はそれぞれ下記のような実装です。

::: ~/grpc/grpc_clientのget_user

def get_user(stub, user_id: int):
    # リクエストに使用するオブジェクト(ここでは「UserRequest」型オブジェクト)を作成
    req = user_pb2.UserRequest(id=user_id)
    response = stub.GetUser(req)
    # 取得したレスポンスの表示
    pprint(response)
    return response

::: ~/grpc/grpc_clientのadd_user

def add_user(stub, user_id: int, nickname: str, mail_address: str, user_type: int):
    # リクエストに使用するオブジェクト(ここでは「User」型オブジェクト)を作成
    req = user_pb2.User(
        id=user_id, nickname=nickname, mail_address=mail_address, user_type=user_type
    )
    response = stub.AddUser(req)
    # 取得したレスポンスの表示
    pprint(response)
    return response

2. Server streaming RPC(Response Streaming RPC)

Server streaming RPCはクライアント側の投げてくるリクエストはUnary(単一)でレスポンス側が複数(Streaming)の通信方式。

ここでは渡されたtypeと同一のuserを全てを取得した上でそのuser1つ1つをレスポンスに詰めて投げるGetUsersByTypeを実装します。

~/grpc/servicers/user.pyのGetUsersByType
class UserManager(user_pb2_grpc.UserManagerServicer):

    def GetUsersByType(self, request: user_pb2.UserTypeRequest, context):
        """
        Server streaming RPC(Response-streaming RPC)
        渡されたユーザータイプと同じユーザーを複数返す
        """
        user_id_list = [u for u in users]
        print("user_id_list", user_id_list)  # debug
        result_list = []
        for u_id in user_id_list:
            user = users[u_id]
            # Name(数字), Value(NORMAL)とか
            if user_pb2.User.UserType.Name(request.user_type) == user["user_type"]:
                print(f"{user['id']}---該当あり---")
                result = user_pb2.User(
                    id=user["id"],
                    nickname=user["nickname"],
                    mail_address=user["mail_address"],
                    user_type=user_pb2.User.UserType.Value(user["user_type"]),
                )
                yield user_pb2.UserResponse(error=False, user=result)
        # 何も返さない条件が来てもエラーにならない

クライアント側の実装は下記です。

~/grpc/grpc_clientのget_users_by_type
def get_users_by_type(stub, user_type: int):
    # リクエストに使用するオブジェクト(ここでは「UserTypeRequest」型オブジェクト)を作成
    req = user_pb2.UserTypeRequest(user_type=user_type)
    responses = stub.GetUsersByType(req)
    response_list = []
    for r in responses:
        # 取得したレスポンスの表示
        pprint(r)  # debug
        response_list.append(r)
    return response_list

3. Client streaming RPC(Request Streaming RPC)

Client streaming RPCはクライアント側の投げてくるリクエストが複数(Streaming)でレスポンスがUnary(単一)の通信方式。

ここではユーザーid1つが入ったリクエストが複数投げられてくるので、users.jsonに存在するuserの持つidと比較して存在するユーザーidの数をカウントして返すCountAlreadyUsersメソッドを実装する。

~/grpc/servicers/user.pyのCountAlreadyUsers
    def CountAlreadyUsers(self, request_iter: Iterable[user_pb2.UserRequest], context):
        """
        Client streaming RPC(Request-streaming RPC)
        複数渡されたユーザーidのうち既に存在している人数を取得する
        """
        user_cnt = 0
        for request in request_iter:
            user_id = request.id
            # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
            if str(user_id) in users:
                # 該当するユーザーが存在するならカウント
                user_cnt += 1

クライアント側の実装は下記です。

~/grpc/grpc_clientのcount_already_users
def count_already_users(stub, user_id_list: List[int]):
    # リクエストに使用するオブジェクト(ここでは「UserRequest」型オブジェクト)を複数作成
    req_list = []
    for user_id in user_id_list:
        req = user_pb2.UserRequest(id=user_id)
        req_list.append(req)
    response = stub.CountAlreadyUsers(iter(req_list))
    # 取得したレスポンスの表示
    pprint(response)
    return response

4. Bidirectional streaming RPC

Bidirectional streaming RPCはBidirectional(双方向)という語の通り、リクエストもレスポンスも両方複数(Streaming)の通信方式。

上記の図ではリクエストを受けきってからレスポンスを返す様になっているが、ピンポンのようにリクエストを1つ受けるごとにレスポンスを1つ返すという方式での実装でも問題ない。

今回はピンポン方式でリクエストから与えられたユーザーidと合致するidを持つユーザーをレスポンスとして返すGetUsersByIdsメソッドを実装する。

~/grpc/servicers/user.pyのGetUsersByIds
    def GetUsersByIds(self, request_iter: Iterable[user_pb2.UserRequest], context):
        """
        Bidrectional streaming RPC
        複数渡されたユーザーidのうち存在しているUserを返す
        """
        user_list = []
        for request in request_iter:
            user_id = request.id
            # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
            if str(user_id) in users:
                user = users[str(user_id)]
                user_list.append(user)
        for user in user_list:
            result = user_pb2.User(
                id=user["id"],
                nickname=user["nickname"],
                mail_address=user["mail_address"],
                user_type=user_pb2.User.UserType.Value(user["user_type"]),
            )
            yield user_pb2.UserResponse(error=False, user=result)

クライアントの実装は下記です。

~/grpc/grpc_clientのget_users_by_ids
def get_users_by_ids(stub, user_id_list: List[int]):
    # リクエストに使用するオブジェクト(ここでは「UserRequest」型オブジェクト)を複数作成
    req_list = []
    for user_id in user_id_list:
        req = user_pb2.UserRequest(id=user_id)
        req_list.append(req)
    responses = stub.GetUsersByIds(iter(req_list))
    response_list = []
    for r in responses:
        # 取得したレスポンスの表示
        pprint(r)  # debug
        response_list.append(r)
    return response_list

参考

https://grpc.io/docs/what-is-grpc/core-concepts/

https://developers.google.com/protocol-buffers/docs/overview

https://knowledge.sakura.ad.jp/24059/

脚注
  1. ITの分野では、本物が用意できないときに動作に支障が無いようにとりあえず置いておく代用品という意味で使われることが多いようです。 ↩︎

  2. jsonを代替として使うことも可能です。 ↩︎

このスクラップは2022/11/29にクローズされました