Black Beltオンラインセミナー「Amazon Athena」レポート

こんにちは、虎塚です。

2017年3月1日(水)のBlack Beltオンラインセミナーを受講したので、レポートします。今回のテーマは、Amazon Athenaです。

講師は、アマゾンウェブサービスジャパン株式会社のソリューションアーキテクト、志村誠さんでした。

Amazon Athena概要

Amazon Athenaは、一言でいうと、S3上のデータに標準SQLでインタラクティブなクエリを投げて、データ分析ができるサービス

2016年11月に開催されたre:Invent 2016で発表された。現時点では、バージニア北部、オレゴン、オハイオリージョンで利用できる。

内部では、分散クエリエンジンのPrestoと、Hive Metastore互換のデータカタログを使用している。

Athenaの特徴

  • サーバレスで単にクエリを投げればよく、インフラ管理を気にする必要がない
  • 数百TBの大規模データに対しても高速にクエリを投げられる
  • 事前のデータロードが不要で、S3に置いたログに直接クエリできる
  • クエリ内で実際にスキャンしたデータに対して課金される
  • すでに使っているBIツールから、JDBC経由で直接クエリできる

Athenaの想定ユースケース

  • アナリストが、新しく取得したデータをデータウェアハウスに入れる価値があるか、探索的に検証する
  • アナリストが、コールドデータ (使用頻度の低いデータや過去のデータ) に対して、BIツールを使って低頻度にアドホックな分析をする
    • すべてのデータをデータウェアハウスに入れると料金が高くなるので、あまり使わないデータはS3に置きたい場合など
  • サーバ運用担当が、Webサーバで障害が起きた時に、S3に蓄積したアクセスログを漁って原因を調査する
  • 開発者が、大規模でない生データに対して、低頻度でETL処理をする

クイックスタート (Athenaを使ったデモ)

データをS3にデータを置いて、Athenaでクエリするデモ

  • 使用データ: 客船タイタニック号の乗客データ (CSV)
    • 列は、左からID、船室、乗客の年齢、乗客の性別、乗客の生死

デモ準備

S3バケット (ここではblackbelt-athena-demo) を作成して、上のCSVファイルをアップロードする。

Athenaでは、ディレクトリパスを指定してクエリを発行するので、バケット直下にデータをアップロードしてはいけない。データを入れるバケットには、必ずディレクトリを作成する。

バケットにディレクトリ (titanic)を作り、その中にCSVをアップロードした。

デモ 1

AWS Management Consoleで、Athenaのコンソール画面を開く。

まず、データベースを作成するクエリを入力し、実行する。

create database blackbelt_demo

次に、テーブルを作成するクエリを入力し、実行する。

create external table blackbelt_demo.titanic (
  id int,
  class string,
  age int,
  sex string, 
  survived int
)
row format delimited fields terminated by ',' escaped by '\\'
lines terminated by '\n'
location 's3://blackbelt-athena-demo/titanic/'

それから、簡単なクエリを投げてみる。

select * from blackbelt_demo.titanic limit 3

クエリ結果が、[Results]タブに表示される。Resultsタブ右側のボタンを押すと、結果をファイルでダウンロードできる。

また、Athenaコンソール上部の[History]タブを開くと、これまでに実行したクエリが一覧で表示される。実行したクエリを選び、クエリ結果をダウンロードすることもできる。この画面で、各クエリによって発生したData scanサイズも確認できる。

なお、クエリ結果は、S3バケットにそのまま保存される。保存先のバケットは、Athenaコンソール右側の[Settings]メニューのQuery result locationから設定できる。

さらに、分析クエリを投げてみる。船室クラスと性別でGROUP BYして、カテゴリごとに生存者を表示する。

select
  class
  , sex
  , count(survived) as total_cnt
  , sum(survived) as survived_cnt
from
  blackbelt_demo.titanic
group by
  class
  , sex
order by class, sex

続けて、各カテゴリで何パーセントの人が生き残ったかを表示するクエリを発行する。

select
  class
  , sex
  , total_cnt
  , survived_cnt
  , round (cast (survived_cnt as double) / cast (total_cnt as double), 2) as survival_rate
from (
  select
    class
    , sex
    , count(survived) as total_cnt
    , sum(survived) as survived_cnt
  from
    blackbelt_demo.titanic
  where
    class != '*'
group by
  class
  , sex
)
order by survival_rate desc

生存率順に降順にデータが出る(タイタニック号では避難時に女性と子供を優先させた背景がある。そのことを示すデータが出る)。

デモ 2: QuickSight

SQLでクエリを書かなくても、BIツールでデータをそのまま可視化したり分析したりできる。今回はAmazon QuickSightを使う。

QuickSightのコンソールを開き、新しいデータセットとしてAthenaをクリックする。モーダルダイアログの[Data source name]でtitanicを選び、先ほどのデモで使ったデータベース (blackbelt_demo) とテーブル (titanic) を選択する。

画面が表示されると、[Field list]に項目が出てくる。[Visual types]で表形式を選び、画面を数回クリックしてフィールド設定をするだけで、数秒でグラフを表示できる。

さらに、Add visualでピボットテーブルを追加する。生存率と平均を表示する表を作成することも簡単にできる。

このように、SQLを一切書かずに、ツール上でデータを可視化して分析できる。

Amazon Athenaの構成

Presto: 高速な分散クエリエンジン

  • Prestoとは、Athenaで使用しているクエリエンジンで、すべてをメモリ上で処理する
  • ノードが処理中に故障したり、データが大きすぎてあふれたりしたときは、クエリ自体が失敗する
  • バッチ処理ではなく、インタラクティブ処理に向いている

  • 参照: Prestoのアーキテクチャ Presto | Overview

Hive メタストア

  • Hiveメタストアは、Athenaのデータカタログと互換性がある
    • Hive自体は、SQLライクな記法でHadoop上のバッチ処理を書くための言語
  • データソースに対してスキーマを定義し、テーブルのように扱える
  • HDFS上のデータをベースとしていて、標準SQLと異なるDDLを持つ

Athenaのテーブル定義

テーブル定義の後に、データ形式や圧縮形式などを指定する。

CREATE EXTERNAL TABLE IF NOT EXISTS action_log (
  user_id string
  , action_category string
[...]
)
  PARTITIONED BY (year int, month int)   -- パーティション
  STORED AS PARQUET    -- データ形式
  LOCATION 's3://athena-examples/action-log/'    -- データの場所
  TBLPROPERTIES ('PARQUET.COMPRESS'='SNAPPY')    -- 圧縮形式
  • HiveのDDLと同じ形式なので、すでにEMRを使っていてHive DDLでテーブル定義している場合は、それをAthena上で実行すれば、すぐにクエリを投げられる
    • 既存のEMRのHiveメタストアに、直接アクセスできないので、DDLをAthena上で実行する必要がある
  • クエリを発行する際にスキーマを当てはめる (schema-on-read) ので、同一データに複数のスキーマを当てはめることができる

Athenaのデータ型

Athenaのデータ形式 / 圧縮形式

  • データ形式: CSV, TSV, Parquet, ORC, JSON, Regex, Avro
    • 2017年2月にAvroとOpenCSVSerDeをサポート
    • JSONは、Hive-JsonSerDeとOpenx-JsonSerDeの2つが利用できる
  • 圧縮形式: Snappy, Zlib, GZIP
    • Lzoは未サポート

Athenaのクエリ

  • 標準ANSI SQLに準拠
    • WITH句やJOINなどに対応
  • Prestoが用意する関数は、基本的に使用できる

AthenaのDDL/クエリの注意点

  • 現状、CREATE TABLE AS SELECT句は使用できない
  • EXTER TABLEのみ利用できる
    • Athena自身は内部にデータを保持しないので、データを外部表としてのみ読み込む
  • トランザクション処理、UDF/UDAF、ストアドプロシージャ、ビューなどは未サポート
  • 変更処理はACIDを保証する
    • 同時に複数のDDLを発行しても、成功するのは1つだけ

データ設計とクエリ特性

データ設計に影響するAthenaの特性

  • オンライントランザクション処理 (OLTP) でなくオンライン分析処理 (OLAP) 向け
  • ETLでなく分析向け
    • データのフルスキャンして変換すると高コストになる設計
    • リトライがないため、安定したバッチ処理はできない
  • 読み込むデータ量を減らすことが重要
    • パーティション、列指向フォーマット、圧縮を工夫

典型的なオンライン分析処理のクエリ

SELECT
  col1
  , col2
  , COUNT(col3)
  , SUM(col3)
FROM
  table1
  INNER JOIN table2
  ON table1.id = table2.id
WHERE
  table1.id = table2.id
  AND col4 = 1
  AND col5 = "good"
GROUP BY
  col1
  , col2
  • SELECTして、集合関数を使い、JOINし、WHERE句で条件を絞り、GROUP BYで集約する
  • データ量を削るためにWHEREとSELECTが重要
  • スキャン量が減れば速度が上がり、利用料金が安くなる

パーティション

  • 実際にファイルが置かれたS3のオブジェクトキーの構成をCREATE TABLEに反映する
  • WHERE句で絞ると、当該ディレクトリだけ読み込む
    • 上手くパーティション設計をすると、読み込み範囲を限定できる
  • 1テーブルあたりの最大パーティション数は20000個

CREATE TABLE文の例を示す。

[...]
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET
LOCATION 's3://athena-examples/action-log'
[...]
s3://athena-examples/action-log/year=2017/month=03/day=01/data.gz

パーティションの効果的な使い方

  • Where句で読み込み範囲を絞るときに、頻繁に使うカラムをキーに指定すると、絞り込み効果が高い
    • ログデータなら日付が定番
    • "year/month/day"と階層で指定する
WHERE
  year = 2016
  AND month >= 4
  AND month < 7

上のように指定すると、次のS3パスにあるデータだけを読み込む。

s3://athena-examples/actino-log/year=2016/month=04/day=01/
s3://athena-examples/actino-log/year=2016/month=04/day=02/
[...]
s3://athena-examples/actino-log/year=2016/month=07/day=31/

パーティションの分け方

  • カラム名あり (Hive標準)
    • 形式: col1=val1/col2=val2/
    • 特徴
    • CREATE TABLE実行後に、MSCK REPAIR TABLEを実行する。これによって、含まれるすべてのパーティションが認識される
    • パーティションが増えた時も、MSCK REPAIR TABLEを1回だけ実行すればよい
    • この形式にするために前処理が必要
  • カラム名なし
    • 形式: val1/val2/
    • 特徴
    • ALTER TABLE ADD PARTITIONをパーティションの数だけ実行する
    • パーティションが増えた時は、ALTER TABLE ADD PARTITIONを増えた数だけ実行する
    • あらかじめ未来の期間までパーティションを作っておけば、ファイルを置いた瞬間にクエリを発行できる

列指向フォーマット

  • 列指向
    • カラムごとにデータを保存するので、特定の列を扱うときはファイル全体を読む必要がない
    • OLAP向き
    • ORC, Parquetなど
  • 行指向
    • レコード単位でデータを保存するので、特定の列だけ扱うときでもレコード全体を読む必要がある
    • OLTP向き
    • 行ごとにロックをかけたい場合は、行指向が効率的
    • TEXTFILE (CSV, TSV) など

列指向フォーマットを使うメリット

  • OLAP系クエリを効率的に実行できる
    • たいていの分析クエリでは、100個カラムがあっても5個、10個しか使わない
    • 統計データを持っているので、単純な統計ならメタデータを参照すればよく、データスキャンすら発生しない
  • I/O効率を上げやすい - 圧縮と同時に使うことでさらに効率が上がる

    • 同じカラムに似たような中身のデータが入るため、圧縮効率がよい

列指向フォーマットと圧縮の使い方

  • CREATE TABLE文でフォーマットを指定するだけ
  • データは事前にフォーマット変換し、圧縮する必要がある

monthとaction_categoryだけみる

パフォーマンスの比較

次の2種類のデータに対するパフォーマンス比較が紹介された。

  • Parquet形式を使い、Snappy形式で圧縮
  • Regex形式を使い、圧縮なし

  • 列指向フォーマットを使うことで、S3上のデータサイズが1TBから130GBに削減

  • 列指向フォーマットに対するクエリのほうが、30倍以上高速
  • 列指向フォーマットを使うことで、スキャンデータ (≒利用料金) を99%削減できた

  • 検証の詳細: S3のデータをAmazon Athenaを使って分析する | Amazon Web Services ブログ

Hive on EMRで列指向フォーマットに変換

  • EMRを使ってデータを列指向フォーマットへ変換できる
    • Hiveでテーブルを定義してデータをロードし、INSERT OVERWRITEで書き込
    • パーティション分けも同様に実施
  • Sparkでの変換も同様
    • 詳細: https://github.com/awslabs/aws-big-data-blog/tree/master/aws-blog-spark-parquet-conversion

Athenaの使いどころ / Update / Tips

Athenaは、アドホックで低頻度の小規模なデータ分析に向く。詳細は、前掲の「Athenaの想定ユースケース」を参照。

Athenaが向いていない処理

Athenaにはリトライの仕組みがなく、データを絞って高速にスキャンするため、バッチ処理には向かない。また、大量データを定期的に長時間処理する用途には向かない。

Athenaに不向きなユースケースごとに、適したサービスを次に示す。用途に合ったサービスを使ったほうがコスト効率がよい。

  • 大規模データにフルスキャンを定期的におこなう → EMR
  • テンポラリテーブルを使った多段ETL処理 → EMR, Glue
  • サブクエリやJOINを駆使した複雑な集計処理 → Redshift
  • 高頻度なレポーティングのための大量分析処理 → Redshift

リファレンスアーキテクチャ

データソースから読み込んだデータをEMRで加工してS3に出力し、Redshiftにロードして、QuickSightで見るようなアーキテクチャがあるとする。Athenaは、このデータパイプラインを補完する形で、次のような箇所に活用できる。

  • 障害発生時に生ログを調査、検証するためのアドホックなクエリ発行
  • データウェアハウスにデータをロードする前の軽い分析
  • Redshiftにデータをロードする手間を省きたいくらい小規模の新サービス
    • ちょっとしたダッシュボード表示のバックエンド

事例

DataXu社では、KinesisからETL (Spark SQL) をはさんでS3に置いた180 TB / dayのデータを、Athenaで分析している。次のスライドにユースケースが載っている。

また、Athenaの利用企業であるGunosy社やJapan Taxi社のコメントを次のリリースで見ることができる。

リソースに関する制約

  • 1アカウントあたりの最大クエリ同時実行数: 5個
  • クエリのタイムアウト 30分
  • 1アカウントあたりの最大データベース数: 100個
  • 1データベースあたりの最大テーブル数: 100個
    • したがって、1アカウントあたりのテーブル数: 10,000個
  • 1テーブルあたりの最大パーティション数: 20,000個

上記はすべてソフトリミットのため、上限緩和申請で増やせる。

I/Oに関する制約

入力の制約

  • どのリージョンのS3からでもデータ読み込みできる
    • オハイオリージョンのみ同一リージョンのS3バケットからのデータ読み込みにだけ対応
  • ただし、リージョンをまたぐとデータ転送量がかかり、転送時間も長くなる
  • 暗号化データ読み込みは、SSE-S3SSE-KMSに対応

出力の制約

  • Athena実行リージョンのS3バケットにヘッダ付きCSVでのみ出力できる
  • 結果ファイルの暗号化は未対応
  • SELECT INSERTには未対応

データアクセスに関する制約

料金体系

  • クエリ単位の従量課金
  • S3のデータスキャンに対して、$5 / 1 TB
    • バイト数はMB単位で切り上げ。10MB未満のクエリは10MBと計算される
  • 別リージョンのS3からデータを読み込む場合は、別途S3のデータ転送料がかかる
  • DDLのクエリや、実行に失敗したクエリは無料

Athenaでは、パーティション、列指向フォーマット、圧縮を利用してスキャンデータ量を減らすことで利用料金を削減できる。

2017年2月のリリース

新機能

性能改善

  • 大規模なParquetテーブルのクエリ速度が改善された

OpenCSVSerDe (補足)

  • 囲み文字をquoteCharで指定できる
  • OpenCSVSerDe自体の制約に注意
    • カラム型はstringのみサポートされる
    • 文字列内の改行文字は非サポート

オハイオリージョンのローンチ (補足)

オハイオリージョンで作成したテーブルのデータは、オハイオリージョンのS3バケットに保存されている必要がある

Tips

JOINクエリのテーブルの並べ方

  • JOINを書くとき、大きなテーブルを左側に持ってくる
    • 右側のテーブルを各ノードにブロードキャストしてJOINするため
    • PrestoにはCost-based optimizerがないので、クエリを書く側が注意する必要がある
FROM
  large_table
  INNER JOIN small_table
  ON large_table.id = small_table.id

テーブル / カラム名についての注意点

  • アンダースコアから始まるテーブル / カラム名は、バッククォートで囲む
  • アンダースコア以外の記号を使えない
CREATE EXTERNAL TABLE `_action_log` (
  `_user_id` string
  , action_category string
[...]
  • テーブル / カラム名には、大文字小文字の区別がない

  • テーブル / カラム名には、予約語を使える

    • ただし、DDLではバッククォートで、クエリではダブルクォートで囲む
    • クエリをシングルクォートで囲むと文字列として扱われる
CREATE EXTERNAL TABLE `join` (
  `where` string
  , action_category string
[...]

データの場所についての注意点

  • S3のキーは、必ず末尾をスラッシュ (/) にする
    • スラッシュまで一致するキーすべてが、テーブルの中身として認識される
    • ワイルドカードやオブジェクト名の直指定は不可
  • パーティションがある場合は、先にMSCKやADD PARTITIONを実行する
LOCATION 's3://athena-examples/action-log/'    -- ok
LOCATION 's3://athena-examples/action-log'    -- NG
LOCATION 's3://athena-examples/action-log/*'    -- NG
LOCATION 's3://athena-examples/action-log/data.csv'    -- NG

JSONデータ読み込みについての注意点

  • 正しいSerDeを指定する
  • JSONレコードは1行で書く (SerDeの仕様)
  • 不正な形式のレコードはオプションで無視できる
WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')

まとめ

  • Athenaは、S3上のデータに標準SQLでインタラクティブなクエリを投げて、データ分析ができるサービス
  • 分析系のアドホックなクエリに向く
  • QuickSightやJDBCから簡単にアクセスできる
  • パーティション、データフォーマット、圧縮形式でパフォーマンスが向上する
  • すべてをAthenaでやるのでなく、適材適所で使い分ける

Q&A

  • Q: ログ解析用途で、複数ファイルを一テーブルとして扱い、クエリを発行できますか。
    • A: Yes. ディレクトリの下 (スラッシュより後) のファイルは、すべて同一パーティション内のデータとして扱えます。
  • Q: 固定長テキストは扱えますか
    • A: 正規表現で対応できると思います。
  • Q: Athenaをアプリから呼び出すにはどうすればよいですか。
    • A: AthenaにはJDBCから接続できるので、Javaアプリケーションを作って、その中から呼び出してください。PHPやRubyでアプリケーションを書くときは、いったんJavaでAthenaにアクセスするAPIを作って、PHPやRubyからはそのAPIを呼び出します。
  • Q: Athenaに向かない処理として「高頻度な分析」とありましたが、高頻度とはどれくらいの頻度ですか。
    • A: Athenaのクエリ同時並列実行数の制約が5なので、それを恒常的に上回る場合が該当します。もちろん上限緩和が可能ですが、恒常的に100や200などのクエリが発行されるようなケースはきびしいです。
  • Q: 分析用途ということで、Redshiftとの使い分けを教えてください。
    • A: Redshiftは、リソースをあらかじめ確保しておき、それに対していくらクエリを投げても利用料が定額であることが特徴です。クエリ数が膨大でRedshiftを使ったほうが安いときはRedshiftを、Athenaのほうが安いときはAthenaを選んでいただければと思います。

おわりに

Athenaに適したユースケースがよくわかるセミナーでした。障害が発生した時には、S3に溜め込んだアクセスログに素早くアクセスして、「こんな時にはAthenaがあってな」と言いながら原因を調査できるように、キャッチアップしておきます。

Black Beltオンラインセミナーのスケジュールは、次のページで公開されています。

それでは、また。