ちょっと話題の記事

はじめてのEMR/fluentdでS3にアップロードしたログをElastic MapReduceで検索・集計する

2013.09.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

今回解決したい課題

こんにちは植木和樹です。本日の課題はタイトルの通り「fluentdでS3のバケットにアップロードしたログを検索・集計する」です。EC2でサーバを構築した場合、インスタンスがTerminateした後もログが参照できるようfluentdを使ってS3にファイルをアップロードしておくのがAWSのベストプラクティスです。

Amazon Recommends Fluentd as “Best Practice for Data Collection” over Flume and Scribe

しかしS3にアップロードしたログファイルはツールを用いなければアクセスすることができず、このままでは容易に検索できません。EC2からS3をs3fsでマウントしてgrepという方法はありますが、遅すぎて実用的とはいえません。s3cmdでいったんローカルにファイルをダウンロードしてから検索するという方法もありますが、ダウンロードの手間がかかってしまいます。

そこで今回はAmazon EMR(Elastic MapReduce)使ってログを検索・集計する方法を試してみました。ちなみ筆者はEMRを使うのは今回が初めてです。EMRについての技術的な解説はできませんので、行った手順だけご紹介したいと思います。

準備するもの

  • AWSアカウント
  • EC2のsshログインに使うキーペア
  • fluentdを使ってS3のバケットにいくつかのログファイルが保存されていること(扱うのはApacheのアクセスログ)

なおS3には/<bucknet>/yyyy/mm/dd

というパスで、1日ごとにフォルダをわけてログが保存されているものとします。1日ごとのフォルダには、fluentdがログを書き込むごとにファイルが出力されるため複数のファイルがあります。

EMRのクラスターを作成する

まずはEMRのクラスターを作成します。マネージメントコンソールのダッシュボードから「Elastic MapReduce」を選択しEMRの画面を表示します。

20130905_emr_001

[Create New Job Flow]をクリックして新しいジョブフローを作成します。

20130905_emr_002

[Job Flow Name]を入力し[Run your own application]を選択します。アプリケーションのタイプは[Hive Program]を選択します。

20130905_emr_003

今回はバッチモードでなく、コマンドを入力しながらの対話モードで実行するので[Start an Interactive Hive Session]を選択します。

20130905_emr_004

EMRのクラスター全体をコントロールする役目をもつ"Master Instance"と実際に処理を行う"Core Instance"のインスタンスタイプと数を入力します。今回はお試しなのでインスタンスタイプはm1.small、コアインスタンスの数は1にします。

20130905_emr_005

キーペアを選択します。このキーでEMRのマスターインスタンスにsshでログインすることになります。次にVPCのサブネットを選択しますが、PublicDNSで直接ログインしたいため今回はVPCは使いません。念のためデバッグログを有効にし”Termination Protection"や"Visible To All IAM Users"をチェックしておきました。

20130905_emr_007

クラスター構築時の処理は特に必要ないので[Proceed with no Bootstrap Actions]を選択します。

20130905_emr_008

[Create Job Flow]ボタンをクリックしてEMRクラスターを作成します。

20130905_emr_009

[Close]ボタンをクリックしてジョブフローの作成を完了します。これでEMRのクラスターの構築が始まりますが、5〜10分程度時間がかかるので、しばらく待ちます。

20130905_emr_010

EMRの画面に戻り、一覧に表示されたジョブフローのステータスが"STARTING"から"RUNNING"に変わったら準備完了です。

20130905_emr_011

作成したジョブフローを選択するとマスターノードのエンドポイントが表示されます。このホスト名にsshで接続するとEMRのマスターノードにログインし、Hiveコマンドでの操作ができるようになります。sshでの接続はec2-userではなくhadoopユーザーになります。

20130905_emr_012

ssh hadoop@ec2-xxx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com
$ hive

HiveコマンドでS3のログをテーブルにマッピングする

以降の作業は「EMRってなんじゃ?(ログ、ゆりかごから墓場まで)」を参考にさせていただきました。

Hiveを使うと処理対象のログをSQLのようなクエリーで操作できます。

まずはS3のログをテーブルにマッピングします。hiveプロンプトで以下のコマンドを実行します。S3に格納されたログファイルは1日ごとにフォルダが分かれていますが、クエリーするときには、それらをまとめて扱いたいと思います。そこでテーブルは1つにし、パーティションという形で1日ごとのフォルダを対応付けることで実現しています。

なおfluentdで出力されるApacheのアクセスログはタイムスタンプ < TAB > タグ < TAB > JSONというようにタブで分割された3つのフィールドになっています。

hive> 
CREATE EXTERNAL TABLE IF NOT EXISTS fluentLog (dt string, tag string, json string)
PARTITIONED BY ( PT STRING )
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n';

ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-28' ) LOCATION 's3://my-fluentd-test/2013/08/28';
ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-29' ) LOCATION 's3://my-fluentd-test/2013/08/29';
ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-30' ) LOCATION 's3://my-fluentd-test/2013/08/30';
ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-31' ) LOCATION 's3://my-fluentd-test/2013/08/31';

hive> show tables;
OK
fluentlog
Time taken: 10.91 seconds, Fetched: 2 row(s)

hive> desc fluentlog;
OK
dt                      string                  None
tag                     string                  None
json                    string                  None
pt                      string                  None

# Partition Information
# col_name              data_type               comment

pt                      string                  None
Time taken: 0.534 seconds, Fetched: 9 row(s)

hive> show partitions fluentlog;
OK
pt=2013-08-28
pt=2013-08-29
pt=2013-08-30
pt=2013-08-31
Time taken: 0.445 seconds, Fetched: 5 row(s)

年月日でわかれたS3上のフォルダをパーティションに対応させることができました。これで日付をまたいだ検索もできますし、この後にご紹介するようパーティション(年月日)を指定して検索することもできます。

Hiveコマンドでログを検索する

Hive QLというSQLによく似たクエリーでログファイルを検索することができます。まずはCSVを検索してみましょう。次のクエリーを実行します。

SELECT
  unix_timestamp(), dt, host, user, method, path, code, size, referer, CONCAT('"', agent ,'"') 
FROM
  fluentLog LATERAL VIEW
    json_tuple(fluentLog.json, 'host', 'user', 'method', 'path', 'code', 'size', 'referer', 'agent') j
      AS host, user, method, path, code, size, referer, agent
WHERE agent LIKE '%AppleWebKit%'
ORDER BY dt;

SQLでは見かけない「LATERAL VIEW」と「json_tuple」キーワードが出てきています。LATERAL VIEWはユーザー関数の実行結果をビューとして扱うためのキーワードで、今回の場合はjson_tupleで展開されたJSONをビューとして参照できます。json_tupleはfluentdテーブルのjson列に格納されたJSON文字列をパースして値を返す関数です。つまりこのクエリーではカラムに含まれたJSONを分解して、あたかも事前定義されたテーブルのように扱っています。

Hiveコマンドでログを検索し結果をS3のファイルへ出力する

検索した結果は画面に表示されますが、「出力テーブル」を定義することでS3に保存することができます。まずは下記を実行してarchivelog_201308テーブルをS3上のフォルダs3://my-fluentd-emr/archives/2013/08に紐付けます。

CREATE EXTERNAL TABLE IF NOT EXISTS archivelog_201308 (
  version bigint,
  dt string,
  host string,
  user string,
  method string,
  path string,
  code string,
  size string,
  referer string,
  agent string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://my-fluentd-emr/archives/2013/08';

次に先ほどのSELECTクエリーを実行しますが、SELECT文の前にINSERT句を加えることで実行結果をテーブルに出力します。

INSERT OVERWRITE TABLE archivelog_201308
SELECT
  unix_timestamp(), dt, host, user, method, path, code, size, referer, agent
FROM fluentLog LATERAL VIEW
  json_tuple(fluentLog.json, 'host', 'user', 'method', 'path', 'code', 'size', 'referer', 'agent') j
    AS host, user, method, path, code, size, referer, agent
WHERE pt >= '2013-08-01' AND pt <= '2013-08-31'
ORDER BY dt;

Hiveコマンドでログを集計し結果をS3のファイルへ出力する

集計もできます。Apacheのステータスコードごとに件数を集計してみましょう。まずは下記を実行してstatusSummary_201308テーブルをS3上のフォルダs3://my-fluentd-emr/summary/2013/08に紐付けます。

hive> CREATE EXTERNAL TABLE IF NOT EXISTS statusSummary_201308 (code string, count bigint)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 's3://my-fluentd-emr/summary/2013/08';

次に作成した出力テーブルにクエリーの実行結果をINSERTすると、S3にファイルが出力されます。

INSERT OVERWRITE TABLE statusSummary_201308
SELECT
  code,count(1)
FROM
  fluentLog LATERAL VIEW
    json_tuple(fluentLog.json, 'host', 'user', 'method', 'path', 'code', 'size', 'referer', 'agent') j
      AS host,user,method, path, code, size, referer, agent
WHERE pt >= '2013-08-28' AND pt <= '2013-08-31'
GROUP BY code;

S3に接続してs3://my-fluentd-emr/summary/2013/08に出力されたCSVファイルを見てみましょう。

\N,1
200,1
304,2
400,5
403,11
404,136

ちゃんと集計結果が出力されていますね!

まとめ

Elastic MapReduceとHiveを使ってfluentdのログを直接検索する方法を試してみました。EMRは初めて触ってみたのですが、Hiveを使うと見慣れたSQLを使ってデータ操作が行えるため、当初想像していたよりも簡単にMapReduceを経験することができました。ただHiveでS3のファイルを検索する方法は簡単は簡単なのですが、やはり直接S3のファイルを操作しているので遅いです。手軽にgrep間隔で使うにはコストが高すぎます。

やはりEMRは大量のデータを一斉に処理して別の形式に変換する目的で使うのが良さそうですね。最近ではAmazon RedshiftというデータウェアハウスにデータをインポートしてBIツールで検索するのが良さそうです。今度はそちらも試してみたいと思います。