WebSocketとGoroutineによるリアルタイムのグラフ表示 – (2)サーバサイド
はじめに
前回はWebSocketとGoogle Chartsを使用したクライアントサイドについて書きました。今回はツイートを受信し、指定された単語の出現回数をカウントし、WebSocketでクライアントに送信するサーバサイドについてです。
サーバサイドの実装
サーバサイドはGolangを使用しました。大きく分けてmain処理、WebSocket、共通処理に分かれます。一つずつソースを載せていこうと思います。
main処理
まずはmain処理のソースです。以下のようになります。
main.go
package main import ( "flag" "fmt" "html/template" "net/http" "net/url" "os" "strings" "アプリのフォルダパス/util" "github.com/ChimeraCoder/anaconda" ) var tweet = make(chan string) var tweetKeyCountsChan = make(chan tweetKeyCounts) type tweetKeyCounts struct { Tweet string `json:"tweet"` KeyCounts map[string]int `json:"keyCounts"` } func createKeyCounts(argKeywords string) map[string]int { m := make(map[string]int) keywords := strings.Split(argKeywords, ",") for _, k := range keywords { m[k] = 0 } return m } func analyzeTweet(keyCounts map[string]int) { for { select { case t := <-tweet: fmt.Println("==========") fmt.Println(t) for k := range keyCounts { cnt := strings.Count(t, k) keyCounts[k] = keyCounts[k] + cnt } fmt.Println(keyCounts) result := tweetKeyCounts{t, keyCounts} tweetKeyCountsChan <- result } } } func receiveTweet(hashtag *string) { conf := util.GetConfig() anaconda.SetConsumerKey(conf.Twitter.ConsumerKey) anaconda.SetConsumerSecret(conf.Twitter.ConsumerSecret) api := anaconda.NewTwitterApi(conf.Twitter.AccessToken, conf.Twitter.AccessTokenSecret) v := url.Values{} v.Set("track", *hashtag) stream := api.PublicStreamFilter(v) for { x := <-stream.C switch tw := x.(type) { case anaconda.Tweet: tweet <- tw.Text case anaconda.StatusDeletionNotice: // throw default: fmt.Fprintln(os.Stderr, "stream unknown type.") } } } func handler(w http.ResponseWriter, r *http.Request) { t, err := template.ParseFiles("templates/index.tpl") util.Check(err) err = t.Execute(w, nil) util.Check(err) } func initialize() (*string, map[string]int) { hashtag := flag.String("hashtag", "", "search hashtag include #.") keywords := flag.String("keywords", "", "alert keywords") flag.Parse() keyCounts := createKeyCounts(*keywords) return hashtag, keyCounts } func main() { hashtag, keyCounts := initialize() hub := newHub() go hub.run() http.HandleFunc("/", handler) http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { go analyzeTweet(keyCounts) go receiveTweet(hashtag) serveWs(hub, w, r) }) err := http.ListenAndServe(":8080", nil) util.Check(err) }
通常のWebアプリとして実装しており、97行目で呼び出す「handler()」メソッドにて前回記述した「index.tpl」を初期表示ページとして返却しています。
98行目ではWebSocketのハンドラを記述しています。その中でツイートの取得処理(receiveTweet)、ツイート内のキーワードのカウント処理(analyzeTweet)を、それぞれGoroutineとして呼び出しています。ツイートの取得にはanacondaを使用し、ストリーム処理で受信します。
WebSocket
WebSocketについてはgorilla/websocket/example/chatを大いに参考にさせて頂きました。「hub.go」についてそのまま使用し、「client.go」についてもデータを送信する部分を変更しました。
hub.go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package main // hub maintains the set of active clients and broadcasts messages to the // clients. type Hub struct { // Registered clients. clients map[*Client]bool // Inbound messages from the clients. broadcast chan []byte // Register requests from the clients. register chan *Client // Unregister requests from clients. unregister chan *Client } func newHub() *Hub { return &Hub{ broadcast: make(chan []byte), register: make(chan *Client), unregister: make(chan *Client), clients: make(map[*Client]bool), } } func (h *Hub) run() { for { select { case client := <-h.register: h.clients[client] = true case client := <-h.unregister: if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) } case message := <-h.broadcast: for client := range h.clients { select { case client.send <- message: default: close(client.send) delete(h.clients, client) } } } } }
client.go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package main import ( "encoding/json" "log" "net/http" "time" "github.com/gorilla/websocket" ) const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum message size allowed from peer. maxMessageSize = 512 ) var ( newline = []byte{'\n'} space = []byte{' '} ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } // Client is a middleman between the websocket connection and the hub. type Client struct { hub *Hub // The websocket connection. conn *websocket.Conn // Buffered channel of outbound messages. send chan []byte } // writePump pumps messages from the hub to the websocket connection. // // A goroutine running writePump is started for each connection. The // application ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case message, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { // The hub closed the channel. c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.conn.NextWriter(websocket.TextMessage) if err != nil { return } w.Write(message) // Add queued chat messages to the current websocket message. n := len(c.send) for i := 0; i < n; i++ { w.Write(newline) w.Write(<-c.send) } if err := w.Close(); err != nil { return } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { return } } } } // broadcast message. // this method is original. func (c *Client) broadcast() { for { select { case t := <-tweetKeyCountsChan: b, _ := json.Marshal(t) c.hub.broadcast <- b } } } // serveWs handles websocket requests from the peer. func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} client.hub.register <- client go client.writePump() go client.broadcast() // original method call. }
hub.goについては参考にしたソースそのままなので解説は省略します。client.goについては参考にしたソースの必要な箇所のみを抜粋し、クライアントにデータを送信する「broadcast()」メソッドを実装して呼び出しました。先のmain.goの「analyzeTweet()」にてカウントした結果を「tweetKeyCountsChan」チャネルに投入すると、この「broadcast()」にてWebSocket経由でクライアントに送信される仕組みです。
共通処理
共通処理についてはエラーをチェックするcheck.goと、定義ファイル(config.yml)からTwitter APIのキー情報を取得するconfig.goを実装しました。
util/check.go
package util func Check(e error) { if e != nil { panic(e) } }
util/config.go
package util import ( "io/ioutil" yaml "gopkg.in/yaml.v2" ) type config struct { Twitter struct { ConsumerKey string `yaml:"consumer_key"` ConsumerSecret string `yaml:"consumer_secret"` AccessToken string `yaml:"access_token"` AccessTokenSecret string `yaml:"access_token_secret"` } } var conf *config func GetConfig() *config { if conf == nil { buf, err := ioutil.ReadFile("config.yml") Check(err) err = yaml.Unmarshal(buf, &conf) Check(err) } return conf }
config.yml
twitter: consumer_key: your consumer key consumer_secret: your consumer secret access_token: your access token access_token_secret: your access token secret
起動方法
前回と合わせ、以上でサンプルを実装することができました。これらの起動方法ですが、以下のコマンドとなります。
$ go build $ ./アプリ名 --hashtag=#hashtag --keywords=key1,key2,key3・・・ #ハッシュタグは「#」から始めること
まとめ
前回と合わせてリアルタイムでツイートを受信し、解析結果をグラフで表示することができました。リアルタイムな処理をそれほど多くないソース量で実現できたかと思います。
参考サイト
以下のサイトを参考にさせて頂きました。ありがとうございました。
gorilla/websocket/example/chat
golang + anacondaでTwitter Streamimg APIを使う
golang で Twitter の UserStream を使う