💊

アニメで分かるgRPC

2024/02/18に公開

TL;DR

  • gRPCを使ってアニメラジオの新着情報をSlackに通知します
  • gRPC ClientをGoで構築します
  • gRPC ServerをPythonで構築します

はじめに

どうも、アニメマスターです!

ところで皆さん、アニメラジオっていつ更新されてるか分からなくないですか?
わたしは分かりません!

ということで、今回はアニメラジオが更新されたらSlackに通知するシステムを、わざわざgRPCを使って世界一無駄に構築していきたいと思います!!

概観

01-architecture
GoでYouTube Data APIからアニメラジオの情報を取得し、クライアントからストリーミングでPythonサーバにリクエストを送り、送られた情報をSlackへ通知します。

1 gRPCサービスの作成

gRPCはProtocol BuffersHTTP/2を用いたRPCフレームワークです。

gRPCがそもそもなにか分からない人は公式や、日本の記事ではこちらが分かりやすかったので適宜ご確認ください。

1-1. protoファイルの作成

それでは今回のプロジェクト用に以下のようなディレクトリを作成していきます。

.
├── api
│   └── proto
├── client
│   ├── api
│   ├── gen
│   └── variable
└── server
    ├── api
    ├── gen

はじめに.protoという拡張子のProtocol Buffersのファイルを作成していきましょう。

anime_radio.ptoto
api/proto/anime_radio.proto
syntax = "proto3";

option go_package = "./";

service AnimeRadioService {
  rpc SendAnimeRadioInfo (stream YouTubeInfo) returns (SlackResponse);
}

message YouTubeInfo {
  string title = 1;
  string url = 2;
}

message SlackResponse {
  string result = 1;
}

今回はAnimeRadioServiceというサービスの中にSendAnimeRadioInfoという関数を一つだけ作成します。
この関数は引数としてYouTubeInfoを、戻り値としてSlackResponseを返します。YouTubeInfoの前につくstreamはストリーミング通信を表し、クライアントから並行して複数のリクエストが走ることを意味しています。
messageからはじまる2つのオブジェクトは引数と戻り値の型を表しています。

ファイルができたら、api/proto配下に格納してください。

1-2. rpcの自動生成

それでは先ほど作成したprotoファイルからクライアントやサーバで利用するパッケージを自動生成していきましょう。
PythonとGoでそれぞれ事前にインストールするライブラリがあるので、まずはそれを用意します。

PythonはPoetryを使ってプロジェクトを構成していくのでPoetryをインストールしていない人はこちらに従ってインストールしてください。
ただ、必ずしもパッケージマネージャを使う必要はないので、インストールしたくない方はpipを使ってもらうだけで大丈夫です。

Poetryを使う人はプロジェクトを初期化して、必要なライブラリを追加していきましょう。

# ./server
$ cd server
# プロジェクトの初期化
$ poetry init
# ライブラリの追加
$ poetry add grpcio-tools

次にGoです。

# ./client
$ cd client
# プロジェクトの初期化
$ go mod init
# モジュールのインストール
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

それぞれ必要なライブラリが用意できたら以下のコマンドを実行して、ファイルを自動生成します。

# server/に移動して実行
$ cd server && poetry run python -m grpc_tools.protoc -I.api/proto --python_out=gen --grpc_python_out=.gen .api/proto/anime_radio.proto
# から実行
$ protoc api/proto/anime_radio.proto --go_out=client/gen --go-grpc_out=client/gen

serverとclientにそれぞれ以下のファイルが生成されたことを確認できたらOKです。

# ./server
gen
├── anime_radio_pb2.py
└── anime_radio_pb2_grpc.py
# ./client
gen
├── anime_radio.pb.go
└── anime_radio_grpc.pb.go

2. GoでのgRPC Clientの作成

gRPC ClientではYouTubeにキーワード検索し、取得されたデータをgRPC Serverにストリーミングで送信する処理を実装します。

2-1. YouTube Data API用パッケージの作成

まずclient/api配下にYouTube Data API用のパッケージを作成します。

YouTubeのAPIを利用するにはAPI KEYを発行する必要があるので、このあたりを参考にGCPから認証情報を取得してください。

client/api/youtube.go
client/api/youtube.go
package api

import (
	"context"
	"log"

	"google.golang.org/api/option"
	"google.golang.org/api/youtube/v3"

	pb "github.com/miyuki-starmiya/anime-radio-grpc/gen"
)

type YouTubeClient struct {
	Service *youtube.Service
}

func NewYouTubeClient() *YouTubeClient {
	// init youtube client
	apiKey := os.Getenv("YOUTUBE_API_KEY")
	service, err := youtube.NewService(context.Background(), option.WithAPIKey(apiKey))
	if err != nil {
		log.Printf("Error: %v", err)
		return nil
	}

	return &YouTubeClient{
		Service: service,
	}
}

func (yc *YouTubeClient) SearchByKeyword(keyword string) ([]pb.YouTubeInfo, error) {
	yesterdayStr := time.Now().Add(time.Duration(-24) * time.Hour).Format("2006-01-02T15:04:05Z")
	call := yc.Service.Search.List([]string{"id", "snippet"})
	call = call.Q(keyword).MaxResults(1).PublishedAfter(yesterdayStr)
	response, err := call.Do()
	if err != nil {
		return nil, err
	}

	// create response
	var result []pb.YouTubeInfo
	for _, item := range response.Items {
		result = append(result, pb.YouTubeInfo{
			Title: item.Snippet.Title,
			Url:   "https://www.youtube.com/watch?v=" + item.Id.VideoId,
		})
		log.Printf("Title: %s, URL: %s", item.Snippet.Title, "https://www.youtube.com/watch?v="+item.Id.VideoId)
	}

	return result, nil
}

まずNewYouTubeClientで、google.golang.org/api/youtube/v3モジュールを利用して、YouTubeの動画をキーワード検索するファクトリー関数を作成しています。

次に、SearchByKeywordメソッドを作成していて、自動生成した型([]pb.YouTubeInfo)のスライスをresultとして格納し、それを戻り値として返しています。
callの部分で詳細な検索条件を設定することができ、検索件数をキーワードごとに1件、現在時間から1日前までのもののみを取得しています。

また、os.Getenv()で環境変数を参照しているので、.env等に以下の変数を格納しておきます。

.env
YOUTUBE_API_KEY=<YOUR_API_KEY>

次に、検索キーワードを格納するパッケージを作成します。ここは完全にお好みです。

client/variable/youtube.go
package variable

var (
	AnimeRadios = []string{
		"100カノRADIO",
		"ぼっち・ざ・らじお!",
		`姫様\“ラジオ\”の時間です`,
		"まほあこラジオ",
		"どさこいラジオ",
		"スナックバス江 おもてなしラジオ",
	}
	VoiceActressRadios = []string{
		"楠木ともりを灯せていますか?",
	}
)

アニメラジオはクールをまたいで放送されているものも多く、わたしはクール外で100カノやぼっち・ざ・ろっく!のラジオを聴いています。
声優ラジオはあまり聴けておらず、楠木ともりさんくらいしか聴いていないので、オススメがあれば是非教えてください。

2-2. mainパッケージの実装

mainパッケージを以下の様に実装していきます。

main関数
client/main.go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

  _ "github.com/joho/godotenv/autoload"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/miyuki-starmiya/anime-radio-grpc/api"
	pb "github.com/miyuki-starmiya/anime-radio-grpc/gen"
	"github.com/miyuki-starmiya/anime-radio-grpc/variable"
)

const (
	retrySecond    = 30
	maxRetryMinute = 10
)

// Client stream
func SendAnimeRadioInfo(client pb.AnimeRadioServiceClient, dataItems []pb.YouTubeInfo, ctx context.Context) {
	stream, err := client.SendAnimeRadioInfo(ctx)
	if err != nil {
		log.Printf("Error: %v", err)
		return
	}

	for _, data := range dataItems {
		if err := stream.Send(&data); err != nil {
			log.Printf("Failed to send data: %v", err)
			return
		}
	}

	response, err := stream.CloseAndRecv()
	if err != nil {
		log.Printf("Error: %v", err)
		return
	}
	log.Printf("Response: %s", response.GetResult())
}

func connectWithRetry(serverAddress string) (*grpc.ClientConn, error) {
	var conn *grpc.ClientConn
	var err error

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
	defer cancel()

	for {
		conn, err = grpc.DialContext(ctx, serverAddress, grpc.WithInsecure())
		if err != nil {
			if status.Code(err) == codes.DeadlineExceeded {
				// Deadline exceeded, stop retrying
				log.Printf("Deadline %d minutes exceeded: %v", maxRetryMinute, err)
				return nil, err
			}
			// Retry
			log.Printf("Retry due to: %v", err)
			time.Sleep(time.Duration(retrySecond) * time.Second)
			continue
		}
		// Success
		break
	}
	return conn, nil
}

func main() {
	yc := api.NewYouTubeClient()

	var wg sync.WaitGroup
	keywords := append(variable.AnimeRadios, variable.VoiceActressRadios...)
	resChan := make(chan pb.YouTubeInfo, len(keywords))

	// Concurrently search YouTube data
	for _, keyword := range keywords {
		wg.Add(1)
		go func(kw string) {
			defer wg.Done()
			dataItems, err := yc.SearchByKeyword(kw)
			if err != nil {
				log.Printf("Error: %v", err)
				return
			}
			if len(dataItems) > 0 {
				resChan <- dataItems[0]
			}
		}(keyword)
	}

	// Wait for all goroutines until done
	go func() {
		wg.Wait()
		close(resChan)
	}()

	var youTubeInfos []pb.YouTubeInfo
	for data := range resChan {
		youTubeInfos = append(youTubeInfos, data)
	}
	log.Printf("youTubeInfos: %v", youTubeInfos)

	// Create connection to gRPC server
  host := "localhost"
	port := 8080
	serverAddress := fmt.Sprintf("%s:%d", host, port)
	log.Printf("Server address: %s", serverAddress)
	conn, err := connectWithRetry(serverAddress)
	if err != nil {
		log.Printf("Failed to connect: %v", err)
	}
	defer conn.Close()

	// After 10 minutes, cancel the client request
	client := pb.NewAnimeRadioServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
	defer cancel()

	SendAnimeRadioInfo(client, youTubeInfos, ctx)
}

mainパッケージでは関数を3つ実装しています。順番に見ていきましょう。

1つ目の関数はSendAnimeRadioInfoです。

ここではclient/gen/anime_radio_grpc.pb.goで作られた同名のSendAnimeRadioInfo関数を定義します。

まず自動生成されたパッケージのSendAnimeRadioInfoメソッドを見てみましょう。

client/gen/anime_radio_grpc.pb.go
func (c *animeRadioServiceClient) SendAnimeRadioInfo(ctx context.Context, opts ...grpc.CallOption) (AnimeRadioService_SendAnimeRadioInfoClient, error) {
	stream, err := c.cc.NewStream(ctx, &AnimeRadioService_ServiceDesc.Streams[0], AnimeRadioService_SendAnimeRadioInfo_FullMethodName, opts...)
	if err != nil {
		return nil, err
	}
	x := &animeRadioServiceSendAnimeRadioInfoClient{stream}
	return x, nil
}

引数としてコンテキストとオプションを受け取り、ストリーミング通信のクライアント構造体を生成して返していますね。

これを最初に呼び出しています。

SendAnimeRadioInfo
func SendAnimeRadioInfo(client pb.AnimeRadioServiceClient, dataItems []pb.YouTubeInfo, ctx context.Context) {
	stream, err := client.SendAnimeRadioInfo(ctx)
	if err != nil {
		log.Printf("Error: %v", err)
		return
	}

  ...

次に、引数として受け取ったYouTubeデータのスライスをstream.Send()でサーバ側にストリーミングで並行で送信しています。

SendAnimeRadioInfo
  ...

  	for _, data := range dataItems {
		if err := stream.Send(&data); err != nil {
			log.Printf("Failed to send data: %v", err)
			return
		}
	}

次に、クライアントからデータを送り終えるとstream.CloseAndRecv()で通信を終端して最後にサーバからのレスポンスを受け取ります。

SendAnimeRadioInfo
  ...

	response, err := stream.CloseAndRecv()
	if err != nil {
		log.Printf("Error: %v", err)
		return
	}
	log.Printf("Response: %s", response.GetResult())
}

2つ目の関数はconnectWithRetryです。

この関数はサーバのIPアドレスを受け取り、クライアントコネクションを返します。

はじめにコネクションに渡すコンテキストを作成しています。
コンテキストは終了条件を付けることができますが、今回はタイムアウトを設定していてmaxRetryMinuteで定義した時間で強制終了するようにしています。

connectWithRetry
func connectWithRetry(serverAddress string) (*grpc.ClientConn, error) {
	var conn *grpc.ClientConn
	var err error

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
	defer cancel()

  ...

次にサーバへダイアル接続します。
上記で設定したタイムアウトを超過していなければリトライし、タイムアウトを超過すれば接続を終了しています。

connectWithRetry
	for {
		conn, err = grpc.DialContext(ctx, serverAddress, grpc.WithInsecure())
		if err != nil {
			if status.Code(err) == codes.DeadlineExceeded {
				// Deadline exceeded, stop retrying
				log.Printf("Deadline %d minutes exceeded: %v", maxRetryMinute, err)
				return nil, err
			}
			// Retry
			log.Printf("Retry due to: %v", err)
			time.Sleep(time.Duration(retrySecond) * time.Second)
			continue
		}
		// Success
		break
	}
	return conn, nil
}

3つ目の関数はmain関数です。

まずYouTube APIへのデータフェッチを非同期で並行処理しています。
Goroutinesで非同期でフェッチしたデータをresChanに格納します。

main
func main() {
	yc := api.NewYouTubeClient()

	var wg sync.WaitGroup
	keywords := append(variable.AnimeRadios, variable.VoiceActressRadios...)
	resChan := make(chan pb.YouTubeInfo, len(keywords))

	// Concurrently search YouTube data
	for _, keyword := range keywords {
		wg.Add(1)
		go func(kw string) {
			defer wg.Done()
			dataItems, err := yc.SearchByKeyword(kw)
			if err != nil {
				log.Printf("Error: %v", err)
				return
			}
			if len(dataItems) > 0 {
				resChan <- dataItems[0]
			}
		}(keyword)
	}

	// Wait for all goroutines until done
	go func() {
		wg.Wait()
		close(resChan)
	}()

  ...

次に、取得したデータをサーバに送信する処理を書きます。
ここで先ほど定義したSendAnimeRadioInfoconnectWithRetryを呼び出しています。

main
  ...

	var youTubeInfos []pb.YouTubeInfo
	for data := range resChan {
		youTubeInfos = append(youTubeInfos, data)
	}
	log.Printf("youTubeInfos: %v", youTubeInfos)

	// Create connection to gRPC server
  host := "localhost"
	port := 8080
	serverAddress := fmt.Sprintf("%s:%d", host, port)
	log.Printf("Server address: %s", serverAddress)
	conn, err := connectWithRetry(serverAddress)
	if err != nil {
		log.Printf("Failed to connect: %v", err)
	}
	defer conn.Close()

	// After 10 minutes, cancel the client request
	client := pb.NewAnimeRadioServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
	defer cancel()

	SendAnimeRadioInfo(client, youTubeInfos, ctx)
}

3. PythonでのgRPC Serverの作成

gRPC ServerではClientから受け取ったデータをSlackへ通知し、最後にClientにレスポンスを送信する処理を実装します。

3-1. Slack API用ファイルの作成

まずSlackチャンネルへ通知する用のWebhookのURLを発行する必要があります。
こちらの手続きに従って発行しておいてください。

できたら、Slackへ通知するクラスを定義します。

Slack API用ファイル
server/api/slack.py
import json
import urllib.request

class SlackClient:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_message(self, message: str) -> str:
        data = {
            "text": message
        }
        headers = {
            "Content-Type": "application/json"
        }

        # Create the request
        req = urllib.request.Request(
            self.webhook_url,
            data=json.dumps(data).encode("utf-8"),
            headers=headers
        )

        # Send the request
        try:
            with urllib.request.urlopen(req) as res:
                return res.read().decode("utf-8")
        except urllib.error.URLError as e:
            print("Failed to send message: ", e.reason)
            return ""

3-2. main関数の作成

最後に、gRPC Serverのmain関数を実装していきます。

main関数
server/main.py
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'gen'))

from concurrent import futures
from dotenv import load_dotenv
import grpc
import gen.anime_radio_pb2
import gen.anime_radio_pb2_grpc
from api.slack import SlackClient


# Read variables in .env
load_dotenv()


class AnimeRadioService(gen.anime_radio_pb2_grpc.AnimeRadioServiceServicer):
    def SendAnimeRadioInfo(self, request_iterator, context):
        MAX_WORKERS = 10
        slack_client = SlackClient(os.environ.get("SLACK_WEBHOOK_URL"))

        # Create a thread pool
        with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
            # Submit the tasks
            future_to_url = {
                executor.submit(
                    slack_client.send_message,
                    f"title: {youtube_data.title}, URL: {youtube_data.url}"
                ): youtube_data
                for youtube_data in request_iterator
            }

            # Wait for the tasks to complete
            for future in futures.as_completed(future_to_url):
                youtube_data = future_to_url[future]
                try:
                    data = future.result()
                    print(f"Successfully sent message: {data}")
                except Exception as e:
                    print(f"Failed to send message: {e}")

        return gen.anime_radio_pb2.SlackResponse(
            result="success"
        )


def main():
    server = grpc.server(futures.ThreadPoolExecutor())
    gen.anime_radio_pb2_grpc.add_AnimeRadioServiceServicer_to_server(AnimeRadioService(), server)

    # listen to all hosts
    port = 8080
    address = f"[::]:{port}"
    server.add_insecure_port(address)
    print(f"listen to {address}")
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    main()

ServerでもClientと同じように自動生成されたクラスやメソッドを利用していきます。
まず、AnimeRadioServiceクラスをgen.anime_radio_pb2_grpc.AnimeRadioServiceServicerを継承して作成し、SendAnimeRadioInfoメソッドを定義していきます。

このメソッドではrequest_iteratorというClientからストリームで送られてきた配列の引数を展開して、concurrentライブラリを用いて並列処理でSlackにデータを通知しています。
最後にgen.anime_radio_pb2.SlackResponseで自動で定義された型のレスポンスを返します。

次に、main関数です。
ここではgrpc.server()でサーバのインスタンスを作成し、サーバインスタンスにメソッドを登録して、起動する処理を実装しています。
アドレスは[::]"として定義していて、これはIPv4, IPv6アドレスのすべてのIPアドレスを指しています。

4. 動作確認

最後に動作確認をしていきましょう。

サーバの起動

$ cd server
$ poetry run python main.py
# listen to [::]:8080

クライアントの実行

$ cd client
$ go run main.go

これを実行後にサーバでSuccessfully sent message: ok, クライアントでResponse: successとログ表示されていれば成功です!

02-result
こんな感じでSlackに通知されます

おわりに

今回はじめてgRPCを使った実装をしてみました!
Clientという名前こそ付いていますが様々なサーバサイド言語間で通信できて面白かったです。
ストリーミングや並行処理がよく出てくるので、このあたりの理解を深めていきたいですね!

それでは、良きアニメライフを!!

References

Discussion