[Apache Spark]ストリーミング処理で直近の人気ハッシュタグを取得する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Apache Sparkの用途の一つとして、ストリーミング処理があります。今回はその例として、Twitterから直近の人気のハッシュタグを取得する処理を取り上げてみたいと思います。

処理の流れと用語について

ソースをお見せする前に、大まかな処理の流れと、用語について説明したいと思います。先ず処理の流れですが、以下のようになります。

  1. Streamの作成
  2. ハッシュタグを持つRDDの取得
  3. DStreamの取得
  4. DStream内のRDDよりハッシュタグを取得


見慣れない用語ばかりかと思いますが、以下で処理の流れに沿って解説したと思います。

1.Streamの作成

データを連続して取得するためのStreamを作成する処理です。今回はTwitterよりデータを取得するストリームを作りますが、他にも Kinesis、Kafka、Flume、ZeroMQ、TCPなどから取得することが出来るようです。

2.ハッシュタグを持つRDDの取得

RDD(Resilient Distributed Dataset)とは不変(イミュータブル)で並列実行可能な(分割された)コレクション、です。以下にドキュメントの説明を抜粋します。

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.

org.apache.spark.rdd

今のところはストリームより取得したデータから(だけではないが)必要なモノだけを格納するコレクション、くらいに考えておけばいいかと思います(すいません、今ひとつ私も分かっていないです)。尚、並列実行可能と書いたのは、パフォーマンスを上げる為に分散処理させることが可能という意味です。

今回はTwitterのストリーミングデータよりハッシュタグを抜き出し、RDDに格納して取得します。

3.DStreamの取得

DStreamとはストリームの一種で、インプットデータのストリームそのものや、インプットデータを連続して処理するストリームです。Apache Sparkの内部ではRDDを連続して並べたものとして実装されています。こちらもドキュメントの説明を抜粋します。

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs,

Spark Streaming Programming Guide

今回は先に取得したハッシュタグを含むRDDより、直近60秒と10秒につぶやかれたハッシュタグを取得するDStreamを作成します。

4.DStream内のRDDよりハッシュタグを取得

最後にハッシュタグを取得する処理です。上でDStreamは「RDDを連続して並べたもの」と書きましたが、DStreamよりRDDを取得し、RDDより上位10件のハッシュタグを取得してターミナルに出力します。

ソースと実行結果について

ではソースコードです。GitHub上のサンプルのほぼそのままですが、そのままではローカルで動かなかったので改修をしているのと、コメントを適時加えています。

1.実行環境

Apache SparkはScala 2.10系で動作します。今回は以下の環境で実行しました。

  • Scala 2.10.4
  • sbt 0.13
  • Apache Spark 1.2.0

2.ソースコード

以下のようになります。

/src/main/scala/Startup.scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._

object Startup{
  def main(args: Array[String]) :Unit = {
    
    // Set Twitter Access Keys
    val config = new java.util.Properties
    config.load(this.getClass().getClassLoader().getResourceAsStream("config.properties"))
    System.setProperty("twitter4j.oauth.consumerKey", config.get("twitter_consumerKey").toString)
    System.setProperty("twitter4j.oauth.consumerSecret", config.get("twitter_consumerSecret").toString)
    System.setProperty("twitter4j.oauth.accessToken", config.get("twitter_accessToken").toString)
    System.setProperty("twitter4j.oauth.accessTokenSecret", config.get("twitter_accessTokenSecret").toString)

    // Create Stream
    val filters = Array("Coffee", "Tea", "Alcohol")
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Get RDD that has hashtags
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get DStream
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
                        .map{case (topic, count) => (count, topic)}
                        .transform(_.sortByKey(false))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
                        .map{case (topic, count) => (count, topic)}
                        .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })

    println("\n====================== Start. ======================")
    ssc.start()
    ssc.awaitTermination()
  }
}

だいたい先で説明した処理の流れに沿っていると思います。以下にソースのポイントについて書きます。

「// Set Twitter Access Keys〜」ではTwitterへのアクセスキーを設定しています。アクセスキー自体は外出ししたconfigファイルに持たせました。

「// Create Streamから」ではストリームを作成しています。ポイントしてはローカルで動かすため「.setMaster("local[*]")」を行っていることです。このとき「local」としてしまうと最後のRDDよりハッシュタグを取得するところが動かなくなり、途方にくれることになります(経験者談)。以下に設定値についてドキュメントから抜粋します。

When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally.

Spark Streaming Programming Guide

また「StreamingContext(sparkConf, Seconds(2))」で2秒毎にストリームにアクセスするよう設定しています。

「// Get DStream〜」ではDStreamを取得しています。「reduceByKeyAndWindow」の引数で直近60秒・10秒のデータを取得してソートしています。

「// Print popular hashtags〜」ではDStreamよりRDDを取得し、「rdd.take(10)」で上位10件を取得してターミナルに出力しています。

「// Start〜」で、ここまでに書いた一連の処理を開始しています。

build.sbtは以下の通りです。

build.sbt
lazy val root = (project in file(".")).
settings(
name := "TwitterPopularTags",
version := "1.0",
scalaVersion := "2.10.4"
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0",
"org.apache.spark" % "spark-streaming_2.10" % "1.2.0",
"org.apache.spark" % "spark-streaming-twitter_2.10" % "1.2.0"
)

3.実行結果

実行すると以下のようにターミナルに表示されます。

[info] Running Startup 
(中略)
15/02/15 15:38:56 INFO BlockManagerMaster: Registered BlockManager

====================== Start. ======================
15/02/15 15:38:57 INFO ReceiverTracker: ReceiverTracker started
(中略)
15/02/15 15:39:02 INFO DAGScheduler: Job 17 finished: foreachRDD at Startup.scala:37, took 0.015701 s

Popular topics in last 60 seconds (6 total):
#mom (1 tweets)
#earlgrey (1 tweets)
#여왕님과함께영광입니다 (1 tweets)
#따뜻따뜻 (1 tweets)
#smile (1 tweets)
#imsocold (1 tweets)
(中略)
15/02/15 15:39:02 INFO DAGScheduler: Stage 76 (foreachRDD at Startup.scala:43) finished in 0.011 s

Popular topics in last 10 seconds (6 total):
15/02/15 15:39:02 INFO DAGScheduler: Job 20 finished: foreachRDD at Startup.scala:43, took 0.017018 s
#mom (1 tweets)
#earlgrey (1 tweets)
#여왕님과함께영광입니다 (1 tweets)
#따뜻따뜻 (1 tweets)
#smile (1 tweets)
#imsocold (1 tweets)
(中略)
15/02/15 15:39:04 INFO DAGScheduler: Job 25 finished: foreachRDD at Startup.scala:37, took 0.019642 s

Popular topics in last 60 seconds (6 total):
#mom (1 tweets)
#earlgrey (1 tweets)
#여왕님과함께영광입니다 (1 tweets)
#따뜻따뜻 (1 tweets)
#smile (1 tweets)
#imsocold (1 tweets)
(中略)
15/02/15 15:39:04 INFO DAGScheduler: Stage 122 (foreachRDD at Startup.scala:43) finished in 0.008 s

Popular topics in last 10 seconds (6 total):
#mom (1 tweets)
#earlgrey (1 tweets)
#여왕님과함께영광입니다 (1 tweets)
#따뜻따뜻 (1 tweets)
#smile (1 tweets)
#imsocold (1 tweets)
(以降略)

2秒おきに、直近60秒と10秒の人気ハッシュタグを取得することができました。ポイントは「Start.」が最初に出力され、人気ハッシュタグを出力する「Popular topics・・・」は後に繰り返し出力されているところです。Apache Sparkでは「ssc.start()」を呼び出すことで初めて処理を開始し、DStreamは指定した間隔(今回は2秒)毎にTwitterのストリームよりハッシュタグを取得していることで、このように表示されます。

まとめ

ストリームよりデータを取得し、RDDに格納して、以降はScalaの構文で弄くる、といった流れでストリーミングデータ処理が行える点が非常に興味深いと思いました。また今回はローカルでしか動かしていませんが、ビッグデータの解析のために複数マシン上で分散処理させることも可能です(というか、そっちが本命)。今回の例は、何かのイベントに向けてTwitter上でのツイートをリアルタイムに分析するシステムに応用できるのではないかと感じました。引き続き、Apache Sparkについては見ていきたいと思います。

参考サイト

これまでにも適時挙げてきましたが、改めて参考サイトを挙げておきます。 TwitterPopularTags.scala
Spark Streaming Programming Guide
RDD