この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
最近はApache Sparkをちょこちょこと触っている、t.hondaです。前回に引き続きストリーミング処理についてです。今回は複数のキーワードでツイートを取得し、それぞれのツイートをユーザIDでJOINしてみたいと思います。
ソースコード
ということで、いきなりソースコードです。尚、ScalaとSparkのバージョンは以下の通りです。
- Scala 2.10.4
- Apache Spark 1.2.0
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._
object Startup{
def getTweets(ssc: StreamingContext, keywords: Array[String]) = {
val stream = TwitterUtils.createStream(ssc, None, keywords)
val tweets = stream.map(status => {
val hashmap = new scala.collection.mutable.HashMap[String, Object]()
hashmap.put("id", status.getUser.getId.toString)
hashmap.put("user_name", status.getUser.getName)
hashmap.put("user_lang", status.getUser.getLang)
hashmap.put("text", status.getText)
hashmap.put("create_at", new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX").format(status.getCreatedAt))
(status.getUser.getId.toString, hashmap)
}).window(Seconds(3600), Seconds(2))
tweets.foreachRDD(rdd => {
rdd.foreach {r => {
println("keywords = " + keywords.mkString(","))
println(r)
}
}
})
tweets
}
def main(args: Array[String]) :Unit = {
// Set Twitter Access Keys
val config = new java.util.Properties
config.load(this.getClass().getClassLoader().getResourceAsStream("config.properties"))
System.setProperty("twitter4j.oauth.consumerKey", config.get("twitter_consumerKey").toString)
System.setProperty("twitter4j.oauth.consumerSecret", config.get("twitter_consumerSecret").toString)
System.setProperty("twitter4j.oauth.accessToken", config.get("twitter_accessToken").toString)
System.setProperty("twitter4j.oauth.accessTokenSecret", config.get("twitter_accessTokenSecret").toString)
// Create StreamingContext
val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Get Tweets
val tweets1 = getTweets(ssc, Array("iPhone"))
// Get Tweets
val tweets2 = getTweets(ssc, Array("android"))
// Join
val joined = tweets1.join(tweets2)
joined.foreachRDD(rdd => {
rdd.foreach {r => {
println("joined RDD")
println(r)
}
}
})
// Start
println("\n====================== Start. ======================")
ssc.start()
ssc.awaitTermination()
}
}
ポイントとなるのはツイートを取得するgetTweets()メソッドと、51行目付近のJOINを行っているところです。それ以外の部分については前回と同じなので、そちらを参考にしてください。
ツイートの取得(getTweetsメソッド)
上記ソースの46・49行目でgetTweets()メソッドを呼び出し、キーワードを含むツイートを取得しています。引数としては、一つ目でStreamingContextのインスタンスを、二つ目で検索するキーワードを渡しています。
getTweets()の中身を見てみると、先ず「TwitterUtils.createStream」でTwitterのストリームを生成しています。次に「stream.map」でストリームをバラしています。13〜17行目ではストリームをバラして取得したツイートより、ユーザIDや本文を取得してhashに格納しています。18行目がポイントとなるのですが、後でツイート同士をJOINできるようにkey-valueの形にしています。具体的にはkeyにユーザID、valueにツイートを格納したhashを指定しています。ここまでで
- ストリームより連続してツイートを取得
- ツイートの中身をhashに格納
- ユーザIDをキー・hashをValueという形のDStreamを生成する
という所までできました。
次に「window」メソッドにてツイートを取得する時間、時間軸をスライドさせる幅を指定しています。今回はそれぞれ3600秒、2秒を指定しています。
21行目からは、取得したツイートを出力しています。
ツイートのJOIN
46行目・49行目で「getTweets」メソッドを呼び出し、「iPhone」「Android」のキーワードを含むツイートを取得しました。上記の「getTweets」メソッド内で、ツイート(を格納したDStream)のkeyにはユーザIDが設定されています。52行目では「join」メソッドを用いて、2つのツイートを文字通りJOINしています。53行目以降で、JOINした結果を出力しています。
まとめ
ストリーミングによる連続したデータの取得、その中での別々のキーワードの両方に合致するデータの抽出等に、今回のサンプルは応用できるのではと考えております。今回はJOINを扱ってみましたが、次回からはApache Sparkによるデータの基礎となるRDDについて扱ってみたいと思います。前回にも書きましたが、DStream自体がRDDが連続したようなものと言えます。Apache Sparkにて何かを行う際の一助になれば幸いです。