アニメで分かるgRPC
TL;DR
- gRPCを使ってアニメラジオの新着情報をSlackに通知します
- gRPC ClientをGoで構築します
- gRPC ServerをPythonで構築します
はじめに
どうも、アニメマスターです!
ところで皆さん、アニメラジオっていつ更新されてるか分からなくないですか?
わたしは分かりません!
ということで、今回はアニメラジオが更新されたらSlackに通知するシステムを、わざわざgRPCを使って世界一無駄に構築していきたいと思います!!
概観
GoでYouTube Data APIからアニメラジオの情報を取得し、クライアントからストリーミングでPythonサーバにリクエストを送り、送られた情報をSlackへ通知します。
1 gRPCサービスの作成
gRPCはProtocol BuffersとHTTP/2を用いたRPCフレームワークです。
gRPCがそもそもなにか分からない人は公式や、日本の記事ではこちらが分かりやすかったので適宜ご確認ください。
1-1. protoファイルの作成
それでは今回のプロジェクト用に以下のようなディレクトリを作成していきます。
.
├── api
│ └── proto
├── client
│ ├── api
│ ├── gen
│ └── variable
└── server
├── api
├── gen
はじめに.proto
という拡張子のProtocol Buffersのファイルを作成していきましょう。
anime_radio.ptoto
syntax = "proto3";
option go_package = "./";
service AnimeRadioService {
rpc SendAnimeRadioInfo (stream YouTubeInfo) returns (SlackResponse);
}
message YouTubeInfo {
string title = 1;
string url = 2;
}
message SlackResponse {
string result = 1;
}
今回はAnimeRadioServiceというサービスの中にSendAnimeRadioInfoという関数を一つだけ作成します。
この関数は引数としてYouTubeInfoを、戻り値としてSlackResponseを返します。YouTubeInfoの前につくstream
はストリーミング通信を表し、クライアントから並行して複数のリクエストが走ることを意味しています。
message
からはじまる2つのオブジェクトは引数と戻り値の型を表しています。
ファイルができたら、api/proto
配下に格納してください。
1-2. rpcの自動生成
それでは先ほど作成したprotoファイルからクライアントやサーバで利用するパッケージを自動生成していきましょう。
PythonとGoでそれぞれ事前にインストールするライブラリがあるので、まずはそれを用意します。
PythonはPoetryを使ってプロジェクトを構成していくのでPoetryをインストールしていない人はこちらに従ってインストールしてください。
ただ、必ずしもパッケージマネージャを使う必要はないので、インストールしたくない方はpipを使ってもらうだけで大丈夫です。
Poetryを使う人はプロジェクトを初期化して、必要なライブラリを追加していきましょう。
# ./server
$ cd server
# プロジェクトの初期化
$ poetry init
# ライブラリの追加
$ poetry add grpcio-tools
次にGoです。
# ./client
$ cd client
# プロジェクトの初期化
$ go mod init
# モジュールのインストール
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
それぞれ必要なライブラリが用意できたら以下のコマンドを実行して、ファイルを自動生成します。
# server/に移動して実行
$ cd server && poetry run python -m grpc_tools.protoc -I.api/proto --python_out=gen --grpc_python_out=.gen .api/proto/anime_radio.proto
# から実行
$ protoc api/proto/anime_radio.proto --go_out=client/gen --go-grpc_out=client/gen
serverとclientにそれぞれ以下のファイルが生成されたことを確認できたらOKです。
# ./server
gen
├── anime_radio_pb2.py
└── anime_radio_pb2_grpc.py
# ./client
gen
├── anime_radio.pb.go
└── anime_radio_grpc.pb.go
2. GoでのgRPC Clientの作成
gRPC ClientではYouTubeにキーワード検索し、取得されたデータをgRPC Serverにストリーミングで送信する処理を実装します。
2-1. YouTube Data API用パッケージの作成
まずclient/api
配下にYouTube Data API用のパッケージを作成します。
YouTubeのAPIを利用するにはAPI KEYを発行する必要があるので、このあたりを参考にGCPから認証情報を取得してください。
client/api/youtube.go
package api
import (
"context"
"log"
"google.golang.org/api/option"
"google.golang.org/api/youtube/v3"
pb "github.com/miyuki-starmiya/anime-radio-grpc/gen"
)
type YouTubeClient struct {
Service *youtube.Service
}
func NewYouTubeClient() *YouTubeClient {
// init youtube client
apiKey := os.Getenv("YOUTUBE_API_KEY")
service, err := youtube.NewService(context.Background(), option.WithAPIKey(apiKey))
if err != nil {
log.Printf("Error: %v", err)
return nil
}
return &YouTubeClient{
Service: service,
}
}
func (yc *YouTubeClient) SearchByKeyword(keyword string) ([]pb.YouTubeInfo, error) {
yesterdayStr := time.Now().Add(time.Duration(-24) * time.Hour).Format("2006-01-02T15:04:05Z")
call := yc.Service.Search.List([]string{"id", "snippet"})
call = call.Q(keyword).MaxResults(1).PublishedAfter(yesterdayStr)
response, err := call.Do()
if err != nil {
return nil, err
}
// create response
var result []pb.YouTubeInfo
for _, item := range response.Items {
result = append(result, pb.YouTubeInfo{
Title: item.Snippet.Title,
Url: "https://www.youtube.com/watch?v=" + item.Id.VideoId,
})
log.Printf("Title: %s, URL: %s", item.Snippet.Title, "https://www.youtube.com/watch?v="+item.Id.VideoId)
}
return result, nil
}
まずNewYouTubeClientで、google.golang.org/api/youtube/v3モジュールを利用して、YouTubeの動画をキーワード検索するファクトリー関数を作成しています。
次に、SearchByKeywordメソッドを作成していて、自動生成した型([]pb.YouTubeInfo)のスライスをresultとして格納し、それを戻り値として返しています。
callの部分で詳細な検索条件を設定することができ、検索件数をキーワードごとに1件、現在時間から1日前までのもののみを取得しています。
また、os.Getenv()
で環境変数を参照しているので、.env
等に以下の変数を格納しておきます。
YOUTUBE_API_KEY=<YOUR_API_KEY>
次に、検索キーワードを格納するパッケージを作成します。ここは完全にお好みです。
package variable
var (
AnimeRadios = []string{
"100カノRADIO",
"ぼっち・ざ・らじお!",
`姫様\“ラジオ\”の時間です`,
"まほあこラジオ",
"どさこいラジオ",
"スナックバス江 おもてなしラジオ",
}
VoiceActressRadios = []string{
"楠木ともりを灯せていますか?",
}
)
アニメラジオはクールをまたいで放送されているものも多く、わたしはクール外で100カノやぼっち・ざ・ろっく!のラジオを聴いています。
声優ラジオはあまり聴けておらず、楠木ともりさんくらいしか聴いていないので、オススメがあれば是非教えてください。
2-2. mainパッケージの実装
mainパッケージを以下の様に実装していきます。
main関数
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
_ "github.com/joho/godotenv/autoload"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/miyuki-starmiya/anime-radio-grpc/api"
pb "github.com/miyuki-starmiya/anime-radio-grpc/gen"
"github.com/miyuki-starmiya/anime-radio-grpc/variable"
)
const (
retrySecond = 30
maxRetryMinute = 10
)
// Client stream
func SendAnimeRadioInfo(client pb.AnimeRadioServiceClient, dataItems []pb.YouTubeInfo, ctx context.Context) {
stream, err := client.SendAnimeRadioInfo(ctx)
if err != nil {
log.Printf("Error: %v", err)
return
}
for _, data := range dataItems {
if err := stream.Send(&data); err != nil {
log.Printf("Failed to send data: %v", err)
return
}
}
response, err := stream.CloseAndRecv()
if err != nil {
log.Printf("Error: %v", err)
return
}
log.Printf("Response: %s", response.GetResult())
}
func connectWithRetry(serverAddress string) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
defer cancel()
for {
conn, err = grpc.DialContext(ctx, serverAddress, grpc.WithInsecure())
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
// Deadline exceeded, stop retrying
log.Printf("Deadline %d minutes exceeded: %v", maxRetryMinute, err)
return nil, err
}
// Retry
log.Printf("Retry due to: %v", err)
time.Sleep(time.Duration(retrySecond) * time.Second)
continue
}
// Success
break
}
return conn, nil
}
func main() {
yc := api.NewYouTubeClient()
var wg sync.WaitGroup
keywords := append(variable.AnimeRadios, variable.VoiceActressRadios...)
resChan := make(chan pb.YouTubeInfo, len(keywords))
// Concurrently search YouTube data
for _, keyword := range keywords {
wg.Add(1)
go func(kw string) {
defer wg.Done()
dataItems, err := yc.SearchByKeyword(kw)
if err != nil {
log.Printf("Error: %v", err)
return
}
if len(dataItems) > 0 {
resChan <- dataItems[0]
}
}(keyword)
}
// Wait for all goroutines until done
go func() {
wg.Wait()
close(resChan)
}()
var youTubeInfos []pb.YouTubeInfo
for data := range resChan {
youTubeInfos = append(youTubeInfos, data)
}
log.Printf("youTubeInfos: %v", youTubeInfos)
// Create connection to gRPC server
host := "localhost"
port := 8080
serverAddress := fmt.Sprintf("%s:%d", host, port)
log.Printf("Server address: %s", serverAddress)
conn, err := connectWithRetry(serverAddress)
if err != nil {
log.Printf("Failed to connect: %v", err)
}
defer conn.Close()
// After 10 minutes, cancel the client request
client := pb.NewAnimeRadioServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
defer cancel()
SendAnimeRadioInfo(client, youTubeInfos, ctx)
}
mainパッケージでは関数を3つ実装しています。順番に見ていきましょう。
1つ目の関数はSendAnimeRadioInfoです。
ここではclient/gen/anime_radio_grpc.pb.go
で作られた同名のSendAnimeRadioInfo
関数を定義します。
まず自動生成されたパッケージのSendAnimeRadioInfo
メソッドを見てみましょう。
func (c *animeRadioServiceClient) SendAnimeRadioInfo(ctx context.Context, opts ...grpc.CallOption) (AnimeRadioService_SendAnimeRadioInfoClient, error) {
stream, err := c.cc.NewStream(ctx, &AnimeRadioService_ServiceDesc.Streams[0], AnimeRadioService_SendAnimeRadioInfo_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &animeRadioServiceSendAnimeRadioInfoClient{stream}
return x, nil
}
引数としてコンテキストとオプションを受け取り、ストリーミング通信のクライアント構造体を生成して返していますね。
これを最初に呼び出しています。
func SendAnimeRadioInfo(client pb.AnimeRadioServiceClient, dataItems []pb.YouTubeInfo, ctx context.Context) {
stream, err := client.SendAnimeRadioInfo(ctx)
if err != nil {
log.Printf("Error: %v", err)
return
}
...
次に、引数として受け取ったYouTubeデータのスライスをstream.Send()
でサーバ側にストリーミングで並行で送信しています。
...
for _, data := range dataItems {
if err := stream.Send(&data); err != nil {
log.Printf("Failed to send data: %v", err)
return
}
}
次に、クライアントからデータを送り終えるとstream.CloseAndRecv()
で通信を終端して最後にサーバからのレスポンスを受け取ります。
...
response, err := stream.CloseAndRecv()
if err != nil {
log.Printf("Error: %v", err)
return
}
log.Printf("Response: %s", response.GetResult())
}
2つ目の関数はconnectWithRetryです。
この関数はサーバのIPアドレスを受け取り、クライアントコネクションを返します。
はじめにコネクションに渡すコンテキストを作成しています。
コンテキストは終了条件を付けることができますが、今回はタイムアウトを設定していてmaxRetryMinute
で定義した時間で強制終了するようにしています。
func connectWithRetry(serverAddress string) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
defer cancel()
...
次にサーバへダイアル接続します。
上記で設定したタイムアウトを超過していなければリトライし、タイムアウトを超過すれば接続を終了しています。
for {
conn, err = grpc.DialContext(ctx, serverAddress, grpc.WithInsecure())
if err != nil {
if status.Code(err) == codes.DeadlineExceeded {
// Deadline exceeded, stop retrying
log.Printf("Deadline %d minutes exceeded: %v", maxRetryMinute, err)
return nil, err
}
// Retry
log.Printf("Retry due to: %v", err)
time.Sleep(time.Duration(retrySecond) * time.Second)
continue
}
// Success
break
}
return conn, nil
}
3つ目の関数はmain関数です。
まずYouTube APIへのデータフェッチを非同期で並行処理しています。
Goroutinesで非同期でフェッチしたデータをresChan
に格納します。
func main() {
yc := api.NewYouTubeClient()
var wg sync.WaitGroup
keywords := append(variable.AnimeRadios, variable.VoiceActressRadios...)
resChan := make(chan pb.YouTubeInfo, len(keywords))
// Concurrently search YouTube data
for _, keyword := range keywords {
wg.Add(1)
go func(kw string) {
defer wg.Done()
dataItems, err := yc.SearchByKeyword(kw)
if err != nil {
log.Printf("Error: %v", err)
return
}
if len(dataItems) > 0 {
resChan <- dataItems[0]
}
}(keyword)
}
// Wait for all goroutines until done
go func() {
wg.Wait()
close(resChan)
}()
...
次に、取得したデータをサーバに送信する処理を書きます。
ここで先ほど定義したSendAnimeRadioInfo
とconnectWithRetry
を呼び出しています。
...
var youTubeInfos []pb.YouTubeInfo
for data := range resChan {
youTubeInfos = append(youTubeInfos, data)
}
log.Printf("youTubeInfos: %v", youTubeInfos)
// Create connection to gRPC server
host := "localhost"
port := 8080
serverAddress := fmt.Sprintf("%s:%d", host, port)
log.Printf("Server address: %s", serverAddress)
conn, err := connectWithRetry(serverAddress)
if err != nil {
log.Printf("Failed to connect: %v", err)
}
defer conn.Close()
// After 10 minutes, cancel the client request
client := pb.NewAnimeRadioServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxRetryMinute)*time.Minute)
defer cancel()
SendAnimeRadioInfo(client, youTubeInfos, ctx)
}
3. PythonでのgRPC Serverの作成
gRPC ServerではClientから受け取ったデータをSlackへ通知し、最後にClientにレスポンスを送信する処理を実装します。
3-1. Slack API用ファイルの作成
まずSlackチャンネルへ通知する用のWebhookのURLを発行する必要があります。
こちらの手続きに従って発行しておいてください。
できたら、Slackへ通知するクラスを定義します。
Slack API用ファイル
import json
import urllib.request
class SlackClient:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_message(self, message: str) -> str:
data = {
"text": message
}
headers = {
"Content-Type": "application/json"
}
# Create the request
req = urllib.request.Request(
self.webhook_url,
data=json.dumps(data).encode("utf-8"),
headers=headers
)
# Send the request
try:
with urllib.request.urlopen(req) as res:
return res.read().decode("utf-8")
except urllib.error.URLError as e:
print("Failed to send message: ", e.reason)
return ""
3-2. main関数の作成
最後に、gRPC Serverのmain関数を実装していきます。
main関数
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'gen'))
from concurrent import futures
from dotenv import load_dotenv
import grpc
import gen.anime_radio_pb2
import gen.anime_radio_pb2_grpc
from api.slack import SlackClient
# Read variables in .env
load_dotenv()
class AnimeRadioService(gen.anime_radio_pb2_grpc.AnimeRadioServiceServicer):
def SendAnimeRadioInfo(self, request_iterator, context):
MAX_WORKERS = 10
slack_client = SlackClient(os.environ.get("SLACK_WEBHOOK_URL"))
# Create a thread pool
with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
# Submit the tasks
future_to_url = {
executor.submit(
slack_client.send_message,
f"title: {youtube_data.title}, URL: {youtube_data.url}"
): youtube_data
for youtube_data in request_iterator
}
# Wait for the tasks to complete
for future in futures.as_completed(future_to_url):
youtube_data = future_to_url[future]
try:
data = future.result()
print(f"Successfully sent message: {data}")
except Exception as e:
print(f"Failed to send message: {e}")
return gen.anime_radio_pb2.SlackResponse(
result="success"
)
def main():
server = grpc.server(futures.ThreadPoolExecutor())
gen.anime_radio_pb2_grpc.add_AnimeRadioServiceServicer_to_server(AnimeRadioService(), server)
# listen to all hosts
port = 8080
address = f"[::]:{port}"
server.add_insecure_port(address)
print(f"listen to {address}")
server.start()
server.wait_for_termination()
if __name__ == '__main__':
main()
ServerでもClientと同じように自動生成されたクラスやメソッドを利用していきます。
まず、AnimeRadioServiceクラスをgen.anime_radio_pb2_grpc.AnimeRadioServiceServicerを継承して作成し、SendAnimeRadioInfoメソッドを定義していきます。
このメソッドではrequest_iterator
というClientからストリームで送られてきた配列の引数を展開して、concurrent
ライブラリを用いて並列処理でSlackにデータを通知しています。
最後にgen.anime_radio_pb2.SlackResponse
で自動で定義された型のレスポンスを返します。
次に、main関数です。
ここではgrpc.server()
でサーバのインスタンスを作成し、サーバインスタンスにメソッドを登録して、起動する処理を実装しています。
アドレスは[::]"
として定義していて、これはIPv4, IPv6アドレスのすべてのIPアドレスを指しています。
4. 動作確認
最後に動作確認をしていきましょう。
サーバの起動
$ cd server
$ poetry run python main.py
# listen to [::]:8080
クライアントの実行
$ cd client
$ go run main.go
これを実行後にサーバでSuccessfully sent message: ok
, クライアントでResponse: success
とログ表示されていれば成功です!
こんな感じでSlackに通知されます
おわりに
今回はじめてgRPCを使った実装をしてみました!
Clientという名前こそ付いていますが様々なサーバサイド言語間で通信できて面白かったです。
ストリーミングや並行処理がよく出てくるので、このあたりの理解を深めていきたいですね!
それでは、良きアニメライフを!!
Discussion