💬

gRPCの4つの通信方式をPythonでやってみる

2022/11/29に公開

gRPCの4つの通信方式の理解を深めるため実際に実装を行っていきます。
既にGo言語で書かれた記事が多くあったので本記事ではPythonで進めていきます。

コードはGitHubのKumamoto-Hamachi/rpc_practicezenn-articleブランチ側に今回の記事のコードを上げています。またmainブランチには発展的な内容として複数Servicerがある場合のコードも上げてあります。

本記事ではクライアント側の実装も行っていきますが、実際の開発でgRPCの動作確認をする際にはevansBloomRPC等のツールを利用するのが楽なのでオススメです。
※evansやBloomRPCの解説はgRPCの動作確認にはBloomRPCとevansが便利!という話で行っているのでそちらを参考にしてみてください。

またgRPCについて全く聞いたこともないというレベルの方はまずさくらインターネットさんの出されているサービス間通信のための新技術「gRPC」入門が非常に分かりやすいのでオススメです。(本記事のUnary RPCの実装でも参考にさせていただいています。)

前提

gRPCとはなにか

a client application can directly call a method on a server application on a different machine as if it were a local object, making it easier for you to create distributed applications and services.
gRPC is based around the idea of defining a service, specifying the methods that can be called remotely with their parameters and return types.

Introduction to gRPC | gRPCより

gRPCはリモートプロシージャコール(RPC)システムの1つ。RPCとは遠隔手続き呼出しのことで簡単に言えば「違うサーバーにある関数を実行させる仕組み」のことで、呼び出す側(クライアント)と呼び出される側(サーバー)に別れているクライアント-サーバーモデル方式を取っています。

On the client side, the client has a stub (referred to as just a client in some languages) that provides the same methods as the server.
クライアントはサーバー側と同じメソッドを提供してくれるスタブを持っている。

クライアントはサーバー側の関数を呼び出す際に使うスタブ[1]というメソッドを持っています。

Protocol Buffersとはなにか

gRPCはProtocol Buffersをデフォルトのメッセージのインタフェースフォーマットとしています。[2]

Protocol buffers provide a language-neutral, platform-neutral, extensible mechanism for serializing structured data in a forward-compatible and backward-compatible way. It’s like JSON, except it's smaller and faster, and it generates native language bindings.
Protocol buffers are a combination of the definition language (created in .proto files), the code that the proto compiler generates to interface with data, language-specific runtime libraries, and the serialization format for data that is written to a file (or sent across a network connection).

Protocol BuffersとはIDL(Interface Definition Language/インターフェース定義言語)でデータの構造を定義するシリアライズのためのフォーマットで、データを永続的に保存したりネットワーク通信でデータをやり取りする際に使われます。

難しく書きましたが要はデータがどんな型を持っているか、どういう構造をしているのか簡単に書き表せる言語だということです。

後々具体例でProtocol Buffersを書いていきますが、イメージとしては下記のようにやり取りするメッセージやメソッドの引数・返り値の型を定義していくのに用います。

// メッセージの型構造のイメージ(enumとかmapとか便利なのもあるよ)
message <定義するメッセージ型の名前> {
  <型> <フィールド名1> = <そのフィールドに紐づけるフィールド番号>;
  <型> <フィールド名2> = <そのフィールドに紐づけるフィールド番号>;
  <型> <フィールド名3> = <そのフィールドに紐づけるフィールド番号>;
  :
  :
}

4つの通信方式

unaryとstreamingの2種類がありそれをclientとserverそれぞれがどちらを選択するか、つまり2*2=4通りの通信方式があります。

0. 前提となる構成と実装

今回は下記のようなディレクトリ・ファイル構成で進めていきます。

$ cd ~
$ tree
├── Dockerfile
├── docker-compose.yml
├── grpc
│   ├── client
│   │   └── user.py
│   ├── grpc_client.py
│   ├── grpc_manager.py
│   ├── json_data
│   │   └── users.json
│   ├── proto.bash
│   ├── protos
│   │   ├── __init__.py
│   │   └── user.proto
│   ├── servicers
│   │   ├── __init__.py
│   │   └── user.py
│   ├── user_pb2.py
│   └── user_pb2_grpc.py
├── poetry.lock
└── pyproject.toml

またDBの簡易的な代替としてjsonファイルを用意することにします。

~/json_data/users.json
{
	"1": {
		"id": 1,
		"nickname": "admin",
		"mail_address": "admin@example.com",
		"user_type": "ADMINISTRATOR"
	},
	"2": {
		"id": 2,
		"nickname": "guest",
		"mail_address": "guest@example.com",
		"user_type": "GUEST"
	},
	"3": {
		"id": 3,
		"nickname": "foo",
		"mail_address": "foo@example.com",
		"user_type": "NORMAL"
	},
	"4": {
		"id": 4,
		"nickname": "troll",
		"mail_address": "troll@example.com",
		"user_type": "DISABLED"
	},
	"5": {
		"id": 5,
		"nickname": "Zaru",
		"mail_address": "zaru@example.com",
		"user_type": "NORMAL"
	},
	"6": {
		"id": 6,
		"nickname": "Saru",
		"mail_address": "saru@example.com",
		"user_type": "NORMAL"
	}
}

サービサーを登録・動作させる統御用のスクリプトを実装します。
※gRPCリフレクション云々とある箇所については初心者の方は気にせずで大丈夫です。

~/grpc/grpc_manager.py
# gRPC serverに登録するservicer
from servicers.user import UserManager
# gRPCのサーバー実装ではThreadPoolを利用する
from concurrent.futures import ThreadPoolExecutor

# 「grpc」パッケージと、grpc_tools.protocによって生成したパッケージをimportする
import grpc
import user_pb2
import user_pb2_grpc

# grpc reflection用の追加ライブラリ
from grpc_reflection.v1alpha import reflection


def manager():
    # Serverオブジェクトを作成する
    server = grpc.server(ThreadPoolExecutor(max_workers=2))

    # Serverオブジェクトに定義したServicerクラスを登録する
    user_pb2_grpc.add_UserManagerServicer_to_server(UserManager(), server)

    # [追記] リフレクション登録
    SERVICE_NAMES = (
        reflection.SERVICE_NAME,
    )
    SERVICE_NAMES += (user_pb2.DESCRIPTOR.services_by_name[UserManager.__name__].full_name,)
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # 1234番ポートで待ち受けするよう指定する
    server.add_insecure_port("[::]:1234")

    # 待ち受けを開始する
    server.start()

    # 待ち受け終了後の後処理を実行する
    server.wait_for_termination()

if __name__ == "__main__":
    manager()

サービサーを叩くクライアント側の下準備も行っていきます。
※実際のメソッドは次のセクションで実装してきます。

~/grpc/grpc_client.py
from typing import List
from pprint import pprint
import sys

# 「grpc」パッケージと、protocによって生成したパッケージをimportする
import grpc
import user_pb2
import user_pb2_grpc


def main():
    # サーバーに接続する
    channel = grpc.insecure_channel("localhost:1234")
    # 送信先の「stub」を作成する
    stub = user_pb2_grpc.UserManagerStub(channel)
    # それぞれのリクエストを送信する
    if sys.argv[1] == "GetUser":
        res = get_user(stub=stub, user_id=int(sys.argv[2]))
    elif sys.argv[1] == "AddUser":
        res = add_user(
            stub=stub,
            user_id=int(sys.argv[2]),
            nickname=sys.argv[3],
            mail_address=sys.argv[4],
            user_type=int(sys.argv[5]),
        )
    elif sys.argv[1] == "CountAlreadyUsers":
        user_id_list = [int(sys.argv[2]), int(sys.argv[3])]
        res = count_already_users(stub=stub, user_id_list=user_id_list)
    elif sys.argv[1] == "GetUsersByType":
        res = get_users_by_type(stub=stub, user_type=int(sys.argv[2]))
    elif sys.argv[1] == "GetUsersByIds":
        user_id_list = [int(sys.argv[2]), int(sys.argv[3])]
        res = get_users_by_ids(stub=stub, user_id_list=user_id_list)
    return res


if __name__ == "__main__":
    main()

protoファイル内でmessageやserviceの定義も行います。

~/grpc/protos/user.proto
syntax = "proto3";

// ユーザー情報を表すメッセージ型
message User {
  uint32 id = 1;
  string nickname = 2;
  string mail_address = 3;
  enum UserType {
    NONE = 0;
    NORMAL = 1;
    ADMINISTRATOR = 2;
    GUEST = 3;
    DISABLED = 4;
  }
  UserType user_type = 4;
}

// ユーザーの情報リクエストに使用するメッセージ型
message UserRequest {
  uint32 id = 1;
}

// ユーザーの情報リクエストに使用するメッセージ型
message UserTypeRequest {
  enum UserType {
    NONE = 0;
    NORMAL = 1;
    ADMINISTRATOR = 2;
    GUEST = 3;
    DISABLED = 4;
  }
  UserType user_type = 4;
}

// ユーザー情報を返す際に使用するメッセージ型
message UserResponse {
  bool error = 1;
  string message = 2;
  User user = 3;
}

// ユーザー人数を返す際に使用するメッセージ型
message UserCntResponse {
  bool error = 1;
  uint32 user_cnt = 2;
}

// ユーザー管理を行うサービス
service UserManager {
  // ユーザー情報を取得する
  rpc GetUser (UserRequest) returns (UserResponse) {}
  // 新規のユーザー情報を追加する
  rpc AddUser (User) returns (UserResponse) {}
  // 与えられたユーザーのうち存在する人数を取得する
  rpc CountAlreadyUsers (stream UserRequest) returns (UserCntResponse) {}
  // 与えられたユーザータイプと同種のユーザーを取得する
  rpc GetUsersByType (UserTypeRequest) returns (stream UserResponse) {}
  rpc GetUsersByIds (stream UserRequest) returns (stream UserResponse) {}
}

上記のprotoファイルをprotocでコンパイルすることでmessageが記述されるuser_pb2.py、serviceが記述されるuser._pb2_grpc.pyを生成します。

$ python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/user.proto

最後の下準備としてサービサーのメソッドを入れるクラスを用意します。

from google.protobuf import json_format
from typing import List
from collections.abc import Iterable
import json


# 「grpc」パッケージと、grpc_tools.protocによって生成したパッケージをimportする
import user_pb2
import user_pb2_grpc


# ユーザー情報の読み込み
USER_DB = "./json_data/users.json"
with open(USER_DB, mode="r") as fp:
    users = json.load(fp)


class UserManager(user_pb2_grpc.UserManagerServicer):
    """
    サービス定義から生成されたクラスを継承して、
    定義したリモートプロシージャに対応するメソッドを実装する。
    クライアントが引数として与えたメッセージに対応するオブジェクト
    context引数にはRPCに関する情報を含むオブジェクトが渡される
    """

    #この下にメソッドを実装していく

1. Unary RPC(Simple RPC)


client、server共にunary(単一)の通信方式。

ここではuserの取得を行うGetUserとuserの追加を行うAddUserを実装していきます。

~/grpc/servicers/user.pyのGetUser
class UserManager(user_pb2_grpc.UserManagerServicer):

    def GetUser(self, request: user_pb2.UserRequest, context):
        """
        Unary RPC(Simple RPC)
        ユーザー情報を取得する
        """
        # クライアントが送信した引数はrequest引数に格納され、
        # このオブジェクトに対しては一般的なPythonオブジェクトと
        # 同様の形でプロパティにアクセスできる
        user_id = request.id

        # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
        if str(user_id) not in users:
            # 該当するユーザーが存在しない場合エラーを返す
            return user_pb2.UserResponse(error=True, message="not found")
        user = users[str(user_id)]

        # 戻り値として返すUserオブジェクトを作成する
        result = user_pb2.User(
            id=user["id"],
            nickname=user["nickname"],
            mail_address=user["mail_address"],
            user_type=user_pb2.User.UserType.Value(user["user_type"]),
        )
        print("result.user_type", result.user_type, type(result.user_type))  # debug

        # UserResponseオブジェクトを返す
        return user_pb2.UserResponse(error=False, user=result)
~/grpc/servicers/user.pyのAddUser
class UserManager(user_pb2_grpc.UserManagerServicer):

    def AddUser(self, request: user_pb2.User, context):
        """
        Unary RPC(Simple RPC)
        新規にユーザー情報を登録する
        """
        # クライアントが送信した引数はrequest引数に格納され、
        # このオブジェクトに対しては一般的なPythonオブジェクトと
        # 同様の形でプロパティにアクセスできる
        user_id = request.id

        # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
        # なので、適宜文字列型に変換して使用している
        if str(user_id) in users:
            # 該当するユーザーが既に存在している場合エラーを返す
            return user_pb2.UserResponse(error=True, message="already exist")

        # 新規登録用及び戻り値として返すUserオブジェクトを作成する
        result = user_pb2.User(
            id=request.id,
            nickname=request.nickname,
            mail_address=request.mail_address,
            user_type=request.user_type,
        )
        # preserving_proto_field_nameでcamelCaseがsnake_caseに変換される
        users[str(request.id)] = json_format.MessageToDict(
            result, preserving_proto_field_name=True
        )
        with open(USER_DB, mode="w") as f:
            json.dump(users, f)

        # UserResponseオブジェクトを返す
        return user_pb2.UserResponse(error=False, user=result)

クライアント側はそれぞれ下記のような実装です。

::: ~/grpc/grpc_clientのget_user

def get_user(stub, user_id: int):
    # リクエストに使用するオブジェクト(ここでは「UserRequest」型オブジェクト)を作成
    req = user_pb2.UserRequest(id=user_id)
    response = stub.GetUser(req)
    # 取得したレスポンスの表示
    pprint(response)
    return response

:::

::: ~/grpc/grpc_clientのadd_user

def add_user(stub, user_id: int, nickname: str, mail_address: str, user_type: int):
    # リクエストに使用するオブジェクト(ここでは「User」型オブジェクト)を作成
    req = user_pb2.User(
        id=user_id, nickname=nickname, mail_address=mail_address, user_type=user_type
    )
    response = stub.AddUser(req)
    # 取得したレスポンスの表示
    pprint(response)
    return response

:::

2. Server streaming RPC(Response Streaming RPC)

Server streaming RPCはクライアント側の投げてくるリクエストはUnary(単一)でレスポンス側が複数(Streaming)の通信方式。

ここでは渡されたtypeと同一のuserを全てを取得した上でそのuser1つ1つをレスポンスに詰めて投げるGetUsersByTypeを実装します。

~/grpc/servicers/user.pyのGetUsersByType
class UserManager(user_pb2_grpc.UserManagerServicer):

    def GetUsersByType(self, request: user_pb2.UserTypeRequest, context):
        """
        Server streaming RPC(Response-streaming RPC)
        渡されたユーザータイプと同じユーザーを複数返す
        """
        user_id_list = [u for u in users]
        print("user_id_list", user_id_list)  # debug
        result_list = []
        for u_id in user_id_list:
            user = users[u_id]
            # Name(数字), Value(NORMAL)とか
            if user_pb2.User.UserType.Name(request.user_type) == user["user_type"]:
                print(f"{user['id']}---該当あり---")
                result = user_pb2.User(
                    id=user["id"],
                    nickname=user["nickname"],
                    mail_address=user["mail_address"],
                    user_type=user_pb2.User.UserType.Value(user["user_type"]),
                )
                yield user_pb2.UserResponse(error=False, user=result)
        # 何も返さない条件が来てもエラーにならない

クライアント側の実装は下記です。

~/grpc/grpc_clientのget_users_by_type
def get_users_by_type(stub, user_type: int):
    # リクエストに使用するオブジェクト(ここでは「UserTypeRequest」型オブジェクト)を作成
    req = user_pb2.UserTypeRequest(user_type=user_type)
    responses = stub.GetUsersByType(req)
    response_list = []
    for r in responses:
        # 取得したレスポンスの表示
        pprint(r)  # debug
        response_list.append(r)
    return response_list

3. Client streaming RPC(Request Streaming RPC)

Client streaming RPCはクライアント側の投げてくるリクエストが複数(Streaming)でレスポンスがUnary(単一)の通信方式。

ここではユーザーid1つが入ったリクエストが複数投げられてくるので、users.jsonに存在するuserの持つidと比較して存在するユーザーidの数をカウントして返すCountAlreadyUsersメソッドを実装する。

~/grpc/servicers/user.pyのCountAlreadyUsers
    def CountAlreadyUsers(self, request_iter: Iterable[user_pb2.UserRequest], context):
        """
        Client streaming RPC(Request-streaming RPC)
        複数渡されたユーザーidのうち既に存在している人数を取得する
        """
        user_cnt = 0
        for request in request_iter:
            user_id = request.id
            # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
            if str(user_id) in users:
                # 該当するユーザーが存在するならカウント
                user_cnt += 1

クライアント側の実装は下記です。

~/grpc/grpc_clientのcount_already_users
def count_already_users(stub, user_id_list: List[int]):
    # リクエストに使用するオブジェクト(ここでは「UserRequest」型オブジェクト)を複数作成
    req_list = []
    for user_id in user_id_list:
        req = user_pb2.UserRequest(id=user_id)
        req_list.append(req)
    response = stub.CountAlreadyUsers(iter(req_list))
    # 取得したレスポンスの表示
    pprint(response)
    return response

4. Bidirectional streaming RPC

Bidirectional streaming RPCはBidirectional(双方向)という語の通り、リクエストもレスポンスも両方複数(Streaming)の通信方式。

上記の図ではリクエストを受けきってからレスポンスを返す様になっているが、ピンポンのようにリクエストを1つ受けるごとにレスポンスを1つ返すという方式での実装でも問題ない。

今回はピンポン方式でリクエストから与えられたユーザーidと合致するidを持つユーザーをレスポンスとして返すGetUsersByIdsメソッドを実装する。

~/grpc/servicers/user.pyのGetUsersByIds
    def GetUsersByIds(self, request_iter: Iterable[user_pb2.UserRequest], context):
        """
        Bidrectional streaming RPC
        複数渡されたユーザーidのうち存在しているUserを返す
        """
        user_list = []
        for request in request_iter:
            user_id = request.id
            # ユーザー情報はユーザーIDを文字列に変換したものをキーとする辞書型データ
            if str(user_id) in users:
                user = users[str(user_id)]
                user_list.append(user)
        for user in user_list:
            result = user_pb2.User(
                id=user["id"],
                nickname=user["nickname"],
                mail_address=user["mail_address"],
                user_type=user_pb2.User.UserType.Value(user["user_type"]),
            )
            yield user_pb2.UserResponse(error=False, user=result)

クライアントの実装は下記です。

~/grpc/grpc_clientのget_users_by_ids
def get_users_by_ids(stub, user_id_list: List[int]):
    # リクエストに使用するオブジェクト(ここでは「UserRequest」型オブジェクト)を複数作成
    req_list = []
    for user_id in user_id_list:
        req = user_pb2.UserRequest(id=user_id)
        req_list.append(req)
    responses = stub.GetUsersByIds(iter(req_list))
    response_list = []
    for r in responses:
        # 取得したレスポンスの表示
        pprint(r)  # debug
        response_list.append(r)
    return response_list

参考

https://grpc.io/docs/what-is-grpc/core-concepts/

https://developers.google.com/protocol-buffers/docs/overview

https://knowledge.sakura.ad.jp/24059/

脚注
  1. ITの分野では、本物が用意できないときに動作に支障が無いようにとりあえず置いておく代用品という意味で使われることが多いようです。 ↩︎

  2. jsonを代替として使うことも可能です。 ↩︎

GitHubで編集を提案

Discussion