この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部の鈴木です。
先日、RedshiftにJSONを格納し、Tableauで可視化する記事を公開しました。
今度はLambdaからData API for Redshift(以降「DataAPI」)を使ってJSONを含んだレコードを同じテーブルに追加し、Tableauで可視化してみました。
前回記事はこちらになります。
今回の記事では、SUPER型のカラムが存在するテーブルがあったとして、DataAPIからレコードを追加できるのかに関心がありました。また、変更後のデータをTableauで可視化するために必要な項目も併せて確認しました。
環境準備
まず、前回記事で作成したRedshiftのリソースについて振り返ります。
テーブル
irisというテーブルを作成しました。sampleidというカラムにデータのIDを、iris_jsonというSUPER型のカラムにJSONを格納しました。iris_jsonは、具体的にはscikit-learnのIris Dataset(いわゆるアイリスデータ)をJSONに変換したものでした。
CREATE TABLE iris(
sampleid INT NOT NULL,
iris_json SUPER
);
マテリアライズドビュー
Tableauからの参照用に、iris_materialized_viewというマテリアライズドビューを作成していました。TableauがJSONデータを読み出せるように、マテリアライズドビューの定義内でドット記法を使ってJSONの属性を展開しました。
CREATE MATERIALIZED VIEW iris_materialized_view AS
SELECT
sampleid,
iris_json.sepal_length,
iris_json.sepal_width,
iris_json.petal_length,
iris_json.petal_width,
iris_json.species
FROM iris;
続いて、今回の検証の構成を説明します。
検証した構成
今回は、RedshiftがあるAWSアカウントとは違うAWSアカウントのLambdaから、DataAPIを使ってデータを追加しました。構成は以下のようなイメージです。
Lambdaは以下の2つを作成しました。
- データを挿入するLambda
- マテリアライズドビューをリフレッシュするLambda
実際には、2つのLambdaをAWS Step Functionsなどで実行するような想定です。
また、DataAPIにアクセスするためのIAMロールを準備しておきます。今回はクロスアカウントで行うので、以下の2つのロールを用意します。
- LambdaにDataAPIへのアクセスを許可する、Redshiftアカウント側のIAMロール。
- 上記IAMロールへのAssumeRoleを許可する、Lambdaアカウント側のIAMロール。
具体的な設定手順は、弊社平野の記事をご参照ください。
DataAPIで楽々、Lambda関数から別アカウントのRedshiftにクエリを投げてみた。 | DevelopersIO
やってみる
データの挿入
まず、1つ目のLambda関数を作成します。以下のコードをデプロイします。
コードは上記記事で紹介されたコードを参考にしています。今回はJSONを含んだレコードを新しく挿入したいので、SQL文を対応したものに変更しておきます。
# https://dev.classmethod.jp/articles/redshift-datapia-from-lambda-crossaccount/を参考にしました。
import json
import time
import boto3
from boto3.session import Session
# Redshift接続情報
CLUSTER_NAME='cluster_name'
DATABASE_NAME='sample_db'
DB_USER='dbuser'
ROLE_ARN='arn:aws:iam::XXXXXXXXXXXX:role/AllowRedshiftDataApiCrossAccount'
# 実行するSQLを設定
sql = '''
INSERT INTO iris VALUES
(151, JSON_PARSE('{"sepal_length":10.1,"sepal_width":7.5,"petal_length":2.4,"petal_width":1.2,"species":"dummy"}'));
'''
def create_session():
sts_client = boto3.client('sts', endpoint_url='https://sts.ap-northeast-1.amazonaws.com')
credentials = sts_client.assume_role(RoleArn=ROLE_ARN,
RoleSessionName='RedsfhitDataAPI')
accesskey = credentials['Credentials']['AccessKeyId']
secretkey = credentials['Credentials']['SecretAccessKey']
session_token = credentials['Credentials']['SessionToken']
session = Session(aws_access_key_id=accesskey,
aws_secret_access_key=secretkey,
aws_session_token=session_token)
return session
def lambda_handler(event, context):
# Redshiftにクエリを投げる。非同期なのですぐ返ってくる
session = create_session()
data_client = session.client('redshift-data')
result = data_client.execute_statement(
ClusterIdentifier=CLUSTER_NAME,
Database=DATABASE_NAME,
DbUser=DB_USER,
Sql=sql,
)
# 実行IDを取得
id = result['Id']
# クエリが終わるのを待つ
statement = ''
status = ''
while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED':
statement = data_client.describe_statement(Id=id)
status = statement['Status']
time.sleep(1)
# 結果の表示
if status == 'FINISHED':
if int(statement['ResultSize']) > 0:
# select文等なら戻り値を表示
statement = data_client.get_statement_result(Id=id)
print(json.dumps(statement['Records']))
else:
# 戻り値がないものはFINISHだけ出力して終わり
print('QUERY FINSHED')
elif status == 'FAILED':
# 失敗時
print('QUERY FAILED\n{}'.format(statement))
elif status == 'ABORTED':
# ユーザによる停止時
print('QUERY ABORTED: The query run was stopped by the user.')
SQL文で挿入したデータは、結果が分かりやすいよう、以下のように"species"キーのバリューが"dummy"であるJSONを用意しました。
{
"sepal_length":10.1,
"sepal_width":7.5,
"petal_length":2.4,
"petal_width":1.2,
"species":"dummy"
}
半構造化データのRedshiftへのロードには「INSERTまたはUPDATEを使用する方法」と「COPYを使用する方法」がありますが、今回は簡単のため、前者で行いました。
Lambdaを実行し、Database Managerからテーブルを参照すると、IDが151のデータが追加されていることが確認できました。
マテリアライズドビューのリフレッシュ
続いて2つ目のLambdaを作成します。この関数はマテリアライズドビューのリフレッシュを行います。マテリアライズドビューは、ソーステーブルの変更を反映させるために、リフレッシュする必要があります。
1つ目のLambdaにデプロイしたコードと同じものを用意し、SQL文の内容を以下のように修正します。修正したコードを2つ目のLambdaにデプロイします。
(略)
# 実行するSQLを設定
sql = '''
REFRESH MATERIALIZED VIEW iris_materialized_view;
'''
(略)
Lambdaを実行すると、マテリアライズドビューにも新しく追加したレコードが反映されました。
Tableauでの可視化
最後に、新しく追加したレコードを含めて、Tableauで可視化してみます。
Tableau Desktopを起動し、Redshiftのiris_materialized_viewマテリアライズドビューに接続します。
iris_materialized_viewからデータを読み取って、散布図を描いてみます。
151番目のデータとして、○のマーカーが表示されており、Lambdaから投入したデータが可視化できていることが分かりました。
まとめ
DataAPIから、SUPER型のカラムにJSONを格納することができました。また、マテリアライズドビューをリフレッシュすることで、Tableauから追加したデータを参照できました。
今回はかなり簡易的な構成で行っていて、実際には、
- どのくらいまとめてLambdaからデータを投入するか
- どのくらいの頻度でマテリアライズドビューをリフレッシュするか
など、まだまだ検討することがたくさんありそうでした。
以上、同じような検討をされている方の参考になれば幸いです。