Google Cloud Runを使って双方向ストリーミングなgRPCサーバーを構築してみる

Google Cloud Runを使って双方向ストリーミングなgRPCサーバーを構築してみる

Clock Icon2021.12.04

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どうもMAD事業部の新井です。

本エントリは クラスメソッド Google Cloud Advent Calendar 2021 の 4日目の記事です。

今回はgRPCの双方向ストリーミングをCloud Runで試してみたいと思います。

Cloud Runは、アプリケーションを実行できるサービスの中でも「エンドツーエンドの HTTP/2 接続、WebSocket のサポート、gRPC の双方向ストリーミングが可能」という特徴があります。詳しくはこちらをご覧ください。

双方向ストリーミングにも対応しているということなので、勉強がてら試していこうと思います。

お題: おじさんとチャットする

最近寒くなってきましたね。

人肌恋しいこの時期には「おじさんとチャットして心を温めたい」と思う人も多いんじゃないでしょうか?

そんなあなたのために、今回はおじさんとチャットできるアプリをgRPC双方向ストリーミングをつかって実装してみたいと思います。

おじさん構文の作成にはOjichatを使わせて頂こうと思います。

細かい仕様は下記にリストアップしてみました。

  • チャットルームに接続するおじさんとの会話がスタートする
  • おじさんは自分の好きなタイミングでチャットを送ってくる(チョット気持ち悪い)
  • こちらが送ったメッセージに対しては一定のリアクションはしてくれる(自分の会話優先)
  • おじさんとの会話のキャッチボールは成立しない(会話は一方通行)
  • 接続のたびに違うタイプのおじさんと会話ができる (クセの強いおじさんもいる)

アプリケーションの作成

ここではポイントだけ抜粋して紹介します。 詳しくはGitHubのソースコードを参考にしてください。

Protocol Buffer

rpcは1つだけにしました。

リクエスト時にはnameを送ることで、おじさんが自分の名前を認識してくれます。

syntax = "proto3";
package ojichat;
option go_package = "./;ojichat";

service Ojichat {
  rpc Chat (stream ChatRequest) returns (stream ChatResponse) {}
}


message ChatRequest {
  string name = 1;
  string message = 2;
}

message ChatResponse {
  string message = 1;
}

Server

サーバー側は主に次の2つの処理があります。

  1. クライアントからのメッセージに対して返答する
  2. おじさんが送りたいタイミングでメッセージを送る

これを実現するために、receivereplyという関数をgoroutineで実行し、それぞれrecvChreplyChというチャネルにメッセージを流しています。

receive関数ではクライアントからのメッセージを待ち受けています。一方、reply関数では0~10秒ほどスリープした後、おじさんが会話開始するのを待っています。

recvChにメッセージが届くと、クライアントに返信ありがとう (^o^)という固定メッセージを返します。一方replyChにメッセージが届くと、クライアントにOjichatで自動生成したテキストが送信されます。Ojichatのパラメーターはクライアント毎にランダムです。つまり、接続の度にいろんなタイプのおじさんと会話がたのしめます。

package main

import (
 "io"
 "log"
 "math/rand"
 "net"
 "time"

 pb "ojichat-stream/proto/gen"

 ojc "github.com/greymd/ojichat/generator"

 "google.golang.org/grpc"
)

type server struct {
 pb.UnimplementedOjichatServer
}

func receive(ch chan<- pb.ChatRequest, stream pb.Ojichat_ChatServer) {
 for {
  in, err := stream.Recv()
  if err == io.EOF {
   continue
  }
  if err != nil {
   log.Println(err)
   return
  }
  ch <- *in
 }
}

func reply(ch chan<- bool, stream pb.Ojichat_ChatServer) {
 for {
  time.Sleep(time.Second * time.Duration(rand.Intn(10)))
  ch <- true
 }
}

func (s *server) Chat(stream pb.Ojichat_ChatServer) error {
 ojiConf := ojc.Config{EmojiNum: rand.Intn(10), PunctuationLevel: rand.Intn(3)}
 recvCh := make(chan pb.ChatRequest)
 go receive(recvCh, stream)

 replyCh := make(chan bool)
 go reply(replyCh, stream)

 for {
  select {
  case v := <-recvCh:
   name := v.GetName()
   msg := v.GetMessage()
   log.Printf("name: %v, message: %v ", name, msg)
   ojiConf.TargetName = name
   if msg == "" {
    continue
   }
   if err := stream.Send(&pb.ChatResponse{Message: "返信ありがとう (^o^)"}); err != nil {
    return err
   }
  case <-replyCh:
   reply, err := ojc.Start(ojiConf)
   if err != nil {
    return err
   }
   if err := stream.Send(&pb.ChatResponse{Message: reply}); err != nil {
    return err
   }
  }
 }
}

func main() {
 port := ":50051"
 lis, err := net.Listen("tcp", port)
 if err != nil {
  log.Fatal(err)
 }

 s := grpc.NewServer()
 pb.RegisterOjichatServer(s, &server{})
 if err != nil {
  log.Fatal(err)
 }

 if err := s.Serve(lis); err != nil {
  log.Fatal(err)
 }
}

Client

クライアント側は主に次の2つの処理があります。先ほどのサーバー側の処理とかなり似ています。

  1. おじさんからのメッセージを表示する
  2. おじさんにメッセージを送る

receiveinputという関数をgoroutineで実行し、それぞれrecvChinputChというチャネルにメッセージを流しています。

receive関数ではおじさんからのメッセージを待ち受けています。一方、input関数では、ユーザーからのメッセージを標準入力で待ち受けています。

recvChにメッセージが届くと、標準出力におじさんからのメッセージが表示されます。一方inputChにメッセージが届くと、おじさんにメッセージを送ることができます。

また、クライアント側ではcobraを使ってコマンドライン操作がしやすいよう実装しています。

package main

import (
 "bufio"
 "context"
 "crypto/tls"
 "fmt"
 "io"
 "log"
 "os"

 pb "ojichat-stream/proto/gen"

 "github.com/spf13/cobra"
 "google.golang.org/grpc"
 "google.golang.org/grpc/credentials"
)

func receive(ch chan<- pb.ChatResponse, stream pb.Ojichat_ChatClient) {
 for {
  in, err := stream.Recv()
  if err == io.EOF {
   close(ch)
   return
  }
  if err != nil {
   log.Fatal(err)
  }
  ch <- *in
 }
}

func input(ch chan<- pb.ChatRequest, r io.Reader) {
 s := bufio.NewScanner(r)
 fmt.Printf("\n\x1b[36menter message:\x1b[0m")
 for s.Scan() {
  input := pb.ChatRequest{Message: s.Text()}
  ch <- input
 }
}

func exec(name string, addr string, secure bool) {
 var conn *grpc.ClientConn
 var err error
 if secure {
  tlsCredentials := credentials.NewTLS(&tls.Config{})
  conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(tlsCredentials), grpc.WithBlock())
 } else {
  conn, err = grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
 }
 defer conn.Close()
 if err != nil {
  log.Fatal(err)
 }

 client := pb.NewOjichatClient(conn)
 stream, err := client.Chat(context.Background())
 if err != nil {
  log.Fatal(err)
 }

 inputCh := make(chan pb.ChatRequest)
 go input(inputCh, os.Stdin)

 recvCh := make(chan pb.ChatResponse)
 go receive(recvCh, stream)

 ctx, cancel := context.WithCancel(context.TODO())
 defer cancel()

 for {
  select {
  case <-ctx.Done():
   fmt.Println("done")
   stream.CloseSend()
   return
  case v := <-recvCh:
   fmt.Printf("\n\x1b[32mおじさん>\x1b[0m %v\n\n", v.Message)
   fmt.Printf("\n\x1b[36menter message:\x1b[0m")
  case v := <-inputCh:
   v.Name = name
   if err := stream.Send(&v); err != nil {
    log.Fatal(err)
   }
  }
 }
}

var rootCmd = &cobra.Command{
 Use:   "client",
 Short: "gRPC client for Ojichat server",
 Run: func(cmd *cobra.Command, args []string) {
  name, err := cmd.Flags().GetString("name")
  if err != nil {
   log.Fatal(err)
  }
  addr, err := cmd.Flags().GetString("addr")
  if err != nil {
   log.Fatal(err)
  }
  secure, err := cmd.Flags().GetBool("secure")
  if err != nil {
   log.Fatal(err)
  }
  exec(name, addr, secure)
 },
}

func init() {
 rootCmd.Flags().StringP("name", "n", "unknown", "enter your name")
 rootCmd.Flags().StringP("addr", "a", "localhost:50051", "enter server address")
 rootCmd.Flags().BoolP("secure", "s", false, "enable secure access")
 if err := rootCmd.MarkFlagRequired("name"); err != nil {
  log.Fatal(err)
 }
}

func main() {
 if err := rootCmd.Execute(); err != nil {
  log.Fatal(err)
 }
}

デプロイ

デプロイはdockerおよびgcloudコマンドでおこないます。 めちゃくちゃ簡単です。

# build
docker build -t gcr.io/<your-account-id>/ojichat-stream:latest .
# push
docker push gcr.io/<your-account-id>/ojichat-stream:latest
# deploy
gcloud run deploy --image gcr.io/<your-account-id>/ojichat-stream:latest --platform managed --region asia-northeast1 --use-http2 --allow-unauthenticated --port 50051

GCR(Google Container Registory)にイメージをPushしていますが、初回アクセスだと下記のコマンドでの認証まわりの設定が必要です。

# ref: https://cloud.google.com/sdk/gcloud/reference/auth/configure-docker
# ref: https://cloud.google.com/container-registry/docs/advanced-authentication#gcloud-helper
gcloud auth configure-docker

動作確認

クライアントのコマンドは下記のような設定になっています。

$ go run cmd/client/main.go -h
gRPC client for Ojichat server

Usage:
  client [flags]

Flags:
  -a, --addr string   enter server address (default "localhost:50051")
  -h, --help          help for client
  -n, --name string   enter your name (default "unknown")
  -s, --secure        enable secure access

下記のコマンドを4分割したターミナルで実行して、いろんなタイプのおじさんと会話してみたいと思います。

go run cmd/client/main.go --name arai --addr <your-endpoint>:443 -s

Vivaおじさん

左上のおじさんはかなりおとなしいのに比べて、右下のおじさんはかなり騒がしいです。(笑)

まとめ

思ったよりサクッとデプロイできてました。

Cloud Runの特徴の一つでもある「デプロイの容易さ」が体感できました。

あと、初めて使ったんですがOjichatすごいですね。裏側の仕組みも気になるところです。

明日 12/5 は ikeda さんです。よろしくお願いします!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.