Google Cloud Runを使って双方向ストリーミングなgRPCサーバーを構築してみる
どうもMAD事業部の新井です。
本エントリは クラスメソッド Google Cloud Advent Calendar 2021 の 4日目の記事です。
今回はgRPCの双方向ストリーミングをCloud Runで試してみたいと思います。
Cloud Runは、アプリケーションを実行できるサービスの中でも「エンドツーエンドの HTTP/2 接続、WebSocket のサポート、gRPC の双方向ストリーミングが可能」という特徴があります。詳しくはこちらをご覧ください。
双方向ストリーミングにも対応しているということなので、勉強がてら試していこうと思います。
お題: おじさんとチャットする
最近寒くなってきましたね。
人肌恋しいこの時期には「おじさんとチャットして心を温めたい」と思う人も多いんじゃないでしょうか?
そんなあなたのために、今回はおじさんとチャットできるアプリをgRPC双方向ストリーミングをつかって実装してみたいと思います。
おじさん構文の作成にはOjichatを使わせて頂こうと思います。
細かい仕様は下記にリストアップしてみました。
- チャットルームに接続するおじさんとの会話がスタートする
- おじさんは自分の好きなタイミングでチャットを送ってくる(チョット気持ち悪い)
- こちらが送ったメッセージに対しては一定のリアクションはしてくれる(自分の会話優先)
- おじさんとの会話のキャッチボールは成立しない(会話は一方通行)
- 接続のたびに違うタイプのおじさんと会話ができる (クセの強いおじさんもいる)
アプリケーションの作成
ここではポイントだけ抜粋して紹介します。 詳しくはGitHubのソースコードを参考にしてください。
Protocol Buffer
rpcは1つだけにしました。
リクエスト時にはname
を送ることで、おじさんが自分の名前を認識してくれます。
syntax = "proto3"; package ojichat; option go_package = "./;ojichat"; service Ojichat { rpc Chat (stream ChatRequest) returns (stream ChatResponse) {} } message ChatRequest { string name = 1; string message = 2; } message ChatResponse { string message = 1; }
Server
サーバー側は主に次の2つの処理があります。
- クライアントからのメッセージに対して返答する
- おじさんが送りたいタイミングでメッセージを送る
これを実現するために、receive
とreply
という関数をgoroutineで実行し、それぞれrecvCh
とreplyCh
というチャネルにメッセージを流しています。
receive
関数ではクライアントからのメッセージを待ち受けています。一方、reply
関数では0~10秒ほどスリープした後、おじさんが会話開始するのを待っています。
recvCh
にメッセージが届くと、クライアントに返信ありがとう (^o^)
という固定メッセージを返します。一方replyCh
にメッセージが届くと、クライアントにOjichatで自動生成したテキストが送信されます。Ojichatのパラメーターはクライアント毎にランダムです。つまり、接続の度にいろんなタイプのおじさんと会話がたのしめます。
package main import ( "io" "log" "math/rand" "net" "time" pb "ojichat-stream/proto/gen" ojc "github.com/greymd/ojichat/generator" "google.golang.org/grpc" ) type server struct { pb.UnimplementedOjichatServer } func receive(ch chan<- pb.ChatRequest, stream pb.Ojichat_ChatServer) { for { in, err := stream.Recv() if err == io.EOF { continue } if err != nil { log.Println(err) return } ch <- *in } } func reply(ch chan<- bool, stream pb.Ojichat_ChatServer) { for { time.Sleep(time.Second * time.Duration(rand.Intn(10))) ch <- true } } func (s *server) Chat(stream pb.Ojichat_ChatServer) error { ojiConf := ojc.Config{EmojiNum: rand.Intn(10), PunctuationLevel: rand.Intn(3)} recvCh := make(chan pb.ChatRequest) go receive(recvCh, stream) replyCh := make(chan bool) go reply(replyCh, stream) for { select { case v := <-recvCh: name := v.GetName() msg := v.GetMessage() log.Printf("name: %v, message: %v ", name, msg) ojiConf.TargetName = name if msg == "" { continue } if err := stream.Send(&pb.ChatResponse{Message: "返信ありがとう (^o^)"}); err != nil { return err } case <-replyCh: reply, err := ojc.Start(ojiConf) if err != nil { return err } if err := stream.Send(&pb.ChatResponse{Message: reply}); err != nil { return err } } } } func main() { port := ":50051" lis, err := net.Listen("tcp", port) if err != nil { log.Fatal(err) } s := grpc.NewServer() pb.RegisterOjichatServer(s, &server{}) if err != nil { log.Fatal(err) } if err := s.Serve(lis); err != nil { log.Fatal(err) } }
Client
クライアント側は主に次の2つの処理があります。先ほどのサーバー側の処理とかなり似ています。
- おじさんからのメッセージを表示する
- おじさんにメッセージを送る
receive
とinput
という関数をgoroutineで実行し、それぞれrecvCh
とinputCh
というチャネルにメッセージを流しています。
receive
関数ではおじさんからのメッセージを待ち受けています。一方、input
関数では、ユーザーからのメッセージを標準入力で待ち受けています。
recvCh
にメッセージが届くと、標準出力におじさんからのメッセージが表示されます。一方inputCh
にメッセージが届くと、おじさんにメッセージを送ることができます。
また、クライアント側ではcobraを使ってコマンドライン操作がしやすいよう実装しています。
package main import ( "bufio" "context" "crypto/tls" "fmt" "io" "log" "os" pb "ojichat-stream/proto/gen" "github.com/spf13/cobra" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) func receive(ch chan<- pb.ChatResponse, stream pb.Ojichat_ChatClient) { for { in, err := stream.Recv() if err == io.EOF { close(ch) return } if err != nil { log.Fatal(err) } ch <- *in } } func input(ch chan<- pb.ChatRequest, r io.Reader) { s := bufio.NewScanner(r) fmt.Printf("\n\x1b[36menter message:\x1b[0m") for s.Scan() { input := pb.ChatRequest{Message: s.Text()} ch <- input } } func exec(name string, addr string, secure bool) { var conn *grpc.ClientConn var err error if secure { tlsCredentials := credentials.NewTLS(&tls.Config{}) conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(tlsCredentials), grpc.WithBlock()) } else { conn, err = grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) } defer conn.Close() if err != nil { log.Fatal(err) } client := pb.NewOjichatClient(conn) stream, err := client.Chat(context.Background()) if err != nil { log.Fatal(err) } inputCh := make(chan pb.ChatRequest) go input(inputCh, os.Stdin) recvCh := make(chan pb.ChatResponse) go receive(recvCh, stream) ctx, cancel := context.WithCancel(context.TODO()) defer cancel() for { select { case <-ctx.Done(): fmt.Println("done") stream.CloseSend() return case v := <-recvCh: fmt.Printf("\n\x1b[32mおじさん>\x1b[0m %v\n\n", v.Message) fmt.Printf("\n\x1b[36menter message:\x1b[0m") case v := <-inputCh: v.Name = name if err := stream.Send(&v); err != nil { log.Fatal(err) } } } } var rootCmd = &cobra.Command{ Use: "client", Short: "gRPC client for Ojichat server", Run: func(cmd *cobra.Command, args []string) { name, err := cmd.Flags().GetString("name") if err != nil { log.Fatal(err) } addr, err := cmd.Flags().GetString("addr") if err != nil { log.Fatal(err) } secure, err := cmd.Flags().GetBool("secure") if err != nil { log.Fatal(err) } exec(name, addr, secure) }, } func init() { rootCmd.Flags().StringP("name", "n", "unknown", "enter your name") rootCmd.Flags().StringP("addr", "a", "localhost:50051", "enter server address") rootCmd.Flags().BoolP("secure", "s", false, "enable secure access") if err := rootCmd.MarkFlagRequired("name"); err != nil { log.Fatal(err) } } func main() { if err := rootCmd.Execute(); err != nil { log.Fatal(err) } }
デプロイ
デプロイはdocker
およびgcloud
コマンドでおこないます。
めちゃくちゃ簡単です。
# build docker build -t gcr.io/<your-account-id>/ojichat-stream:latest . # push docker push gcr.io/<your-account-id>/ojichat-stream:latest # deploy gcloud run deploy --image gcr.io/<your-account-id>/ojichat-stream:latest --platform managed --region asia-northeast1 --use-http2 --allow-unauthenticated --port 50051
GCR(Google Container Registory)にイメージをPushしていますが、初回アクセスだと下記のコマンドでの認証まわりの設定が必要です。
# ref: https://cloud.google.com/sdk/gcloud/reference/auth/configure-docker # ref: https://cloud.google.com/container-registry/docs/advanced-authentication#gcloud-helper gcloud auth configure-docker
動作確認
クライアントのコマンドは下記のような設定になっています。
$ go run cmd/client/main.go -h gRPC client for Ojichat server Usage: client [flags] Flags: -a, --addr string enter server address (default "localhost:50051") -h, --help help for client -n, --name string enter your name (default "unknown") -s, --secure enable secure access
下記のコマンドを4分割したターミナルで実行して、いろんなタイプのおじさんと会話してみたいと思います。
go run cmd/client/main.go --name arai --addr <your-endpoint>:443 -s
Vivaおじさん
左上のおじさんはかなりおとなしいのに比べて、右下のおじさんはかなり騒がしいです。(笑)
まとめ
思ったよりサクッとデプロイできてました。
Cloud Runの特徴の一つでもある「デプロイの容易さ」が体感できました。
あと、初めて使ったんですがOjichatすごいですね。裏側の仕組みも気になるところです。
明日 12/5 は ikeda さんです。よろしくお願いします!