[Apache Spark]ストリーミング処理でツイートを取得してJOINする
はじめに
最近はApache Sparkをちょこちょこと触っている、t.hondaです。前回に引き続きストリーミング処理についてです。今回は複数のキーワードでツイートを取得し、それぞれのツイートをユーザIDでJOINしてみたいと思います。
ソースコード
ということで、いきなりソースコードです。尚、ScalaとSparkのバージョンは以下の通りです。
- Scala 2.10.4
- Apache Spark 1.2.0
import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkContext._ import org.apache.spark.streaming.twitter._ import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext._ object Startup{ def getTweets(ssc: StreamingContext, keywords: Array[String]) = { val stream = TwitterUtils.createStream(ssc, None, keywords) val tweets = stream.map(status => { val hashmap = new scala.collection.mutable.HashMap[String, Object]() hashmap.put("id", status.getUser.getId.toString) hashmap.put("user_name", status.getUser.getName) hashmap.put("user_lang", status.getUser.getLang) hashmap.put("text", status.getText) hashmap.put("create_at", new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX").format(status.getCreatedAt)) (status.getUser.getId.toString, hashmap) }).window(Seconds(3600), Seconds(2)) tweets.foreachRDD(rdd => { rdd.foreach {r => { println("keywords = " + keywords.mkString(",")) println(r) } } }) tweets } def main(args: Array[String]) :Unit = { // Set Twitter Access Keys val config = new java.util.Properties config.load(this.getClass().getClassLoader().getResourceAsStream("config.properties")) System.setProperty("twitter4j.oauth.consumerKey", config.get("twitter_consumerKey").toString) System.setProperty("twitter4j.oauth.consumerSecret", config.get("twitter_consumerSecret").toString) System.setProperty("twitter4j.oauth.accessToken", config.get("twitter_accessToken").toString) System.setProperty("twitter4j.oauth.accessTokenSecret", config.get("twitter_accessTokenSecret").toString) // Create StreamingContext val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]") val ssc = new StreamingContext(sparkConf, Seconds(2)) // Get Tweets val tweets1 = getTweets(ssc, Array("iPhone")) // Get Tweets val tweets2 = getTweets(ssc, Array("android")) // Join val joined = tweets1.join(tweets2) joined.foreachRDD(rdd => { rdd.foreach {r => { println("joined RDD") println(r) } } }) // Start println("\n====================== Start. ======================") ssc.start() ssc.awaitTermination() } }
ポイントとなるのはツイートを取得するgetTweets()メソッドと、51行目付近のJOINを行っているところです。それ以外の部分については前回と同じなので、そちらを参考にしてください。
ツイートの取得(getTweetsメソッド)
上記ソースの46・49行目でgetTweets()メソッドを呼び出し、キーワードを含むツイートを取得しています。引数としては、一つ目でStreamingContextのインスタンスを、二つ目で検索するキーワードを渡しています。
getTweets()の中身を見てみると、先ず「TwitterUtils.createStream」でTwitterのストリームを生成しています。次に「stream.map」でストリームをバラしています。13〜17行目ではストリームをバラして取得したツイートより、ユーザIDや本文を取得してhashに格納しています。18行目がポイントとなるのですが、後でツイート同士をJOINできるようにkey-valueの形にしています。具体的にはkeyにユーザID、valueにツイートを格納したhashを指定しています。ここまでで
- ストリームより連続してツイートを取得
- ツイートの中身をhashに格納
- ユーザIDをキー・hashをValueという形のDStreamを生成する
という所までできました。
次に「window」メソッドにてツイートを取得する時間、時間軸をスライドさせる幅を指定しています。今回はそれぞれ3600秒、2秒を指定しています。
21行目からは、取得したツイートを出力しています。
ツイートのJOIN
46行目・49行目で「getTweets」メソッドを呼び出し、「iPhone」「Android」のキーワードを含むツイートを取得しました。上記の「getTweets」メソッド内で、ツイート(を格納したDStream)のkeyにはユーザIDが設定されています。52行目では「join」メソッドを用いて、2つのツイートを文字通りJOINしています。53行目以降で、JOINした結果を出力しています。
まとめ
ストリーミングによる連続したデータの取得、その中での別々のキーワードの両方に合致するデータの抽出等に、今回のサンプルは応用できるのではと考えております。今回はJOINを扱ってみましたが、次回からはApache Sparkによるデータの基礎となるRDDについて扱ってみたいと思います。前回にも書きましたが、DStream自体がRDDが連続したようなものと言えます。Apache Sparkにて何かを行う際の一助になれば幸いです。