ELB のアクセスログを Parquet 形式に変換して、Amazon Athena からクエリする #reinvent

こんにちは、藤本です。

以前、Amazon AthenaでELBログをSQLで解析する #reinventで ELB のアクセスログを Amazon Athena からクエリする方法をにしざわがご紹介しました。

Amazon Athena はクエリ先のデータサイズや、データの形式、パーティションの構成などでパフォーマンス、コスト効率が変わります。

そこで Amazon Athena はパフォーマンス、コスト効率が良いカラムナ指向フォーマットの Parquet 形式をサポートしています。Parquet に関してはAmazon Athena: カラムナフォーマット『Parquet』でクエリを試してみた #reinventをご参照ください。

パフォーマンスに関してはクエリ方法によって変わりますが、料金に関わるスキャンサイズに関しては多くのケースで大きく削減が確認されています。

AWS が ELB のアクセスログで試したケースでは下記のような差異を確認しています。

データセット Amazon S3上のサイズ クエリ実行時間 スキャンされたデータ コスト
テキストファイルで保存されたデータ 1 TB 236 秒 1.15 TB $5.75
Apache Parquet形式で保存されたデータ 130 GB 6.78 秒 2.51 GB $0.013
削減 / スピードアップ Parquetで87%削減 34倍高速 99%減のデータしかスキャンされない 99.7%削減

そこで今回は下記ブログで紹介されている ELB のアクセスログを Parquet 形式に変換する手順を試してみました。

S3のデータをAmazon Athenaを使って分析する

試してみた

今回の流れは以下のようになります。

  • EMR クラスタ立ち上げ
  • スクリプトダウンロード
  • データ変換
  • 変換したデータを S3 へ転送
  • Amazon Athena からクエリ

EMR クラスタ立ち上げ

データ変換するスクリプトを実行する EMR クラスタを立ち上げます。

マネジメントコンソールから EMR の画面を遷移します。

Create cluster から立ち上げる EMR クラスタを設定します。

EMR_–_AWS_Console 8

Go to advanced options から詳細設定します。

EMR_–_AWS_Console

今回は Spark を利用するので Spark をチェックします。

EMR_–_AWS_Console_2

インスタンスは Core node に r3.8xlarge を 4台利用しました。コスト節約でスポットインスタンスにしました。

EMR_–_AWS_Console_7

オプションはそのまま

EMR_–_AWS_Console_4

Master Node でコマンド実行するので、key pair を設定します。

EMR_–_AWS_Console_5

10分ぐらいで起動しました。

EMR_–_AWS_Console

スクリプトダウンロード

マスターノードへデータ変換するスクリプトをダウンロードします。

EMR クラスタが立ち上がったら、マスターノードへ SSH 接続します。

$ ssh -i <<private key>> hadoop@<<master node ip address>>

スクリプトは Github に公開されていますので、git コマンドでダウンロードします。

awslabs/aws-big-data-blog

$ sudo yum install git -y
Loaded plugins: priorities, update-motd, upgrade-helper
:
Installed:
  git.x86_64 0:2.7.4-1.47.amzn1

Dependency Installed:
  perl-Error.noarch 1:0.17020-2.9.amzn1                             perl-Git.noarch 0:2.7.4-1.47.amzn1                             perl-TermReadKey.x86_64 0:2.30-20.9.amzn1

Complete!

$ git clone https://github.com/awslabs/aws-big-data-blog.git
Cloning into 'aws-big-data-blog'...
remote: Counting objects: 3190, done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 3190 (delta 2), reused 0 (delta 0), pack-reused 3180
Receiving objects: 100% (3190/3190), 30.83 MiB | 21.08 MiB/s, done.
Resolving deltas: 100% (826/826), done.
Checking connectivity... done.

$ cd aws-big-data-blog/aws-blog-spark-parquet-conversion/
$ tree .
.
├── addpartitions.gvy
├── addpartitions.hql
├── convert2parquet.py
├── createtable.hql
└── README.md

0 directories, 5 files
  • createtable.hql : Hive のテーブル作成
  • addpartitions.hql : テーブルにパーティション設定
  • convert2parquet.py : データを取得し、Parquet 形式で出力する Spark スクリプト

データ変換

ダウンロードしたスクリプトを利用して ELB のアクセスログを Parquet 形式へデータ変換し、HDFS へ保存します。

今回はサンプルで用意されている ELB のアクセスログを利用します。
まずは HiveQL のスクリプトcreatetable.hqladdpartitions.hql を実行します。自身の環境で利用する場合はそれぞれの環境に合わせて hql ファイルを修正してください。弊社の Amazon Athena のブログをいくつか読めば、修正は簡単です。

テーブル作成
$ hive -f createtable.hql

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false
OK
Time taken: 2.355 seconds
OK
Time taken: 4.562 seconds
パーティション作成
$ hive -f addpartitions.hql

Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false
OK
Time taken: 4.217 seconds
OK
Time taken: 0.3 seconds
:
OK
Time taken: 0.394 seconds
OK
Time taken: 0.24 seconds
データ変換

ELB のアクセスログを Python 実装の Sparkアプリケーションにより Parquet 形式に変換します。

$ spark-submit  --num-executors 85  --executor-memory 5g convert2parquet.py
16/12/27 15:49:34 INFO SparkContext: Running Spark version 2.0.2
16/12/27 15:49:35 INFO SecurityManager: Changing view acls to: hadoop
16/12/27 15:49:35 INFO SecurityManager: Changing modify acls to: hadoop
16/12/27 15:49:35 INFO SecurityManager: Changing view acls groups to:
16/12/27 15:49:35 INFO SecurityManager: Changing modify acls groups to:
16/12/27 15:49:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
16/12/27 15:49:35 INFO Utils: Successfully started service 'sparkDriver' on port 33278.

:

16/12/27 16:37:40 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
16/12/27 16:37:40 INFO YarnClientSchedulerBackend: Stopped
16/12/27 16:37:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/27 16:37:40 INFO MemoryStore: MemoryStore cleared
16/12/27 16:37:40 INFO BlockManager: BlockManager stopped
16/12/27 16:37:40 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/27 16:37:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/27 16:37:40 INFO SparkContext: Successfully stopped SparkContext
16/12/27 16:37:40 INFO ShutdownHookManager: Shutdown hook called
16/12/27 16:37:40 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-6807a65e-8e69-42d7-bafe-a272b87394a3
16/12/27 16:37:40 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-6807a65e-8e69-42d7-bafe-a272b87394a3/pyspark-bcdb1647-2213-4f14-b987-30c12c4fee8a

1時間弱で完了しました。

変換したデータを S3 へ転送

Apache DistCp の拡張コマンドs3-dist-cpを利用して、HDFS に置かれた変換したデータを S3 へ転送します。

$ s3-dist-cp --src="hdfs:///user/hadoop/elblogs_pq" --dest="s3://<<bucket name>>/<<key prefix>>/"
16/12/27 22:58:40 INFO s3distcp.S3DistCp: Running with args: -libjars /usr/share/aws/emr/s3-dist-cp/lib/guava-15.0.jar,/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.4.0.jar,/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src=hdfs:///user/hadoop/elblogs_pq --dest=s3://<<bucket name>>/
16/12/27 22:58:41 INFO s3distcp.S3DistCp: S3DistCp args: --src=hdfs:///user/hadoop/elblogs_pq --dest=s3://<<bucket name>>/
16/12/27 22:58:41 INFO s3distcp.S3DistCp: Using output path 'hdfs:/tmp/a5dbe6ce-6168-4f7a-a555-5d9473eed13c/output'

:

16/12/27 23:00:51 INFO mapreduce.Job: Job job_1482853503455_0004 completed successfully
16/12/27 23:00:51 INFO mapreduce.Job: Counters: 54
    File System Counters
        FILE: Number of bytes read=39055
        FILE: Number of bytes written=12609556
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=139824014636
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=655
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=190
        S3: Number of bytes read=0
        S3: Number of bytes written=139823896747
        S3: Number of read operations=0
        S3: Number of large read operations=0
        S3: Number of write operations=0
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=95
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=308570
        Total time spent by all reduces in occupied slots (ms)=1261182348
        Total time spent by all map tasks (ms)=2615
        Total time spent by all reduce tasks (ms)=5343993
        Total vcore-milliseconds taken by all map tasks=2615
        Total vcore-milliseconds taken by all reduce tasks=5343993
        Total megabyte-milliseconds taken by all map tasks=9874240
        Total megabyte-milliseconds taken by all reduce tasks=40357835136
    Map-Reduce Framework
        Map input records=366
        Map output records=366
        Map output bytes=152740
        Map output materialized bytes=38675
        Input split bytes=155
        Combine input records=0
        Combine output records=0
        Reduce input groups=366
        Reduce shuffle bytes=38675
        Reduce input records=366
        Reduce output records=0
        Spilled Records=732
        Shuffled Maps =95
        Failed Shuffles=0
        Merged Map outputs=95
        GC time elapsed (ms)=40870
        CPU time spent (ms)=4191750
        Physical memory (bytes) snapshot=111239815168
        Virtual memory (bytes) snapshot=825331716096
        Total committed heap usage (bytes)=231069974528
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=117734
    File Output Format Counters
        Bytes Written=0
16/12/27 23:00:51 INFO s3distcp.S3DistCp: Try to recursively delete hdfs:/tmp/a5dbe6ce-6168-4f7a-a555-5d9473eed13c/tempspace

3分ほどで完了です。

ちなみにデータサイズを比較したところ、12% ぐらいになっていました。

  • 元データサイズ: 1.14 TB
  • 変換後データサイズ: 0.14 TB

Amazon Athena からクエリ

S3 へアップロードした Parquet 形式の ELB アクセスログへ Amazon Athena からクエリしてみましょう。

Athena のテーブルを作成します。データベースは default を利用しています。

CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs_pq (
  request_timestamp string,
  elb_name string,
  request_ip string,
  request_port int,
  backend_ip string,
  backend_port int,
  request_processing_time double,
  backend_processing_time double,
  client_response_time double,
  elb_response_code string,
  backend_response_code string,
  received_bytes bigint,
  sent_bytes bigint,
  request_verb string,
  url string,
  protocol string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string )
PARTITIONED BY(year int, month int, day int) 
STORED AS PARQUET
LOCATION 's3://<<bucket name>>/'
tblproperties ("parquet.compress"="SNAPPY");

Athena

続いて、パーティションを設定します。今回のスクリプトでは year=xxxx/month=xx/day=xx という形式のパスとなっているので、パーティションを自動生成できます。

msck repair table elb_logs_pq

Athena 2

パーティションが生成されました。

それではクエリしてみましょう。ステータスコード毎のデータ件数を集計します。

Athena 3

圧縮ファイル、38億レコードの集計が 8秒強で返ってきました!驚異的ですね!

まとめ

Amazon Athena は使い方によって、コスト効率、パフォーマンスが大きく変わります。月1,2回クエリ頻度であれば、データ変換するコストの方がかかるかもしれません。QuickSight で色々な可視化をするようであれば、何度もクエリが発生すると思いますので、今回ご紹介したような方法でコスト効率を向上させることができるかと思います。