Amazon AthenaでETLした結果をRedshiftにロードしてみる

2017.03.08

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

先日の Black Beltオンラインセミナー「Amazon Athena」で、Athenaの想定ユースケースとして「開発者が、大規模でない生データに対して、低頻度でETL処理をする」という話を聞き、実際に Athena で ETL して Redshift にロードしてみることにしました。

サンプルデータ

サンプルデータは下記データをダウンロードし、対象シートを[Orders]のみをS3ファイルに保存して、Athenaで orders_jp テーブルとして定義したものを利用します。

Superstoreサンプルデータ(不具合修正版) |Tableau Community

シナリオ

日本国内の売上情報が含まれるデータファイルがS3上に存在する。データファイルから製品カテゴリ〜製品毎の売上と利益率のデータマートを作成して、BIツールから参照できるようにこのデータマートをRedshiftにロードしたい。

さあ、やってみましょう。

データマートの作成とロード

以下の手順で、AthenaでS3上のデータファイルをETLして、その結果をRedshiftにロードします。

  1. Athena でデータファイルを orders_jp テーブルとして定義する
  2. Athena の orders_jp テーブルに対して集計クエリを実行する
  3. S3に保存される集計クエリの実行結果をロード用ディレクトリにコピー
  4. Redshiftにデータマートをロードする

1. Athena でデータファイルを orders_jp テーブルとして定義する

最初はAthenaでS3上のデータファイルを以下の orders_jp というテーブルにマッピングします。

CREATE EXTERNAL TABLE IF NOT EXISTS orders_jp (
  order_id                string,
  order_date              string,
  order_priority          string,
  order_quantity          int,
  sales                   double,
  discount                double,
  ship_mode               string,
  profit                  int,
  unit_price              int,
  advertising_expenses    double,
  shipping_cost           int,
  customer_name           string,
  prefecture              string,
  city                    string,
  region                  string,
  shop_name               string,
  customer_segment        string,
  product_category        string,
  product_sub_category    string,
  product_id              string,
  product_name            string,
  product_description     string,
  product_container       string,
  product_base_margin     double,
  supplier                string,
  delivery_scheduled_date string,
  shipping_date           string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '\t',
  'field.delim' = '\t'
) LOCATION 's3://cm-datalake/orders_jp/';

クエリーが実行できるようになりました。S3に格納されたデータは以下のようなデータです。 20170309-etl-with-athena-1

2. Athena の orders_jp テーブルに対して集計クエリを実行する

シナリオに従い、「データファイルから製品カテゴリ〜製品毎の売上と利益率」のデータマートを作成する集計クエリーを実行します。

SELECT 
  product_category, 
  product_sub_category, 
  product_name, 
  cast(sum(sales) AS bigint) AS sales, 
  cast(sum(profit) AS bigint) AS profit 
FROM orders_jp
GROUP BY 1,2,3
ORDER BY 1,2,3
;

以下のような結果が得られました。 20170309-etl-with-athena-2

3. S3に保存される集計クエリの実行結果をロード用ディレクトリにコピー

クエリーの集計結果は、Settingsの「Query result location」に指定したパスのUnsavedディレクトリの下に、更に年・月・日のディレクトリ作成してデータファイルを自動的に保存します。1回のクエリ毎にデータ(.csv)とメタデータ(.metadata)の2つのファイルが保存されます。 20170309-etl-with-athena-3

今回は必要となるデータ(.csv)のファイルのみをRedshiftのロードデータ用のディレクトリにコピーします。

$ aws s3 cp s3://aws-athena-query-results-xxxxxxxxxxxx-us-east-1/Unsaved/2017/03/07/4db339da-201b-45b9-b083-4612288047c0.csv s3://cm-datalake/athena_etl/order_summary.csv
copy: s3://aws-athena-query-results-xxxxxxxxxxxx-us-east-1/Unsaved/2017/03/07/4db339da-201b-45b9-b083-4612288047c0.csv to s3://cm-datalake/athena_etl/order_summary.csv

4. Redshiftにデータマートをロードする

まず、Redshiftにデータマートをロードするためのテーブルを準備します。

cmdb=> CREATE TABLE orders_summary (
cmdb(>   category VARCHAR(128),
cmdb(>   sub_category VARCHAR(128),
cmdb(>   product_name VARCHAR(128),
cmdb(>   sales BIGINT,
cmdb(>   profit BIGINT
cmdb(> )
cmdb-> ;
CREATE TABLE

データをロードする前に、ファイル形式を確認します。

"product_category","product_sub_category","product_name","sales","profit"
"テクノロジー","コピー機とファックス","Brother DCP1000 Digital 3 in 1 Multifunction Machine","5380949","577265"
"テクノロジー","コピー機とファックス","Canon Image Class D660 Copier","10549240","1002616"
"テクノロジー","コピー機とファックス","Canon Imageclass D680 Copier / Fax","3863655","554255"
"テクノロジー","コピー機とファックス","Canon PC-428 Personal Copier","5782544","988122"

データ(.csv)は先頭行にカラム名のヘッダファイルを持ちます。また、各フィールドの全ての値はダブルクオート「"」によって囲まれています。この点を考慮してCOPYコマンドのオプションを指定します。

最後にデータをロードします。先頭行をスキップするために「IGNOREHEADER AS 1」を指定します。また、カラムデータのダブルクオート「"」を除去するため「CSV QUOTE '"'」の指定も必要です。

cmdb=> COPY orders_summary
cmdb->  FROM 's3://cm-datalake/athena_etl/order_summary.csv'
cmdb->  CREDENTIALS 'aws_access_key_id=<aws_access_key_id>;aws_secret_access_key=<aws_secret_access_key>'
cmdb->  CSV QUOTE '"'
cmdb->  IGNOREHEADER AS 1
cmdb-> ;
INFO:  Load into table 'orders_summary' completed, 1282 record(s) loaded successfully.
COPY

以下の通り、意図した形式で格納されたことが確認できます。

cmdb=> select * from orders_summary limit 5;
   category   |      sub_category      |                     product_name                     |  sales   |  profit
--------------+------------------------+------------------------------------------------------+----------+----------
 テクノロジー | コピー機とファックス   | Canon PC-428 Personal Copier                         |  5782544 |   988122
 テクノロジー | コピー機とファックス   | Canon imageCLASS 2200 Advanced Copier                | 10813685 | -3149376
 テクノロジー | コピー機とファックス   | Sharp AL-1530CS Digital Copier                       | 16255847 |  1177030
 テクノロジー | コンピューター周辺機器 | Acco Keyboard-In-A-Box®                              |   306083 |    -2916
 テクノロジー | コンピューター周辺機器 | Belkin 107-key enhanced keyboard, USB/PS/2 interface |   363399 |   -50937
(5 rows)

最後に

Amazon Athenaのクエリ結果はS3ファイルとして保存されるので、この機能を利用することで簡易なETLが可能であることが確認できました。Redshiftでは、Nested−JSONの構造化データへの変換 や カラムの中のカンマ区切りデータを複数レコードに分解することができないという課題がありましたが、Athenaと組み合わせることで解決できます。今回はクエリ結果のS3ファイルを手動でコピーしてRedshiftにロードしましたが、これらをプログラムで自動化することも可能です。機会があればブログで紹介したいと思います。

昨年のre:Invent2016 でフルマネージドのETLサービス AWS Glue が発表されましたが、本日時点でサービス開始されていません。ETLは将来的に AWS Glue が第一候補となると考えられますが、個人的にはワークロードに合わせ適材適所で GlueのSpark と AthenaのPresto、より大規模で全てに対応できるEMRと 使い分けられるとベストではないかと考えています。