[Apache Spark]ストリーミング処理でツイートを取得してJOINする

2015.03.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

最近はApache Sparkをちょこちょこと触っている、t.hondaです。前回に引き続きストリーミング処理についてです。今回は複数のキーワードでツイートを取得し、それぞれのツイートをユーザIDでJOINしてみたいと思います。

ソースコード

ということで、いきなりソースコードです。尚、ScalaとSparkのバージョンは以下の通りです。

  • Scala 2.10.4
  • Apache Spark 1.2.0

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._

object Startup{
  def getTweets(ssc: StreamingContext, keywords: Array[String]) = {
    val stream = TwitterUtils.createStream(ssc, None, keywords)

    val tweets = stream.map(status => {
      val hashmap = new scala.collection.mutable.HashMap[String, Object]()
        hashmap.put("id", status.getUser.getId.toString)
        hashmap.put("user_name", status.getUser.getName)
        hashmap.put("user_lang", status.getUser.getLang)
        hashmap.put("text", status.getText)
        hashmap.put("create_at", new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX").format(status.getCreatedAt))
      (status.getUser.getId.toString, hashmap)
    }).window(Seconds(3600), Seconds(2))

    tweets.foreachRDD(rdd => {
      rdd.foreach {r => {
          println("keywords = " + keywords.mkString(","))
          println(r)
        }
      }
    })

    tweets
  }

  def main(args: Array[String]) :Unit = {
    // Set Twitter Access Keys
    val config = new java.util.Properties
    config.load(this.getClass().getClassLoader().getResourceAsStream("config.properties"))
    System.setProperty("twitter4j.oauth.consumerKey", config.get("twitter_consumerKey").toString)
    System.setProperty("twitter4j.oauth.consumerSecret", config.get("twitter_consumerSecret").toString)
    System.setProperty("twitter4j.oauth.accessToken", config.get("twitter_accessToken").toString)
    System.setProperty("twitter4j.oauth.accessTokenSecret", config.get("twitter_accessTokenSecret").toString)

    // Create StreamingContext
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
 
    // Get Tweets
    val tweets1 = getTweets(ssc, Array("iPhone"))
 
    // Get Tweets
    val tweets2 = getTweets(ssc, Array("android"))

    // Join
    val joined = tweets1.join(tweets2)
    joined.foreachRDD(rdd => {
      rdd.foreach {r => {
          println("joined RDD")
          println(r)
        }
      }
    })
 
    // Start
    println("\n====================== Start. ======================")
    ssc.start()
    ssc.awaitTermination()
  }
}

ポイントとなるのはツイートを取得するgetTweets()メソッドと、51行目付近のJOINを行っているところです。それ以外の部分については前回と同じなので、そちらを参考にしてください。

ツイートの取得(getTweetsメソッド)

上記ソースの46・49行目でgetTweets()メソッドを呼び出し、キーワードを含むツイートを取得しています。引数としては、一つ目でStreamingContextのインスタンスを、二つ目で検索するキーワードを渡しています。

getTweets()の中身を見てみると、先ず「TwitterUtils.createStream」でTwitterのストリームを生成しています。次に「stream.map」でストリームをバラしています。13〜17行目ではストリームをバラして取得したツイートより、ユーザIDや本文を取得してhashに格納しています。18行目がポイントとなるのですが、後でツイート同士をJOINできるようにkey-valueの形にしています。具体的にはkeyにユーザID、valueにツイートを格納したhashを指定しています。ここまでで

  1. ストリームより連続してツイートを取得
  2. ツイートの中身をhashに格納
  3. ユーザIDをキー・hashをValueという形のDStreamを生成する

という所までできました。

次に「window」メソッドにてツイートを取得する時間、時間軸をスライドさせる幅を指定しています。今回はそれぞれ3600秒、2秒を指定しています。

21行目からは、取得したツイートを出力しています。

ツイートのJOIN

46行目・49行目で「getTweets」メソッドを呼び出し、「iPhone」「Android」のキーワードを含むツイートを取得しました。上記の「getTweets」メソッド内で、ツイート(を格納したDStream)のkeyにはユーザIDが設定されています。52行目では「join」メソッドを用いて、2つのツイートを文字通りJOINしています。53行目以降で、JOINした結果を出力しています。

まとめ

ストリーミングによる連続したデータの取得、その中での別々のキーワードの両方に合致するデータの抽出等に、今回のサンプルは応用できるのではと考えております。今回はJOINを扱ってみましたが、次回からはApache Sparkによるデータの基礎となるRDDについて扱ってみたいと思います。前回にも書きましたが、DStream自体がRDDが連続したようなものと言えます。Apache Sparkにて何かを行う際の一助になれば幸いです。