gRPCのstream方式でCSV出力処理を実装する
はじめに
マイクロサービス方式の開発で選定されることが多いgRPCを用いてCSV出力処理を実装してみました。
gRPCの通信方式
- Unary RPCs
シンプルな通信方式。1つのrequestに対して1つのresponseが返ってくる方式になります。gRPCの通信方式の中で一番REST方式に近いものになると思います。 - Server streaming RPCs
クライアントからの1つのrequestに対して、サーバーから複数のresponseを返すことができる通信方式になります。サーバーからクライアントに向けてサイズの大きいファイルなどを送信したい場合に使うことが多いと思います。 - Client streaming RPCs
クライアントからの複数のrequestに対して、サーバーから1つのresponseを返す通信方式になります。クライアントからサーバーに向けてサイズの大きいファイルなどをアップロードする際に使うことが多いと思います。 - Bidirectional streaming RPCs
クライアントからの複数のrequestに対して、サーバーから複数のresposeを返すことができる通信方式になります。私自身、こちらの通信方式を用いての実装を未だ行ったことがありません。今後ユースケースを探っていきたいと思います。
実装方針
今回はCSV出力データをサーバー側で作成し、クライアントに返す形式で実装を進めたいと思います。
CSV出力処理の大まかな流れは以下のようなイメージになります。
- クライアント側からのリクエストを受信した後、サーバー側でCSV文字列をバイナリデータに変換してクライアントに送信。
- サーバー側から送信されてきたバイナリデータをクライアント側でCSV文字列に変換後、指定したファイル名でダウンロード処理を実行。
CSV出力データのサイズが大きなものになるケースを想定して、 今回は、server streaming RPCs
方式で実装を進めます。
protocol buffersの定義
Server streaming RPCs
を使用する場合には、protoのrpc関数定義箇所にて、以下のように stream
という設定を追加する必要があります。
クライアントから送信するrequestのフィールド定義には、group_id
を追加し、グループ条件を指定してオペレーター一覧を取得できる様にします。
一方でサーバーから送信するresponseのフィールド定義には、 file_name
とCSV文字列の本体である csv_data
をbyte型を指定して追加します。
service OperatorService {
rpc OutputCsv(OutputCsvOperatorRequest) returns (stream OperatorCsvFile) {}
}
message OutputCsvOperatorRequest {
int32 group_id = 1;
}
message OperatorCsvFile {
string file_name = 1;
bytes csv_data = 2;
}
server側の実装
先ほどprotocol buffers
にてstrema方式で OutputCsv
rpc関数を定義したと思います。
stream方式で定義されたrpc関数を呼び出すとサーバーとクライアント間で通信のコネクションが発生します。
サーバー側で stream.Send(&genproto.OperatorCsvFile{FieldName: data})
を1回呼び出すと、クライアント側に1つのレスポンスが送信されます。
stream.Send
を複数回呼び出すことでクライアントからの1回のrequestに対して複数のresponseを送信することができます。
この特徴を活かして、容量の大きなデータをいくつかに分割し、複数回のresponseとして送信するということが実現可能となります。
package services
import (
"backend-go/ent"
"backend-go/ent/operatormaster"
"backend-go/genproto"
"context"
"strconv"
"time"
)
// OperatorService implements OperatorServiceServer
type OperatorService struct {
client *ent.Client
genproto.UnimplementedOperatorServiceServer
}
// NewOperatorService returns a new OperatorService
func NewOperatorService(client *ent.Client) *OperatorService {
return &OperatorService{
client: client,
}
}
func (svc *OperatorService) OutputCsv(req *genproto.OutputCsvOperatorRequest, stream genproto.OperatorService_OutputCsvServer) error {
var (
err error
operators []*ent.OperatorMaster
)
m := svc.client.OperatorMaster.Query()
department := req.GetDepartment()
if department != genproto.OperatorDepartment_OPERARTOR_DEPARTMENT_NONE {
m.Where(operatormaster.DepartmentEQ(uint32(department)))
}
availableFlg := req.GetAvailableFlg()
if availableFlg != genproto.AvailableFlg(genproto.Operator_AVAILABLE_FLG_NONE) {
m.Where(operatormaster.AuthorizationFlgEQ(int32(availableFlg)))
}
operators, err = m.All(stream.Context())
if err != nil {
return err
}
var csvData string
fileName := "operator_list." + time.Now().Format("20060102") + ".csv"
stream.Send(&genproto.OperatorCsvFile{FileName: fileName})
csvData += "状態,ID,社員名,部署,第2所属部署,役職,有効日時,無効日時\n"
for _, v := range operators {
csvData +=
convertAvailableFlg(genproto.Operator_AvailableFlg(v.AvailableFlg)) + "," +
strconv.Itoa(int(v.ID)) + "," +
v.OperatorName + "," +
convertDepartment(genproto.OperatorDepartment(v.Department)) + "," +
convertDepartment(genproto.OperatorDepartment(v.SubDepartment)) + "," +
convertPosition(genproto.Operator_OperatorPosition(v.Position)) + "," +
v.CreatedOn.String() + "," +
v.StoppedOn.String() + "\n"
}
stream.Send(&genproto.OperatorCsvFile{CsvData: []byte(csvData)})
return nil
}
client側の実装
クライアント側でfor文を回して複数のレスポンスを受け取る処理を書かなければいけないので、クライアント側の実装については少々面倒な印象を持ちました。
複数のレスポンスを受け取る処理に関しては、ヘルパー関数を定義して、抽象化したいところではあります。
package services
import (
"backend-go/genproto"
"context"
"io"
"log"
"os"
"testing"
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestOperatorServiceOutputCsv(t *testing.T) {
if err := fixtures.Load(); err != nil {
log.Fatalf("failed load fixtures: %v", err)
}
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed connect grpc: %v", err)
}
defer conn.Close()
client := genproto.NewOperatorServiceClient(conn)
resp, err := client.OutputCsv(ctx, &genproto.OutputCsvOperatorRequest{})
assert.Nil(t, err)
var filePath string
for {
reply, err := resp.Recv()
if err == io.EOF {
break
}
if reply.FileName != "" {
filePath = "../testdata/" + reply.FileName
continue
}
if err != nil {
t.Fatalf("failed receive csv data: %v", err)
}
f, _ := os.Create(filePath)
defer f.Close()
f.Write(reply.CsvData)
}
if err != nil {
t.Fatalf("failed receive csv data: %v", err)
}
assert.FileExists(t, filePath)
os.Remove(filePath)
}
終わりに
今回は、CSV出力処理をgRPCのServer streaming
方式を用いて実装してみました。
出力するデータサイズが大きくなってくると、どうしてもダウンロード処理が重たくなってきてしまうので、クライアント側にストリーミング方式でresponseを並列で送信でできるのは非常に良いかなと思いました。
現在、Client streaming
を用いて、クライアントからサーバーに対してファイルアップロード処理を行う処理の実装を進めている最中なので、実装が落ち着いたら記事にまとめたいと思います。
Discussion