Logstash + Snowflakeを活用してTwitterエゴサーチ分析基盤を作る

拝啓 上司様 私が業務時間中にTwitterを見ているのは仕事に活かすためでもあるのです 敬具
2021.02.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、データアナリティクス事業本部・新納(にいの)です。突然ですが皆さんはTwitterでエゴサーチ(自分の名前の検索)していますか?私はしていません。

エゴサーチとは無縁の暮らしをしていた私ですが、去る2021年2月1日、クラスメソッドが「Zenn」を買収したプレスリリースを発表したことで状況が変わります。

このニュースがTwitterでも話題となり、一時は「Zenn」「クラスメソッド」がトレンドにランクインしました。社員の中には「世間ではどんな反応をされているのかな...」とドキドキしながら検索ウィンドウに社名を入れた者もいるでしょう。私です。

社長や取締役を昼夜問わずタイムラインで見かけるくらいなので業務時間中にTwitterをして怒られるような会社ではありませんが、せっかくなので自動で継続的にTwitterをエゴサーチして分析してくれる環境を作ってみました。

こんな分析基盤を作りました

完成図はこちら。

  • EC2インスタンス上で実行しているLogstashでツイート取得・S3バケットへのアウトプットを行う
  • ツイート(JSON形式)をSnowflake上のデータベースに連携する
    • データパイプライン・SnowpipeでS3からSnowflake上のデータベースへ連携
    • ストリーム(Snowflakeの機能)でレコードに対する差分を検知
    • タスク(Snowflakeの機能)でJSONをパースして別テーブルに格納する処理を定期実行
  • Alteryx・Tableauを使ってデータ加工・可視化を行う

この構成の良いところは以下の通り。

  • ELT(Extract・Load・Transform)により、格納先のデータベース内でデータの変換処理を行える
  • プログラムを書かなくてもLogstashがツイートの取得からS3への出力まで対応してくれる
  • 半構造化データを強力にサポートするSnowflakeを使用することで、JSON形式のデータを簡単にテーブル定義できる
  • データパイプラインの作成がSnowflake内で完結できる(Snowpipe、ストリーム、タスク)

前提条件

今回の検証で使用した環境は以下の通り。

Logstashを実行するAmazon EC2

  • AMI: Amazon Linux 2 AMI (HVM), SSD Volume Type - ami-0992fc94ca0f1415a
    • インスタンスタイプ:t2.xlarge

Snowflake

  • Snowflake 5.3.1

その他

  • Tableau Desktop 2020.4
  • Alteryx Designer 2020.4

また、Twitter API利用のためにDeveloper Accountに申し込みを済ませる必要があります。

Logstashを実行する

まずはツイートを取得するための環境をAmazon EC2上に構築します。余談ですが、Windows10であればsshクライアントを使用しなくてもコマンドプロンプトでsshできることに今更ながら気が付きました。本当にありがとう。

Logstashのインストール

下記サイトの手順に従ってLogstashをインストールします。

公開署名キーをダウンロード・インストールします。

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

LogstashのリポジトリURLを追加するため、リポジトリのあるディレクトリへ移動します。

 cd /etc/yum.repos.d/

リポジトリを作成します。

sudo touch logstash.repo

作成したリポジトリをviコマンドで編集します。

 sudo vi logstash.repo

以下内容を追記し、保存します。

[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

Logstashをインストールします。

sudo yum install logstash

インストールが完了すると以下のように表示されます。

 Installed:
  logstash.x86_64 1:7.10.2-1

Complete!

プラグインのインストール

Logstashではデータソースに応じてインプット・アウトプットに使用するプラグインが提供されています。使用可能なプラグインは以下の通り。

Twitter input plugin

インプットにはTwitter input Pluginを使用します。Twitter Streaming APIを使用しており、ストリーム形式でリアルタイムにツイートが取得できます。(条件に合致するツイートがあるとどんどんデータが生まれる)

 sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-twitter

S3 output plugin

アウトプット先に合わせてS3 output pluginを使用します。

 sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-s3

設定ファイルの作成

設定ファイルにはインプットやアウトプット先の情報を記述します。保存先のディレクトリは/etc/logstash/conf.dで、ファイル名は任意のものに設定します。(今回はlogstash.confにしました)viコマンドで内容を編集します。

vi /etc/logstash/conf.d/logstash.conf

inputにはTwitterの、outputにはS3の認証情報を設定します。今回の検証で設定した項目は以下の通り。

Twitter

項目
consumer_key TwitterのDeveloper Portalから取得できるConsumer Key(必須)
consumer_secret 同上のConsumer Secret(必須)
oauth_token TwitterのDeveloper Portalから取得できるAccess token(必須)
oauth_token_secret 同上のAccess secret(必須)
keywords 検索キーワード(オプション)
full_tweet 完全なツイートの取得。デフォルトはfalse(オプション)

S3

アクセスキー、シークレットキーを指定しない場合はYAMLで記述した認証情報ファイルをaws_credentials_fileに指定する必要があります。

項目
access_key_id S3 PutObjectの権限を持つIAMユーザーのアクセスキー
secret_access_key 同上のシークレットキー
region S3バケットが配置されたリージョン(デフォルトはus-east-1
bucket S3バケット名(必須)
codec エンコードしたいファイル形式
size_file ファイルをローテーションするサイズ
time_file ファイルをローテーションする時間

設定内容の一例はこちら。

input {
 twitter {
  consumer_key => ""
  consumer_secret => ""
  oauth_token => ""
  oauth_token_secret => ""
  keywords => ["クラスメソッド","クラメソ"]
  full_tweet => true
 }
}
output {
  s3
   { access_key_id => ""
    secret_access_key => ""
    region => "ap-northeast-1"
    bucket => "cm-niino-logstash"
    codec => "json_lines"
    size_file => 2048
    time_file => 5
   }
}

Logstashの実行

やっとLogstashを実行できる環境が整いました。以下のコマンドを実行します。

sudo -u logstash /usr/share/logstash/bin/logstash --path.settings "/etc/logstash" -f /etc/logstash/conf.d/logstash.conf

[logstash.agent ] Successfully started Logstash API endpointといったメッセージが表示されれば無事に実行されています。

うまくデータが生成されれば、ターゲットに指定したS3バケットに以下のような命名のファイルが生成されます。

ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

Snowflakeでデータパイプラインを作る

クラウド型データウェアハウスであるSnowflakeにはデータパイプラインを実現する機能群が提供されています。つまり、Snowflake側でデータパイプラインを構築しておけば、勝手にデータの生成を検知してSnowflakeにデータを引っ張ってきてくれて、勝手に差分を検知して勝手にデータを加工して、勝手にテーブルに格納しておいてくれます。本当にありがとう。

以下のデータパイプラインを作成します。

  • Snowpipe:連続データロード
  • ストリーム:変更データの追跡
  • タスク:定期的なタスク実行

以下のエントリの手順に沿って進めていきます。

Snowpipeを作成

S3バケットにデータが生成されると自動でSnowflake上のテーブルにデータを挿入してくれるSnowpipeを作成します。

Logstashのアウトプット先に指定しているS3バケットをSnowflakeの外部ステージとして設定します。GUI画面で作成する場合はデータベース→ステージから作成可能です。今回はLOGSTASHTESTというステージを作成しました。

詳しい設定方法は以下エントリをご参照ください。

まず、ツイートのデータを格納しておくテーブルを作成します。JSON形式のデータを蓄積させておくため、型にはVARIANTを指定しておきます。

CREATE TABLE "NIINO_TEST_PUBLIC"."TWITTER"."LOGSTASH_TWEETS"(
       record VARIANT
);

パイプを作成します。先ほど設定したS3バケットの外部テーブルLOGSTASHTESTから、今しがた作成したテーブルLOGSTASH_TESTへデータをJSON形式で自動コピーするよう設定するSQL文です。

CREATE PIPE "NIINO_TEST_PUBLIC"."TWITTER".LOGSTASH_PIPE AUTO_INGEST=TRUE AS
  COPY INTO "NIINO_TEST_PUBLIC"."TWITTER"."LOGSTASH_TWEETS"
  FROM @LOGSTASHTEST
  FILE_FORMAT = (TYPE = 'JSON')
 ;

外部テーブルのS3バケットにイベント通知設定をします。今回の検証ではS3バケット上のファイルにあらゆる変更があった場合にAmazon SQSキューへイベント通知を送ります。

以下SQL文を実行し、ARN(Amazonリソースネーム)を取得します。notification_channelに記載されたarn:aws:sqsから始まる文字列をコピーしておきます。

SHOW PIPES;

AWSマネジメントコンソールに移動し、S3バケットの画面へ移動してイベント通知設定を行います。バケット→プロパティ→イベント通知から「イベント通知を作成」へ遷移します。

任意のイベント名を、特定のフォルダ内にデータが連携されるように設定している場合はプレフィックスにフォルダ名を入力します。

イベントタイプでは「すべてのオブジェクト作成イベント」を選択します。

送信先にSQSキューを選択し、「SQSキューARNを入力」を選択して先ほどコピーしたARNを入力します。

すでにS3バケットにデータが生成されている場合は以下のSQL文を実行します。ALTER PIPE REFRESH文を実行すると過去7日間にステージングされたデータファイルを取り込みます。7日以上前にステージングされている場合はCOPY INTOを実行する必要があります。

ALTER PIPE LOGSTASH_PIPE REFRESH;

データがテーブルに取り込まれていればSnowpipeの準備は完了です。

ストリームを作成

LOGSTASH_TWEETSテーブルに差分があれば検出するようなストリームを以下のSQL文で作成します。

USE DATABASE "NIINO_TEST_PUBLIC";
USE SCHEMA TWITTER;
CREATE OR REPLACE STREAM TWITTER_STREAM ON TABLE LOGSTASH_TWEETS

;

タスクを作成

LOGSTASH_TWEETSテーブルに蓄積されたJSON形式のデータをパースして分析しやすい形に整形する処理を、タスクを作成することで定期的に実行させます。

今回はParse_logstash_tweetsという名前のタスクを作成し、JSON形式のデータをパースしてPARSED_TWEETSテーブルにインサートする処理を1分ごとに実行します。

まずは格納させる先のPARSED_TWEETSテーブルを作成しておきます。

USE DATABASE "NIINO_TEST_PUBLIC";
USE SCHEMA TWITTER;
CREATE OR REPLACE TABLE PARSED_TWEETS (
  timestamp_ms timestamp,
  retweet_count number,
  favorite_count number,
  quote_count number,
  reply_count number,
  hashtags varchar(280),
  text varchar(280),
  source varchar(280),
  user_screen_name varchar(280),
  user_name varchar(280),
  user_location varchar(280),
  user_description varchar(280)
 );

次にタスクを作成します。

USE DATABASE "NIINO_TEST_PUBLIC";
USE SCHEMA TWITTER;
CREATE OR REPLACE TASK Parse_Logstash_tweets
 WAREHOUSE = X_SMALL_WH
 SCHEDULE = '1 minute'
WHEN
 SYSTEM$STREAM_HAS_DATA('TWITTER_STREAM')
AS
 INSERT INTO PARSED_TWEETS
 SELECT
   record:timestamp_ms::timestamp as timestamp_ms,
   record:retweet_count::number as retweet_count,
   record:favorite_count::number as favorite_count,
   record:quote_count::number as quote_count,
   record:reply_count::number as reply_count,
   record:entities.hashtags::varchar as hashtags,
   record:text::varchar as text,
   record:source::varchar as source,
   record:user.screen_name::varchar as user_screen_name,
   record:user.name::varchar as user_name,
   record:user.location::varchar as user_location,
   record:user.description::varchar as user_description
 from Logstash_tweets
;

タスクの実行状況は以下SQL文で確認可能です。

SELECT *
 FROM TABLE(information_schema.task_history(
    scheduled_time_range_start=>dateadd('hour',-1,current_timestamp()),
    result_limit => 100,
    task_name=>'Parse_Logstash_tweets'));

PARSED_TWEETSを確認するとパースされたデータが格納されています。

可視化する

分析の一番楽しいところ、可視化です。今回はデータ加工にAlteryxを、可視化にTableauを使用しました。

まずはAlteryxでツイート内容を形態素解析し、文章を単語に分解します。形態素解析のためのツールは以下エントリを参考にインストールしてください。

PARSED_TWEETSテーブルにAlteryx Desktopで接続し、RecordIDツールでそれぞれのレコードに連番を振ったあと、Janome Tokenizerツールを実行すると単語に分解されます。今回はhyperファイルに出力してTableauで取り込みするようにしました。

実行すると以下のように単語に分解されています。

AlteryxからSnowflakeへの接続は以下エントリをご参照ください。

次にTableau DesktopでSnowflake上のPARSED_TWEETSテーブルとAlteryxで生成したhyperファイルに接続します。可視化した内容は以下の通り。

  • 形態素解析した結果を使ってワードクラウドを生成
  • どのユーザーが「クラスメソッド」「クラメソ」を含むツイートをしているのかカウントする
  • 日別でツイートをカウントする

今回はTableau Desktopで作成しましたが、データソースをライブ抽出に設定してTableau Server/Onlineに出力すればデータが更新されるたびにダッシュボードも更新されます。

まとめ

Snowflakeをフル活用してTwitterをエゴサーチした結果を分析に使うの巻でした。今回はLogstashを実行している時間も短く、取得できたツイートも少なかったのであんまり映える結果になりませんでしたが、おおむねニュースやプレスリリースがよくシェアされているようです。 今回は可視化するだけに留めていますが、Amazon Comprehendなどの感情分析にかけるのも面白そうですね。そんな内容も以下のウェビナーでやっていました。

この記事がどなたかのTwitter分析ライフのお役に立てば幸いです。

参考資料

記事中でURLを貼ったもの以外の参考資料は以下の通りです。