WebSocketとGoroutineによるリアルタイムのグラフ表示 – (2)サーバサイド

WebSocketとGoroutineによるリアルタイムのグラフ表示 – (2)サーバサイド

Clock Icon2017.05.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

前回はWebSocketとGoogle Chartsを使用したクライアントサイドについて書きました。今回はツイートを受信し、指定された単語の出現回数をカウントし、WebSocketでクライアントに送信するサーバサイドについてです。

サーバサイドの実装

サーバサイドはGolangを使用しました。大きく分けてmain処理、WebSocket、共通処理に分かれます。一つずつソースを載せていこうと思います。

main処理

まずはmain処理のソースです。以下のようになります。

main.go
package main

import (
	"flag"
	"fmt"
	"html/template"
	"net/http"
	"net/url"
	"os"
	"strings"

	"アプリのフォルダパス/util"
	"github.com/ChimeraCoder/anaconda"
)

var tweet = make(chan string)
var tweetKeyCountsChan = make(chan tweetKeyCounts)

type tweetKeyCounts struct {
	Tweet     string         `json:"tweet"`
	KeyCounts map[string]int `json:"keyCounts"`
}

func createKeyCounts(argKeywords string) map[string]int {
	m := make(map[string]int)
	keywords := strings.Split(argKeywords, ",")
	for _, k := range keywords {
		m[k] = 0
	}
	return m
}

func analyzeTweet(keyCounts map[string]int) {
	for {
		select {
		case t := <-tweet:
			fmt.Println("==========")
			fmt.Println(t)

			for k := range keyCounts {
				cnt := strings.Count(t, k)
				keyCounts[k] = keyCounts[k] + cnt
			}

			fmt.Println(keyCounts)

			result := tweetKeyCounts{t, keyCounts}
			tweetKeyCountsChan <- result
		}
	}
}

func receiveTweet(hashtag *string) {
	conf := util.GetConfig()
	anaconda.SetConsumerKey(conf.Twitter.ConsumerKey)
	anaconda.SetConsumerSecret(conf.Twitter.ConsumerSecret)
	api := anaconda.NewTwitterApi(conf.Twitter.AccessToken, conf.Twitter.AccessTokenSecret)

	v := url.Values{}
	v.Set("track", *hashtag)
	stream := api.PublicStreamFilter(v)

	for {
		x := <-stream.C
		switch tw := x.(type) {
		case anaconda.Tweet:
			tweet <- tw.Text
		case anaconda.StatusDeletionNotice:
			// throw
		default:
			fmt.Fprintln(os.Stderr, "stream unknown type.")
		}
	}
}

func handler(w http.ResponseWriter, r *http.Request) {
	t, err := template.ParseFiles("templates/index.tpl")
	util.Check(err)
	err = t.Execute(w, nil)
	util.Check(err)
}

func initialize() (*string, map[string]int) {
	hashtag := flag.String("hashtag", "", "search hashtag include #.")
	keywords := flag.String("keywords", "", "alert keywords")
	flag.Parse()
	keyCounts := createKeyCounts(*keywords)
	return hashtag, keyCounts
}

func main() {
	hashtag, keyCounts := initialize()

	hub := newHub()
	go hub.run()

	http.HandleFunc("/", handler)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		go analyzeTweet(keyCounts)
		go receiveTweet(hashtag)
		serveWs(hub, w, r)
	})
	err := http.ListenAndServe(":8080", nil)
	util.Check(err)
}

通常のWebアプリとして実装しており、97行目で呼び出す「handler()」メソッドにて前回記述した「index.tpl」を初期表示ページとして返却しています。
98行目ではWebSocketのハンドラを記述しています。その中でツイートの取得処理(receiveTweet)、ツイート内のキーワードのカウント処理(analyzeTweet)を、それぞれGoroutineとして呼び出しています。ツイートの取得にはanacondaを使用し、ストリーム処理で受信します。

WebSocket

WebSocketについてはgorilla/websocket/example/chatを大いに参考にさせて頂きました。「hub.go」についてそのまま使用し、「client.go」についてもデータを送信する部分を変更しました。

hub.go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

// hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
	// Registered clients.
	clients map[*Client]bool

	// Inbound messages from the clients.
	broadcast chan []byte

	// Register requests from the clients.
	register chan *Client

	// Unregister requests from clients.
	unregister chan *Client
}

func newHub() *Hub {
	return &Hub{
		broadcast:  make(chan []byte),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		clients:    make(map[*Client]bool),
	}
}

func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.clients[client] = true
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
			}
		case message := <-h.broadcast:
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(h.clients, client)
				}
			}
		}
	}
}

hub.goより

client.go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"time"

	"github.com/gorilla/websocket"
)

const (
	// Time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// Time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10

	// Maximum message size allowed from peer.
	maxMessageSize = 512
)

var (
	newline = []byte{'\n'}
	space   = []byte{' '}
)

var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

// Client is a middleman between the websocket connection and the hub.
type Client struct {
	hub *Hub

	// The websocket connection.
	conn *websocket.Conn

	// Buffered channel of outbound messages.
	send chan []byte
}

// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)

			// Add queued chat messages to the current websocket message.
			n := len(c.send)
			for i := 0; i < n; i++ {
				w.Write(newline)
				w.Write(<-c.send)
			}

			if err := w.Close(); err != nil {
				return
			}
		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				return
			}
		}
	}
}

// broadcast message.
// this method is original.
func (c *Client) broadcast() {
	for {
		select {
		case t := <-tweetKeyCountsChan:
			b, _ := json.Marshal(t)
			c.hub.broadcast <- b
		}
	}
}

// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}
	client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
	client.hub.register <- client
	go client.writePump()
	go client.broadcast() // original method call.
}

client.goより一部改変

hub.goについては参考にしたソースそのままなので解説は省略します。client.goについては参考にしたソースの必要な箇所のみを抜粋し、クライアントにデータを送信する「broadcast()」メソッドを実装して呼び出しました。先のmain.goの「analyzeTweet()」にてカウントした結果を「tweetKeyCountsChan」チャネルに投入すると、この「broadcast()」にてWebSocket経由でクライアントに送信される仕組みです。

共通処理

共通処理についてはエラーをチェックするcheck.goと、定義ファイル(config.yml)からTwitter APIのキー情報を取得するconfig.goを実装しました。

util/check.go
package util

func Check(e error) {
	if e != nil {
		panic(e)
	}
}
util/config.go
package util

import (
	"io/ioutil"

	yaml "gopkg.in/yaml.v2"
)

type config struct {
	Twitter struct {
		ConsumerKey       string `yaml:"consumer_key"`
		ConsumerSecret    string `yaml:"consumer_secret"`
		AccessToken       string `yaml:"access_token"`
		AccessTokenSecret string `yaml:"access_token_secret"`
	}
}

var conf *config

func GetConfig() *config {
	if conf == nil {
		buf, err := ioutil.ReadFile("config.yml")
		Check(err)
		err = yaml.Unmarshal(buf, &conf)
		Check(err)
	}
	return conf
}
config.yml
    twitter:
        consumer_key: your consumer key
        consumer_secret: your consumer secret
        access_token: your access token
        access_token_secret: your access token secret

起動方法

前回と合わせ、以上でサンプルを実装することができました。これらの起動方法ですが、以下のコマンドとなります。

$ go build
$ ./アプリ名 --hashtag=#hashtag --keywords=key1,key2,key3・・・ #ハッシュタグは「#」から始めること

まとめ

前回と合わせてリアルタイムでツイートを受信し、解析結果をグラフで表示することができました。リアルタイムな処理をそれほど多くないソース量で実現できたかと思います。

参考サイト

以下のサイトを参考にさせて頂きました。ありがとうございました。
gorilla/websocket/example/chat
golang + anacondaでTwitter Streamimg APIを使う
golang で Twitter の UserStream を使う

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.