👏

gRPCのstream方式でCSV出力処理を実装する

2022/05/21に公開

はじめに

マイクロサービス方式の開発で選定されることが多いgRPCを用いてCSV出力処理を実装してみました。

gRPCの通信方式

  • Unary RPCs
    シンプルな通信方式。1つのrequestに対して1つのresponseが返ってくる方式になります。gRPCの通信方式の中で一番REST方式に近いものになると思います。
  • Server streaming RPCs
    クライアントからの1つのrequestに対して、サーバーから複数のresponseを返すことができる通信方式になります。サーバーからクライアントに向けてサイズの大きいファイルなどを送信したい場合に使うことが多いと思います。
  • Client streaming RPCs
    クライアントからの複数のrequestに対して、サーバーから1つのresponseを返す通信方式になります。クライアントからサーバーに向けてサイズの大きいファイルなどをアップロードする際に使うことが多いと思います。
  • Bidirectional streaming RPCs
    クライアントからの複数のrequestに対して、サーバーから複数のresposeを返すことができる通信方式になります。私自身、こちらの通信方式を用いての実装を未だ行ったことがありません。今後ユースケースを探っていきたいと思います。

実装方針

今回はCSV出力データをサーバー側で作成し、クライアントに返す形式で実装を進めたいと思います。
CSV出力処理の大まかな流れは以下のようなイメージになります。

  • クライアント側からのリクエストを受信した後、サーバー側でCSV文字列をバイナリデータに変換してクライアントに送信。
  • サーバー側から送信されてきたバイナリデータをクライアント側でCSV文字列に変換後、指定したファイル名でダウンロード処理を実行。

CSV出力データのサイズが大きなものになるケースを想定して、 今回は、server streaming RPCs 方式で実装を進めます。

protocol buffersの定義

Server streaming RPCs を使用する場合には、protoのrpc関数定義箇所にて、以下のように stream という設定を追加する必要があります。
クライアントから送信するrequestのフィールド定義には、group_idを追加し、グループ条件を指定してオペレーター一覧を取得できる様にします。
一方でサーバーから送信するresponseのフィールド定義には、 file_name とCSV文字列の本体である csv_data をbyte型を指定して追加します。

sample.proto
service OperatorService {
  rpc OutputCsv(OutputCsvOperatorRequest) returns (stream OperatorCsvFile) {}
}


message OutputCsvOperatorRequest {
  int32 group_id = 1;
}
message OperatorCsvFile {
  string file_name = 1;
  bytes csv_data = 2;
}

server側の実装

先ほどprotocol buffersにてstrema方式で OutputCsv rpc関数を定義したと思います。
stream方式で定義されたrpc関数を呼び出すとサーバーとクライアント間で通信のコネクションが発生します。
サーバー側で stream.Send(&genproto.OperatorCsvFile{FieldName: data}) を1回呼び出すと、クライアント側に1つのレスポンスが送信されます。
stream.Sendを複数回呼び出すことでクライアントからの1回のrequestに対して複数のresponseを送信することができます。
この特徴を活かして、容量の大きなデータをいくつかに分割し、複数回のresponseとして送信するということが実現可能となります。

operator_service.go
package services

import (
	"backend-go/ent"
	"backend-go/ent/operatormaster"
	"backend-go/genproto"
	"context"
	"strconv"
	"time"
)

// OperatorService implements OperatorServiceServer
type OperatorService struct {
	client *ent.Client
	genproto.UnimplementedOperatorServiceServer
}

// NewOperatorService returns a new OperatorService
func NewOperatorService(client *ent.Client) *OperatorService {
	return &OperatorService{
		client: client,
	}
}

func (svc *OperatorService) OutputCsv(req *genproto.OutputCsvOperatorRequest, stream genproto.OperatorService_OutputCsvServer) error {
	var (
		err       error
		operators []*ent.OperatorMaster
	)

	m := svc.client.OperatorMaster.Query()

	department := req.GetDepartment()
	if department != genproto.OperatorDepartment_OPERARTOR_DEPARTMENT_NONE {
		m.Where(operatormaster.DepartmentEQ(uint32(department)))
	}

	availableFlg := req.GetAvailableFlg()
	if availableFlg != genproto.AvailableFlg(genproto.Operator_AVAILABLE_FLG_NONE) {
		m.Where(operatormaster.AuthorizationFlgEQ(int32(availableFlg)))
	}

	operators, err = m.All(stream.Context())

	if err != nil {
		return err
	}

	var csvData string

	fileName := "operator_list." + time.Now().Format("20060102") + ".csv"
	stream.Send(&genproto.OperatorCsvFile{FileName: fileName})

	csvData += "状態,ID,社員名,部署,第2所属部署,役職,有効日時,無効日時\n"
	for _, v := range operators {
		csvData +=
			convertAvailableFlg(genproto.Operator_AvailableFlg(v.AvailableFlg)) + "," +
				strconv.Itoa(int(v.ID)) + "," +
				v.OperatorName + "," +
				convertDepartment(genproto.OperatorDepartment(v.Department)) + "," +
				convertDepartment(genproto.OperatorDepartment(v.SubDepartment)) + "," +
				convertPosition(genproto.Operator_OperatorPosition(v.Position)) + "," +
				v.CreatedOn.String() + "," +
				v.StoppedOn.String() + "\n"
	}

	stream.Send(&genproto.OperatorCsvFile{CsvData: []byte(csvData)})

	return nil
}

client側の実装

クライアント側でfor文を回して複数のレスポンスを受け取る処理を書かなければいけないので、クライアント側の実装については少々面倒な印象を持ちました。
複数のレスポンスを受け取る処理に関しては、ヘルパー関数を定義して、抽象化したいところではあります。

operator_service_test.go
package services

import (
	"backend-go/genproto"
	"context"
	"io"
	"log"
	"os"
	"testing"

	_ "github.com/go-sql-driver/mysql"
	"github.com/stretchr/testify/assert"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func TestOperatorServiceOutputCsv(t *testing.T) {
	if err := fixtures.Load(); err != nil {
		log.Fatalf("failed load fixtures: %v", err)
	}

	ctx := context.Background()
	conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("failed connect grpc: %v", err)
	}
	defer conn.Close()

	client := genproto.NewOperatorServiceClient(conn)
	resp, err := client.OutputCsv(ctx, &genproto.OutputCsvOperatorRequest{})
	assert.Nil(t, err)

	var filePath string

	for {
		reply, err := resp.Recv()
		if err == io.EOF {
			break
		}
		if reply.FileName != "" {
			filePath = "../testdata/" + reply.FileName
			continue
		}

		if err != nil {
			t.Fatalf("failed receive csv data: %v", err)
		}
		f, _ := os.Create(filePath)
		defer f.Close()
		f.Write(reply.CsvData)
	}
	if err != nil {
		t.Fatalf("failed receive csv data: %v", err)
	}

	assert.FileExists(t, filePath)

	os.Remove(filePath)
}

終わりに

今回は、CSV出力処理をgRPCのServer streaming方式を用いて実装してみました。
出力するデータサイズが大きくなってくると、どうしてもダウンロード処理が重たくなってきてしまうので、クライアント側にストリーミング方式でresponseを並列で送信でできるのは非常に良いかなと思いました。
現在、Client streamingを用いて、クライアントからサーバーに対してファイルアップロード処理を行う処理の実装を進めている最中なので、実装が落ち着いたら記事にまとめたいと思います。

Discussion