データ準備機能詳解(テーブル結合) #quicksight #15 | Amazon QuickSight Advent Calendar 2016

2016.12.15

当エントリは『Amazon QuickSight Advent Calendar 2016』の15本目のエントリです。

昨日の14本目のエントリは『Heat Map(ヒートマップ)』でした。

『AWS re:Invent 2016』の直前に一般利用可能となったAWSによるBIサービス、Amazon QuickSight。発表されたばかりですが、早速Amazon QuickSightを使い倒すべく色々な切り口でその内容について見て行きたいと思います。

15本目となる当エントリでは、Amazon QuickSightのデータ準備機能の1つ、『テーブル結合』の内容について見て行きたいと思います。

『テーブル結合』とは

データセット内の複数のテーブルを組みあせわせて使いたいという場合、Amazon QuickSightでは『テーブル結合』のインタフェースを使う事で2つ以上のテーブルを結合させる事が出来ます。テーブル結合を使うと、結合タイプ及び結合に使用する項目を指定する事が出来ます。結合で使用される項目はデータソース由来のもので、計算フィールドには適用出来ません。また、このテーブル結合では追加でSQL文を使ってデータセットを絞り込むという事も出来ません。データを絞り込みたいという要件がある場合は『カスタムSQL』の機能を使ってください(※この内容については後日のエントリで言及します)

テーブル結合ではSPICEを使用し、SQLデータベースのデータソースに基づくデータセットのテーブル結合を行う事が出来ます。

重要:
テーブル結合の機能を使う場合、その前までに任意の項目に対して行っていた変更(フィールド名の変更や計算フィールドの追加等)は破棄されます。

データの準備

ではこの機能を紹介する上で必要なサンプルデータを用意します。下記『Movielens』のサイトから小さめのサイズのデータをダウンロード。

quicksight-data-preparation-join-tables_01

アーカイブされた内容を展開してみます。一番大きいデータでratingsの10万件ですね。

$ ll
total 1832
-rw-r--r--@ 1 xxxxxxxxx  staff    8364 10 17 14:24 README.txt
-rw-r--r--@ 1 xxxxxxxxx  staff   80520 10 17 11:12 links.csv.gz
-rw-r--r--@ 1 xxxxxxxxx  staff  156510 10 17 11:12 movies.csv.gz
-rw-r--r--@ 1 xxxxxxxxx  staff  667187 10 17 11:12 ratings.csv.gz
-rw-r--r--@ 1 xxxxxxxxx  staff   13977 10 17 11:12 tags.csv.gz
$
$ wc links.csv 
    9126    9126  183372 links.csv
$ wc movies.csv 
    9126   39127  458390 movies.csv
$ wc ratings.csv 
  100005  100005 2438266 ratings.csv
$ wc tags.csv 
    1297    1887   41902 tags.csv

gzip圧縮を行い、

$ gzip links.csv 
$ gzip movies.csv 
$ gzip ratings.csv 
$ gzip tags.csv

任意のS3バケットにアップロードしておきます。

quicksight-data-preparation-join-tables_00

データ各種をRedshiftのテーブルに格納、整備しておきます。データ投入に用いたテーブル毎のSQL定義文・実行文は以下の通りです。

テーブル:links

# CREATE SCHEMA movielens;
# DROP TABLE IF EXISTS movielens.links_load;
CREATE TABLE IF NOT EXISTS movielens.links_load (
  movie_id INT NOT NULL,
  imdb_id INT,
  tmdb_id INT
);

# COPY movielens.links_load
FROM 's3://zzzzzzzzzzzzz/movielens/links.csv.gz'
CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY'
IGNOREHEADER 1 CSV gzip;

# ANALYZE COMPRESSION movielens.links_load;
   Table    |  Column  | Encoding | Est_reduction_pct 
------------+----------+----------+-------------------
 links_load | movie_id | delta    | 0.00
 links_load | imdb_id  | lzo      | 3.02
 links_load | tmdb_id  | mostly16 | 0.00
(3 rows)

# DROP TABLE IF EXISTS movielens.links;
# CREATE TABLE IF NOT EXISTS movielens.links (
  movie_id INT encode delta NOT NULL,
  imdb_id INT encode lzo,
  tmdb_id INT encode mostly16,
  PRIMARY KEY(movie_id)
)
DISTSTYLE ALL;

# INSERT INTO movielens.links (SELECT * FROM movielens.links_load);

テーブル:movies

# DROP TABLE IF EXISTS movielens.movies_load;
# CREATE TABLE IF NOT EXISTS movielens.movies_load (
  movie_id INT NOT NULL,
  title VARCHAR(200) NOT NULL,
  genres VARCHAR(200) NOT NULL
);

# COPY movielens.movies_load
FROM 's3://zzzzzzzzzzzzz/movielens/movies.csv.gz'
CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY'
IGNOREHEADER 1 CSV gzip;

# ANALYZE COMPRESSION movielens.movies_load;
    Table    |  Column  | Encoding | Est_reduction_pct 
-------------+----------+----------+-------------------
 movies_load | movie_id | delta    | 0.00
 movies_load | title    | lzo      | 0.00
 movies_load | genres   | lzo      | 0.00
(3 rows)

# DROP TABLE movielens.movies;
# CREATE TABLE movielens.movies (
  movie_id INT encode delta NOT NULL,
  title VARCHAR(200) encode lzo NOT NULL,
  genres VARCHAR(200) encode lzo NOT NULL,
  PRIMARY KEY(movie_id)
)
DISTSTYLE ALL;

# INSERT INTO movielens.movies (SELECT * FROM movielens.movies_load);

テーブル:ratings

# DROP TABLE IF EXISTS movielens.ratings_load;
# CREATE TABLE IF NOT EXISTS movielens.ratings_load (
  user_id INT NOT NULL,
  movie_id INT NOT NULL,
  rating FLOAT,
  timestamp_bigint BIGINT,
  timestamp TIMESTAMP
);

# COPY movielens.ratings_load (user_id, movie_id, rating,timestamp_bigint)
FROM 's3://zzzzzzzzzzzzz/movielens/ratings.csv.gz'
CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY'
IGNOREHEADER 1 CSV gzip;

# CREATE OR REPLACE FUNCTION cm_unixts_to_timestamp(ts BIGINT, units CHAR(2))
RETURNS timestamp
STABLE
AS $$
    import pandas
    return pandas.to_datetime(ts, unit=units.rstrip())
$$ LANGUAGE plpythonu;

DROP TABLE IF EXISTS movielens.ratings_tmp;
CREATE TABLE IF NOT EXISTS movielens.ratings_tmp (
  user_id INT NOT NULL,
  movie_id INT NOT NULL,
  rating FLOAT,
  timestamp_bigint BIGINT,
  timestamp TIMESTAMP
);

# INSERT INTO movielens.ratings_tmp (
SELECT
  user_id,
  movie_id,
  rating,
  timestamp_bigint,
  cm_unixts_to_timestamp(timestamp_bigint, 's')
FROM
  movielens.ratings_load);

# ANALYZE COMPRESSION movielens.ratings_tmp;
    Table    |      Column      | Encoding | Est_reduction_pct 
-------------+------------------+----------+-------------------
 ratings_tmp | user_id          | lzo      | 86.50
 ratings_tmp | movie_id         | delta32k | 43.61
 ratings_tmp | rating           | bytedict | 86.15
 ratings_tmp | timestamp_bigint | lzo      | 50.43
 ratings_tmp | timestamp        | lzo      | 15.92
(5 rows)

# DROP TABLE IF EXISTS movielens.ratings;
# CREATE TABLE IF NOT EXISTS movielens.ratings (
  user_id INT encode lzo NOT NULL,
  movie_id INT encode delta32k NOT NULL,
  rating FLOAT encode bytedict,
  timestamp TIMESTAMP,
  PRIMARY KEY(movie_id,timestamp)
)
DISTKEY(movie_id)
SORTKEY(timestamp);

# INSERT INTO movielens.ratings
(SELECT user_id, movie_id, rating, timestamp FROM movielens.ratings_tmp ORDER BY timestamp);

テーブル:tags

DROP TABLE movielens.tags_load;
CREATE TABLE movielens.tags_load (
  user_id INT NOT NULL,
  movie_id INT NOT NULL,
  tags VARCHAR(300),
  timestamp_bigint BIGINT,
  timestamp TIMESTAMP
);
  
# COPY movielens.tags_load (user_id, movie_id,tags,timestamp_bigint)
FROM 's3://zzzzzzzzzzzzz/movielens/tags.csv.gz'
CREDENTIALS 'aws_access_key_id=XXXXX;aws_secret_access_key=YYYYY'
IGNOREHEADER 1 CSV gzip MAXERROR AS 100;

DROP TABLE movielens.tags_tmp;
CREATE TABLE movielens.tags_tmp (
  user_id INT NOT NULL,
  movie_id INT NOT NULL,
  tags VARCHAR(300),
  timestamp_bigint BIGINT,
  timestamp TIMESTAMP
);

INSERT INTO movielens.tags_tmp
(SELECT
  user_id,
  movie_id,
  tags,
  timestamp_bigint,
  cm_unixts_to_timestamp(timestamp_bigint, 's')
FROM
  movielens.tags_load);
  
# ANALYZE COMPRESSION movielens.tags_tmp;
  Table   |      Column      | Encoding | Est_reduction_pct 
----------+------------------+----------+-------------------
 tags_tmp | user_id          | lzo      | 86.56
 tags_tmp | movie_id         | delta32k | 36.01
 tags_tmp | tags             | lzo      | 47.46
 tags_tmp | timestamp_bigint | mostly32 | 47.69
 tags_tmp | timestamp        | lzo      | 10.63
(5 rows)

DROP TABLE movielens.tags;
CREATE TABLE movielens.tags (
  user_id INT encode lzo NOT NULL,
  movie_id INT encode delta32k NOT NULL,
  tags VARCHAR(300) encode lzo,
  timestamp TIMESTAMP,
  PRIMARY KEY(movie_id,timestamp)
)
DISTSTYLE ALL
SORTKEY(timestamp);

# INSERT INTO movielens.tags (SELECT user_id, movie_id, tags, timestamp FROM movielens.tags_tmp ORDER BY timestamp);

整形(ELT)に用いた中間テーブルを削除した上で、データの内容を確認してみます。

# DROP TABLE movielens.links_load;
# DROP TABLE movielens.movies_load;
# DROP TABLE movielens.ratings_load;
# DROP TABLE movielens.ratings_tmp;
# DROP TABLE movielens.tags_load;
# DROP TABLE movielens.tags_tmp;
# SET SEARCH_PATH TO '$user', public, movielens;
SET

# select
  trim(pgdb.datname) as Database,
  trim(pgn.nspname) as Schema,
  trim(a.name) as Table,
  b.mbytes,
  (CAST(b.mbytes as double precision) / 1024) as gbytes,
  to_char(a.rows, '9999,9999,9999,9999') as rows_jp
from (
       select db_id, id, name, sum(rows) as rows
       from stv_tbl_perm a
       group by db_id, id, name
     ) as a
  join pg_class as pgc on pgc.oid = a.id
  join pg_namespace as pgn on pgn.oid = pgc.relnamespace
  join pg_database as pgdb on pgdb.oid = a.db_id
  join (
         select tbl, count(*) as mbytes
         from stv_blocklist
         group by tbl
       ) b on a.id = b.tbl
where
  trim(pgn.nspname) = 'movielens'
order by mbytes desc, a.db_id, a.name;

  database   |  schema   |  table  | mbytes |   gbytes   |       rows_jp        
-------------+-----------+---------+--------+------------+----------------------
 cmawsteamdb | movielens | ratings |     56 |  0.0546875 |              10,0004
 cmawsteamdb | movielens | tags    |     28 | 0.02734375 |                 2592
 cmawsteamdb | movielens | links   |     12 | 0.01171875 |               1,8250
 cmawsteamdb | movielens | movies  |     12 | 0.01171875 |               1,8250
(4 rows)

データ準備機能:テーブル結合の実践

ここからは機能の実践です。データを投入したAmazon Redshiftクラスタを使って接続を確立させ、

quicksight-data-preparation-join-tables_02

対象となるデータが存在するスキーマ及びテーブルをまずは選択します。ここでは一番件数の多い、可視化の軸となるratingsテーブルを選択しました。[Select]押下。

quicksight-data-preparation-join-tables_03

SPICEへの取り込みを行い、[Edit/Preview data]でデータ準備画面に遷移します。

quicksight-data-preparation-join-tables_04

編集画面で[Tables]ペインを選択。初期の状態では選択済みのテーブル(ratings)にチェックが入っているのみとなりますが、ここで別のテーブル(links)のチェックを選択してみます。すると、ratingsとlinksのテーブルが赤丸の記号で連結されました。

quicksight-data-preparation-join-tables_05

上記の状態ではまだ結合された状態とはなっていません。赤丸部分を選択すると以下の様にキーの結合条件設定部分が展開されますので所定の条件で設定を行います。

quicksight-data-preparation-join-tables_06

今回は一通りのデータがmovie_idで結合可能な状態となっていますので、以下の様な条件としました。結合のタイプは

  • Inner(内部結合)
  • Left(左辺外部結合)
  • Right(右辺外部結合)
  • Outer(完全外部結合)

と選べますので任意の結合方法を選択してください。ここではInner(内部結合)を選択しました。

quicksight-data-preparation-join-tables_07

上記と同様の手順で他の2テーブルについても結合を行います。

quicksight-data-preparation-join-tables_08

quicksight-data-preparation-join-tables_09

併せて4つのテーブルが結合された状態となりました。[Save Vizualize]で先に進みます。

quicksight-data-preparation-join-tables_10

ここまで出来れば、後は通常のデータソース同様に、複数のテーブルデータを1つに結合した形(今回は更にそれをSPICEに結合した形)で扱う事が出来るようになります。

quicksight-data-preparation-join-tables_11

まとめ

という訳でAmazon QuickSightのデータ準備機能である『テーブル結合』に関する内容の紹介でした。今回はAmazon Redshiftクラスタのスペックはdc1.large 2ノードという非常にミニマムな形で検証を行っています。サンプルデータには大きめのサイズ(トータル200MB超、ratingsの件数も2000万件超)も存在し、こちらのデータを使ったテーブル結合についても試してはみたのですが件数に対するスペックが貧弱なせいか、処理が1時間で終わらずに(QuickSightからAmazon Redshiftに投げていたクエリ:まずはじめにUNLOADが実行されていました)1時間経過後に強制終了されてしまっていました。この辺り、処理に時間が掛かった場合に制限時間があるのかどうかは定かではありませんが、非常に大きな件数を相手にこの機能を使う場合はクラスタのスペック等も踏まえて一度検証をしてみる必要があるのかも知れません。

件数が多く処理に時間が掛かるという事が前もって判明しているのであれば、Redshift側で(今回QuickSightで行ったテーブル結合のELT処理を)予め済ませておくというのも手だと思います。ちなみに今回扱ったデータ量での操作は瞬殺で終わりました。Redshiftのスペックと扱うデータ量、結合の条件度合いによってこの辺り利用シーンを検討頂けますと幸いです。こちらからは以上です。

参考情報: