この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、藤本です。
以前、Amazon AthenaでELBログをSQLで解析する #reinventで ELB のアクセスログを Amazon Athena からクエリする方法をにしざわがご紹介しました。
Amazon Athena はクエリ先のデータサイズや、データの形式、パーティションの構成などでパフォーマンス、コスト効率が変わります。
そこで Amazon Athena はパフォーマンス、コスト効率が良いカラムナ指向フォーマットの Parquet 形式をサポートしています。Parquet に関してはAmazon Athena: カラムナフォーマット『Parquet』でクエリを試してみた #reinventをご参照ください。
パフォーマンスに関してはクエリ方法によって変わりますが、料金に関わるスキャンサイズに関しては多くのケースで大きく削減が確認されています。
AWS が ELB のアクセスログで試したケースでは下記のような差異を確認しています。
データセット | Amazon S3上のサイズ | クエリ実行時間 | スキャンされたデータ | コスト |
---|---|---|---|---|
テキストファイルで保存されたデータ | 1 TB | 236 秒 | 1.15 TB | $5.75 |
Apache Parquet形式で保存されたデータ | 130 GB | 6.78 秒 | 2.51 GB | $0.013 |
削減 / スピードアップ | Parquetで87%削減 | 34倍高速 | 99%減のデータしかスキャンされない | 99.7%削減 |
そこで今回は下記ブログで紹介されている ELB のアクセスログを Parquet 形式に変換する手順を試してみました。
試してみた
今回の流れは以下のようになります。
- EMR クラスタ立ち上げ
- スクリプトダウンロード
- データ変換
- 変換したデータを S3 へ転送
- Amazon Athena からクエリ
EMR クラスタ立ち上げ
データ変換するスクリプトを実行する EMR クラスタを立ち上げます。
マネジメントコンソールから EMR の画面を遷移します。
Create cluster から立ち上げる EMR クラスタを設定します。
Go to advanced options から詳細設定します。
今回は Spark を利用するので Spark をチェックします。
インスタンスは Core node に r3.8xlarge を 4台利用しました。コスト節約でスポットインスタンスにしました。
オプションはそのまま
Master Node でコマンド実行するので、key pair を設定します。
10分ぐらいで起動しました。
スクリプトダウンロード
マスターノードへデータ変換するスクリプトをダウンロードします。
EMR クラスタが立ち上がったら、マスターノードへ SSH 接続します。
$ ssh -i <<private key>> hadoop@<<master node ip address>>
スクリプトは Github に公開されていますので、git コマンドでダウンロードします。
$ sudo yum install git -y
Loaded plugins: priorities, update-motd, upgrade-helper
:
Installed:
git.x86_64 0:2.7.4-1.47.amzn1
Dependency Installed:
perl-Error.noarch 1:0.17020-2.9.amzn1 perl-Git.noarch 0:2.7.4-1.47.amzn1 perl-TermReadKey.x86_64 0:2.30-20.9.amzn1
Complete!
$ git clone https://github.com/awslabs/aws-big-data-blog.git
Cloning into 'aws-big-data-blog'...
remote: Counting objects: 3190, done.
remote: Compressing objects: 100% (10/10), done.
remote: Total 3190 (delta 2), reused 0 (delta 0), pack-reused 3180
Receiving objects: 100% (3190/3190), 30.83 MiB | 21.08 MiB/s, done.
Resolving deltas: 100% (826/826), done.
Checking connectivity... done.
$ cd aws-big-data-blog/aws-blog-spark-parquet-conversion/
$ tree .
.
├── addpartitions.gvy
├── addpartitions.hql
├── convert2parquet.py
├── createtable.hql
└── README.md
0 directories, 5 files
- createtable.hql : Hive のテーブル作成
- addpartitions.hql : テーブルにパーティション設定
- convert2parquet.py : データを取得し、Parquet 形式で出力する Spark スクリプト
データ変換
ダウンロードしたスクリプトを利用して ELB のアクセスログを Parquet 形式へデータ変換し、HDFS へ保存します。
今回はサンプルで用意されている ELB のアクセスログを利用します。
まずは HiveQL のスクリプトcreatetable.hql
、addpartitions.hql
を実行します。自身の環境で利用する場合はそれぞれの環境に合わせて hql ファイルを修正してください。弊社の Amazon Athena のブログをいくつか読めば、修正は簡単です。
テーブル作成
$ hive -f createtable.hql
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false
OK
Time taken: 2.355 seconds
OK
Time taken: 4.562 seconds
パーティション作成
$ hive -f addpartitions.hql
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false
OK
Time taken: 4.217 seconds
OK
Time taken: 0.3 seconds
:
OK
Time taken: 0.394 seconds
OK
Time taken: 0.24 seconds
データ変換
ELB のアクセスログを Python 実装の Sparkアプリケーションにより Parquet 形式に変換します。
$ spark-submit --num-executors 85 --executor-memory 5g convert2parquet.py
16/12/27 15:49:34 INFO SparkContext: Running Spark version 2.0.2
16/12/27 15:49:35 INFO SecurityManager: Changing view acls to: hadoop
16/12/27 15:49:35 INFO SecurityManager: Changing modify acls to: hadoop
16/12/27 15:49:35 INFO SecurityManager: Changing view acls groups to:
16/12/27 15:49:35 INFO SecurityManager: Changing modify acls groups to:
16/12/27 15:49:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
16/12/27 15:49:35 INFO Utils: Successfully started service 'sparkDriver' on port 33278.
:
16/12/27 16:37:40 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
16/12/27 16:37:40 INFO YarnClientSchedulerBackend: Stopped
16/12/27 16:37:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/27 16:37:40 INFO MemoryStore: MemoryStore cleared
16/12/27 16:37:40 INFO BlockManager: BlockManager stopped
16/12/27 16:37:40 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/27 16:37:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/27 16:37:40 INFO SparkContext: Successfully stopped SparkContext
16/12/27 16:37:40 INFO ShutdownHookManager: Shutdown hook called
16/12/27 16:37:40 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-6807a65e-8e69-42d7-bafe-a272b87394a3
16/12/27 16:37:40 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-6807a65e-8e69-42d7-bafe-a272b87394a3/pyspark-bcdb1647-2213-4f14-b987-30c12c4fee8a
1時間弱で完了しました。
変換したデータを S3 へ転送
Apache DistCp の拡張コマンドs3-dist-cp
を利用して、HDFS に置かれた変換したデータを S3 へ転送します。
$ s3-dist-cp --src="hdfs:///user/hadoop/elblogs_pq" --dest="s3://<<bucket name>>/<<key prefix>>/"
16/12/27 22:58:40 INFO s3distcp.S3DistCp: Running with args: -libjars /usr/share/aws/emr/s3-dist-cp/lib/guava-15.0.jar,/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.4.0.jar,/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src=hdfs:///user/hadoop/elblogs_pq --dest=s3://<<bucket name>>/
16/12/27 22:58:41 INFO s3distcp.S3DistCp: S3DistCp args: --src=hdfs:///user/hadoop/elblogs_pq --dest=s3://<<bucket name>>/
16/12/27 22:58:41 INFO s3distcp.S3DistCp: Using output path 'hdfs:/tmp/a5dbe6ce-6168-4f7a-a555-5d9473eed13c/output'
:
16/12/27 23:00:51 INFO mapreduce.Job: Job job_1482853503455_0004 completed successfully
16/12/27 23:00:51 INFO mapreduce.Job: Counters: 54
File System Counters
FILE: Number of bytes read=39055
FILE: Number of bytes written=12609556
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=139824014636
HDFS: Number of bytes written=0
HDFS: Number of read operations=655
HDFS: Number of large read operations=0
HDFS: Number of write operations=190
S3: Number of bytes read=0
S3: Number of bytes written=139823896747
S3: Number of read operations=0
S3: Number of large read operations=0
S3: Number of write operations=0
Job Counters
Launched map tasks=1
Launched reduce tasks=95
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=308570
Total time spent by all reduces in occupied slots (ms)=1261182348
Total time spent by all map tasks (ms)=2615
Total time spent by all reduce tasks (ms)=5343993
Total vcore-milliseconds taken by all map tasks=2615
Total vcore-milliseconds taken by all reduce tasks=5343993
Total megabyte-milliseconds taken by all map tasks=9874240
Total megabyte-milliseconds taken by all reduce tasks=40357835136
Map-Reduce Framework
Map input records=366
Map output records=366
Map output bytes=152740
Map output materialized bytes=38675
Input split bytes=155
Combine input records=0
Combine output records=0
Reduce input groups=366
Reduce shuffle bytes=38675
Reduce input records=366
Reduce output records=0
Spilled Records=732
Shuffled Maps =95
Failed Shuffles=0
Merged Map outputs=95
GC time elapsed (ms)=40870
CPU time spent (ms)=4191750
Physical memory (bytes) snapshot=111239815168
Virtual memory (bytes) snapshot=825331716096
Total committed heap usage (bytes)=231069974528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=117734
File Output Format Counters
Bytes Written=0
16/12/27 23:00:51 INFO s3distcp.S3DistCp: Try to recursively delete hdfs:/tmp/a5dbe6ce-6168-4f7a-a555-5d9473eed13c/tempspace
3分ほどで完了です。
ちなみにデータサイズを比較したところ、12% ぐらいになっていました。
- 元データサイズ: 1.14 TB
- 変換後データサイズ: 0.14 TB
Amazon Athena からクエリ
S3 へアップロードした Parquet 形式の ELB アクセスログへ Amazon Athena からクエリしてみましょう。
Athena のテーブルを作成します。データベースは default を利用しています。
CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs_pq (
request_timestamp string,
elb_name string,
request_ip string,
request_port int,
backend_ip string,
backend_port int,
request_processing_time double,
backend_processing_time double,
client_response_time double,
elb_response_code string,
backend_response_code string,
received_bytes bigint,
sent_bytes bigint,
request_verb string,
url string,
protocol string,
user_agent string,
ssl_cipher string,
ssl_protocol string )
PARTITIONED BY(year int, month int, day int)
STORED AS PARQUET
LOCATION 's3://<<bucket name>>/'
tblproperties ("parquet.compress"="SNAPPY");
続いて、パーティションを設定します。今回のスクリプトでは year=xxxx/month=xx/day=xx
という形式のパスとなっているので、パーティションを自動生成できます。
msck repair table elb_logs_pq
パーティションが生成されました。
それではクエリしてみましょう。ステータスコード毎のデータ件数を集計します。
圧縮ファイル、38億レコードの集計が 8秒強で返ってきました!驚異的ですね!
まとめ
Amazon Athena は使い方によって、コスト効率、パフォーマンスが大きく変わります。月1,2回クエリ頻度であれば、データ変換するコストの方がかかるかもしれません。QuickSight で色々な可視化をするようであれば、何度もクエリが発生すると思いますので、今回ご紹介したような方法でコスト効率を向上させることができるかと思います。