この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
前回はWebSocketとGoogle Chartsを使用したクライアントサイドについて書きました。今回はツイートを受信し、指定された単語の出現回数をカウントし、WebSocketでクライアントに送信するサーバサイドについてです。
サーバサイドの実装
サーバサイドはGolangを使用しました。大きく分けてmain処理、WebSocket、共通処理に分かれます。一つずつソースを載せていこうと思います。
main処理
まずはmain処理のソースです。以下のようになります。
main.go
package main
import (
"flag"
"fmt"
"html/template"
"net/http"
"net/url"
"os"
"strings"
"アプリのフォルダパス/util"
"github.com/ChimeraCoder/anaconda"
)
var tweet = make(chan string)
var tweetKeyCountsChan = make(chan tweetKeyCounts)
type tweetKeyCounts struct {
Tweet string `json:"tweet"`
KeyCounts map[string]int `json:"keyCounts"`
}
func createKeyCounts(argKeywords string) map[string]int {
m := make(map[string]int)
keywords := strings.Split(argKeywords, ",")
for _, k := range keywords {
m[k] = 0
}
return m
}
func analyzeTweet(keyCounts map[string]int) {
for {
select {
case t := <-tweet:
fmt.Println("==========")
fmt.Println(t)
for k := range keyCounts {
cnt := strings.Count(t, k)
keyCounts[k] = keyCounts[k] + cnt
}
fmt.Println(keyCounts)
result := tweetKeyCounts{t, keyCounts}
tweetKeyCountsChan <- result
}
}
}
func receiveTweet(hashtag *string) {
conf := util.GetConfig()
anaconda.SetConsumerKey(conf.Twitter.ConsumerKey)
anaconda.SetConsumerSecret(conf.Twitter.ConsumerSecret)
api := anaconda.NewTwitterApi(conf.Twitter.AccessToken, conf.Twitter.AccessTokenSecret)
v := url.Values{}
v.Set("track", *hashtag)
stream := api.PublicStreamFilter(v)
for {
x := <-stream.C
switch tw := x.(type) {
case anaconda.Tweet:
tweet <- tw.Text
case anaconda.StatusDeletionNotice:
// throw
default:
fmt.Fprintln(os.Stderr, "stream unknown type.")
}
}
}
func handler(w http.ResponseWriter, r *http.Request) {
t, err := template.ParseFiles("templates/index.tpl")
util.Check(err)
err = t.Execute(w, nil)
util.Check(err)
}
func initialize() (*string, map[string]int) {
hashtag := flag.String("hashtag", "", "search hashtag include #.")
keywords := flag.String("keywords", "", "alert keywords")
flag.Parse()
keyCounts := createKeyCounts(*keywords)
return hashtag, keyCounts
}
func main() {
hashtag, keyCounts := initialize()
hub := newHub()
go hub.run()
http.HandleFunc("/", handler)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
go analyzeTweet(keyCounts)
go receiveTweet(hashtag)
serveWs(hub, w, r)
})
err := http.ListenAndServe(":8080", nil)
util.Check(err)
}
通常のWebアプリとして実装しており、97行目で呼び出す「handler()」メソッドにて前回記述した「index.tpl」を初期表示ページとして返却しています。
98行目ではWebSocketのハンドラを記述しています。その中でツイートの取得処理(receiveTweet)、ツイート内のキーワードのカウント処理(analyzeTweet)を、それぞれGoroutineとして呼び出しています。ツイートの取得にはanacondaを使用し、ストリーム処理で受信します。
WebSocket
WebSocketについてはgorilla/websocket/example/chatを大いに参考にさせて頂きました。「hub.go」についてそのまま使用し、「client.go」についてもデータを送信する部分を変更しました。
hub.go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
// hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
// Registered clients.
clients map[*Client]bool
// Inbound messages from the clients.
broadcast chan []byte
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan *Client
}
func newHub() *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
}
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
client.go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
// Add queued chat messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}
// broadcast message.
// this method is original.
func (c *Client) broadcast() {
for {
select {
case t := <-tweetKeyCountsChan:
b, _ := json.Marshal(t)
c.hub.broadcast <- b
}
}
}
// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client
go client.writePump()
go client.broadcast() // original method call.
}
hub.goについては参考にしたソースそのままなので解説は省略します。client.goについては参考にしたソースの必要な箇所のみを抜粋し、クライアントにデータを送信する「broadcast()」メソッドを実装して呼び出しました。先のmain.goの「analyzeTweet()」にてカウントした結果を「tweetKeyCountsChan」チャネルに投入すると、この「broadcast()」にてWebSocket経由でクライアントに送信される仕組みです。
共通処理
共通処理についてはエラーをチェックするcheck.goと、定義ファイル(config.yml)からTwitter APIのキー情報を取得するconfig.goを実装しました。
util/check.go
package util
func Check(e error) {
if e != nil {
panic(e)
}
}
util/config.go
package util
import (
"io/ioutil"
yaml "gopkg.in/yaml.v2"
)
type config struct {
Twitter struct {
ConsumerKey string `yaml:"consumer_key"`
ConsumerSecret string `yaml:"consumer_secret"`
AccessToken string `yaml:"access_token"`
AccessTokenSecret string `yaml:"access_token_secret"`
}
}
var conf *config
func GetConfig() *config {
if conf == nil {
buf, err := ioutil.ReadFile("config.yml")
Check(err)
err = yaml.Unmarshal(buf, &conf)
Check(err)
}
return conf
}
config.yml
twitter:
consumer_key: your consumer key
consumer_secret: your consumer secret
access_token: your access token
access_token_secret: your access token secret
起動方法
前回と合わせ、以上でサンプルを実装することができました。これらの起動方法ですが、以下のコマンドとなります。
$ go build
$ ./アプリ名 --hashtag=#hashtag --keywords=key1,key2,key3・・・ #ハッシュタグは「#」から始めること
まとめ
前回と合わせてリアルタイムでツイートを受信し、解析結果をグラフで表示することができました。リアルタイムな処理をそれほど多くないソース量で実現できたかと思います。
参考サイト
以下のサイトを参考にさせて頂きました。ありがとうございました。
gorilla/websocket/example/chat
golang + anacondaでTwitter Streamimg APIを使う
golang で Twitter の UserStream を使う