データ準備機能詳解(テーブル結合) #quicksight #15 | Amazon QuickSight Advent Calendar 2016
当エントリは『Amazon QuickSight Advent Calendar 2016』の15本目のエントリです。
昨日の14本目のエントリは『Heat Map(ヒートマップ)』でした。
『AWS re:Invent 2016』の直前に一般利用可能となったAWSによるBIサービス、Amazon QuickSight。発表されたばかりですが、早速Amazon QuickSightを使い倒すべく色々な切り口でその内容について見て行きたいと思います。
15本目となる当エントリでは、Amazon QuickSightのデータ準備機能の1つ、『テーブル結合』の内容について見て行きたいと思います。
『テーブル結合』とは
データセット内の複数のテーブルを組みあせわせて使いたいという場合、Amazon QuickSightでは『テーブル結合』のインタフェースを使う事で2つ以上のテーブルを結合させる事が出来ます。テーブル結合を使うと、結合タイプ及び結合に使用する項目を指定する事が出来ます。結合で使用される項目はデータソース由来のもので、計算フィールドには適用出来ません。また、このテーブル結合では追加でSQL文を使ってデータセットを絞り込むという事も出来ません。データを絞り込みたいという要件がある場合は『カスタムSQL』の機能を使ってください(※この内容については後日のエントリで言及します)
テーブル結合ではSPICEを使用し、SQLデータベースのデータソースに基づくデータセットのテーブル結合を行う事が出来ます。
テーブル結合の機能を使う場合、その前までに任意の項目に対して行っていた変更(フィールド名の変更や計算フィールドの追加等)は破棄されます。
データの準備
ではこの機能を紹介する上で必要なサンプルデータを用意します。下記『Movielens』のサイトから小さめのサイズのデータをダウンロード。
アーカイブされた内容を展開してみます。一番大きいデータでratingsの10万件ですね。
$ ll total 1832 -rw-r--r--@ 1 xxxxxxxxx staff 8364 10 17 14:24 README.txt -rw-r--r--@ 1 xxxxxxxxx staff 80520 10 17 11:12 links.csv.gz -rw-r--r--@ 1 xxxxxxxxx staff 156510 10 17 11:12 movies.csv.gz -rw-r--r--@ 1 xxxxxxxxx staff 667187 10 17 11:12 ratings.csv.gz -rw-r--r--@ 1 xxxxxxxxx staff 13977 10 17 11:12 tags.csv.gz $ $ wc links.csv 9126 9126 183372 links.csv $ wc movies.csv 9126 39127 458390 movies.csv $ wc ratings.csv 100005 100005 2438266 ratings.csv $ wc tags.csv 1297 1887 41902 tags.csv
gzip圧縮を行い、
$ gzip links.csv $ gzip movies.csv $ gzip ratings.csv $ gzip tags.csv
任意のS3バケットにアップロードしておきます。
データ各種をRedshiftのテーブルに格納、整備しておきます。データ投入に用いたテーブル毎のSQL定義文・実行文は以下の通りです。
テーブル:links
# CREATE SCHEMA movielens;
# DROP TABLE IF EXISTS movielens.links_load; CREATE TABLE IF NOT EXISTS movielens.links_load ( movie_id INT NOT NULL, imdb_id INT, tmdb_id INT ); # COPY movielens.links_load FROM 's3://zzzzzzzzzzzzz/movielens/links.csv.gz' CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY' IGNOREHEADER 1 CSV gzip; # ANALYZE COMPRESSION movielens.links_load; Table | Column | Encoding | Est_reduction_pct ------------+----------+----------+------------------- links_load | movie_id | delta | 0.00 links_load | imdb_id | lzo | 3.02 links_load | tmdb_id | mostly16 | 0.00 (3 rows) # DROP TABLE IF EXISTS movielens.links; # CREATE TABLE IF NOT EXISTS movielens.links ( movie_id INT encode delta NOT NULL, imdb_id INT encode lzo, tmdb_id INT encode mostly16, PRIMARY KEY(movie_id) ) DISTSTYLE ALL; # INSERT INTO movielens.links (SELECT * FROM movielens.links_load);
テーブル:movies
# DROP TABLE IF EXISTS movielens.movies_load; # CREATE TABLE IF NOT EXISTS movielens.movies_load ( movie_id INT NOT NULL, title VARCHAR(200) NOT NULL, genres VARCHAR(200) NOT NULL ); # COPY movielens.movies_load FROM 's3://zzzzzzzzzzzzz/movielens/movies.csv.gz' CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY' IGNOREHEADER 1 CSV gzip; # ANALYZE COMPRESSION movielens.movies_load; Table | Column | Encoding | Est_reduction_pct -------------+----------+----------+------------------- movies_load | movie_id | delta | 0.00 movies_load | title | lzo | 0.00 movies_load | genres | lzo | 0.00 (3 rows) # DROP TABLE movielens.movies; # CREATE TABLE movielens.movies ( movie_id INT encode delta NOT NULL, title VARCHAR(200) encode lzo NOT NULL, genres VARCHAR(200) encode lzo NOT NULL, PRIMARY KEY(movie_id) ) DISTSTYLE ALL; # INSERT INTO movielens.movies (SELECT * FROM movielens.movies_load);
テーブル:ratings
# DROP TABLE IF EXISTS movielens.ratings_load; # CREATE TABLE IF NOT EXISTS movielens.ratings_load ( user_id INT NOT NULL, movie_id INT NOT NULL, rating FLOAT, timestamp_bigint BIGINT, timestamp TIMESTAMP ); # COPY movielens.ratings_load (user_id, movie_id, rating,timestamp_bigint) FROM 's3://zzzzzzzzzzzzz/movielens/ratings.csv.gz' CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY' IGNOREHEADER 1 CSV gzip; # CREATE OR REPLACE FUNCTION cm_unixts_to_timestamp(ts BIGINT, units CHAR(2)) RETURNS timestamp STABLE AS $$ import pandas return pandas.to_datetime(ts, unit=units.rstrip()) $$ LANGUAGE plpythonu; DROP TABLE IF EXISTS movielens.ratings_tmp; CREATE TABLE IF NOT EXISTS movielens.ratings_tmp ( user_id INT NOT NULL, movie_id INT NOT NULL, rating FLOAT, timestamp_bigint BIGINT, timestamp TIMESTAMP ); # INSERT INTO movielens.ratings_tmp ( SELECT user_id, movie_id, rating, timestamp_bigint, cm_unixts_to_timestamp(timestamp_bigint, 's') FROM movielens.ratings_load); # ANALYZE COMPRESSION movielens.ratings_tmp; Table | Column | Encoding | Est_reduction_pct -------------+------------------+----------+------------------- ratings_tmp | user_id | lzo | 86.50 ratings_tmp | movie_id | delta32k | 43.61 ratings_tmp | rating | bytedict | 86.15 ratings_tmp | timestamp_bigint | lzo | 50.43 ratings_tmp | timestamp | lzo | 15.92 (5 rows) # DROP TABLE IF EXISTS movielens.ratings; # CREATE TABLE IF NOT EXISTS movielens.ratings ( user_id INT encode lzo NOT NULL, movie_id INT encode delta32k NOT NULL, rating FLOAT encode bytedict, timestamp TIMESTAMP, PRIMARY KEY(movie_id,timestamp) ) DISTKEY(movie_id) SORTKEY(timestamp); # INSERT INTO movielens.ratings (SELECT user_id, movie_id, rating, timestamp FROM movielens.ratings_tmp ORDER BY timestamp);
テーブル:tags
DROP TABLE movielens.tags_load; CREATE TABLE movielens.tags_load ( user_id INT NOT NULL, movie_id INT NOT NULL, tags VARCHAR(300), timestamp_bigint BIGINT, timestamp TIMESTAMP ); # COPY movielens.tags_load (user_id, movie_id,tags,timestamp_bigint) FROM 's3://zzzzzzzzzzzzz/movielens/tags.csv.gz' CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY' IGNOREHEADER 1 CSV gzip MAXERROR AS 100; DROP TABLE movielens.tags_tmp; CREATE TABLE movielens.tags_tmp ( user_id INT NOT NULL, movie_id INT NOT NULL, tags VARCHAR(300), timestamp_bigint BIGINT, timestamp TIMESTAMP ); INSERT INTO movielens.tags_tmp (SELECT user_id, movie_id, tags, timestamp_bigint, cm_unixts_to_timestamp(timestamp_bigint, 's') FROM movielens.tags_load); # ANALYZE COMPRESSION movielens.tags_tmp; Table | Column | Encoding | Est_reduction_pct ----------+------------------+----------+------------------- tags_tmp | user_id | lzo | 86.56 tags_tmp | movie_id | delta32k | 36.01 tags_tmp | tags | lzo | 47.46 tags_tmp | timestamp_bigint | mostly32 | 47.69 tags_tmp | timestamp | lzo | 10.63 (5 rows) DROP TABLE movielens.tags; CREATE TABLE movielens.tags ( user_id INT encode lzo NOT NULL, movie_id INT encode delta32k NOT NULL, tags VARCHAR(300) encode lzo, timestamp TIMESTAMP, PRIMARY KEY(movie_id,timestamp) ) DISTSTYLE ALL SORTKEY(timestamp); # INSERT INTO movielens.tags (SELECT user_id, movie_id, tags, timestamp FROM movielens.tags_tmp ORDER BY timestamp);
整形(ELT)に用いた中間テーブルを削除した上で、データの内容を確認してみます。
# DROP TABLE movielens.links_load; # DROP TABLE movielens.movies_load; # DROP TABLE movielens.ratings_load; # DROP TABLE movielens.ratings_tmp; # DROP TABLE movielens.tags_load; # DROP TABLE movielens.tags_tmp; # SET SEARCH_PATH TO '$user', public, movielens; SET # select trim(pgdb.datname) as Database, trim(pgn.nspname) as Schema, trim(a.name) as Table, b.mbytes, (CAST(b.mbytes as double precision) / 1024) as gbytes, to_char(a.rows, '9999,9999,9999,9999') as rows_jp from ( select db_id, id, name, sum(rows) as rows from stv_tbl_perm a group by db_id, id, name ) as a join pg_class as pgc on pgc.oid = a.id join pg_namespace as pgn on pgn.oid = pgc.relnamespace join pg_database as pgdb on pgdb.oid = a.db_id join ( select tbl, count(*) as mbytes from stv_blocklist group by tbl ) b on a.id = b.tbl where trim(pgn.nspname) = 'movielens' order by mbytes desc, a.db_id, a.name; database | schema | table | mbytes | gbytes | rows_jp -------------+-----------+---------+--------+------------+---------------------- cmawsteamdb | movielens | ratings | 56 | 0.0546875 | 10,0004 cmawsteamdb | movielens | tags | 28 | 0.02734375 | 2592 cmawsteamdb | movielens | links | 12 | 0.01171875 | 1,8250 cmawsteamdb | movielens | movies | 12 | 0.01171875 | 1,8250 (4 rows)
データ準備機能:テーブル結合の実践
ここからは機能の実践です。データを投入したAmazon Redshiftクラスタを使って接続を確立させ、
対象となるデータが存在するスキーマ及びテーブルをまずは選択します。ここでは一番件数の多い、可視化の軸となるratingsテーブルを選択しました。[Select]押下。
SPICEへの取り込みを行い、[Edit/Preview data]でデータ準備画面に遷移します。
編集画面で[Tables]ペインを選択。初期の状態では選択済みのテーブル(ratings)にチェックが入っているのみとなりますが、ここで別のテーブル(links)のチェックを選択してみます。すると、ratingsとlinksのテーブルが赤丸の記号で連結されました。
上記の状態ではまだ結合された状態とはなっていません。赤丸部分を選択すると以下の様にキーの結合条件設定部分が展開されますので所定の条件で設定を行います。
今回は一通りのデータがmovie_idで結合可能な状態となっていますので、以下の様な条件としました。結合のタイプは
- Inner(内部結合)
- Left(左辺外部結合)
- Right(右辺外部結合)
- Outer(完全外部結合)
と選べますので任意の結合方法を選択してください。ここではInner(内部結合)を選択しました。
上記と同様の手順で他の2テーブルについても結合を行います。
併せて4つのテーブルが結合された状態となりました。[Save Vizualize]で先に進みます。
ここまで出来れば、後は通常のデータソース同様に、複数のテーブルデータを1つに結合した形(今回は更にそれをSPICEに結合した形)で扱う事が出来るようになります。
まとめ
という訳でAmazon QuickSightのデータ準備機能である『テーブル結合』に関する内容の紹介でした。今回はAmazon Redshiftクラスタのスペックはdc1.large 2ノードという非常にミニマムな形で検証を行っています。サンプルデータには大きめのサイズ(トータル200MB超、ratingsの件数も2000万件超)も存在し、こちらのデータを使ったテーブル結合についても試してはみたのですが件数に対するスペックが貧弱なせいか、処理が1時間で終わらずに(QuickSightからAmazon Redshiftに投げていたクエリ:まずはじめにUNLOADが実行されていました)1時間経過後に強制終了されてしまっていました。この辺り、処理に時間が掛かった場合に制限時間があるのかどうかは定かではありませんが、非常に大きな件数を相手にこの機能を使う場合はクラスタのスペック等も踏まえて一度検証をしてみる必要があるのかも知れません。
件数が多く処理に時間が掛かるという事が前もって判明しているのであれば、Redshift側で(今回QuickSightで行ったテーブル結合のELT処理を)予め済ませておくというのも手だと思います。ちなみに今回扱ったデータ量での操作は瞬殺で終わりました。Redshiftのスペックと扱うデータ量、結合の条件度合いによってこの辺り利用シーンを検討頂けますと幸いです。こちらからは以上です。