DataAPIでRedshiftにJSONデータを格納して、Tableauで可視化してみた

2021.06.15

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

先日、RedshiftにJSONを格納し、Tableauで可視化する記事を公開しました。

今度はLambdaからData API for Redshift(以降「DataAPI」)を使ってJSONを含んだレコードを同じテーブルに追加し、Tableauで可視化してみました。

前回記事はこちらになります。

RedshiftにJSONを格納してTableauで可視化してみた | DevelopersIO

今回の記事では、SUPER型のカラムが存在するテーブルがあったとして、DataAPIからレコードを追加できるのかに関心がありました。また、変更後のデータをTableauで可視化するために必要な項目も併せて確認しました。

環境準備

まず、前回記事で作成したRedshiftのリソースについて振り返ります。

テーブル

irisというテーブルを作成しました。sampleidというカラムにデータのIDを、iris_jsonというSUPER型のカラムにJSONを格納しました。iris_jsonは、具体的にはscikit-learnのIris Dataset(いわゆるアイリスデータ)をJSONに変換したものでした。

CREATE TABLE iris(
      sampleid  INT   NOT NULL,
      iris_json SUPER
);

マテリアライズドビュー

Tableauからの参照用に、iris_materialized_viewというマテリアライズドビューを作成していました。TableauがJSONデータを読み出せるように、マテリアライズドビューの定義内でドット記法を使ってJSONの属性を展開しました。

CREATE MATERIALIZED VIEW iris_materialized_view AS
    SELECT 
      sampleid,
      iris_json.sepal_length,
      iris_json.sepal_width,
      iris_json.petal_length,
      iris_json.petal_width,
      iris_json.species
      FROM iris;

続いて、今回の検証の構成を説明します。

検証した構成

今回は、RedshiftがあるAWSアカウントとは違うAWSアカウントのLambdaから、DataAPIを使ってデータを追加しました。構成は以下のようなイメージです。

検証時の構成

Lambdaは以下の2つを作成しました。

  • データを挿入するLambda
  • マテリアライズドビューをリフレッシュするLambda

実際には、2つのLambdaをAWS Step Functionsなどで実行するような想定です。

また、DataAPIにアクセスするためのIAMロールを準備しておきます。今回はクロスアカウントで行うので、以下の2つのロールを用意します。

  • LambdaにDataAPIへのアクセスを許可する、Redshiftアカウント側のIAMロール。
  • 上記IAMロールへのAssumeRoleを許可する、Lambdaアカウント側のIAMロール。

具体的な設定手順は、弊社平野の記事をご参照ください。

DataAPIで楽々、Lambda関数から別アカウントのRedshiftにクエリを投げてみた。 | DevelopersIO

やってみる

データの挿入

まず、1つ目のLambda関数を作成します。以下のコードをデプロイします。

コードは上記記事で紹介されたコードを参考にしています。今回はJSONを含んだレコードを新しく挿入したいので、SQL文を対応したものに変更しておきます。

# https://dev.classmethod.jp/articles/redshift-datapia-from-lambda-crossaccount/を参考にしました。

import json
import time
import boto3
from boto3.session import Session

# Redshift接続情報
CLUSTER_NAME='cluster_name'
DATABASE_NAME='sample_db'
DB_USER='dbuser'
ROLE_ARN='arn:aws:iam::XXXXXXXXXXXX:role/AllowRedshiftDataApiCrossAccount'

# 実行するSQLを設定
sql = '''
    INSERT INTO iris VALUES
    (151, JSON_PARSE('{"sepal_length":10.1,"sepal_width":7.5,"petal_length":2.4,"petal_width":1.2,"species":"dummy"}'));
'''

def create_session():
    sts_client = boto3.client('sts', endpoint_url='https://sts.ap-northeast-1.amazonaws.com')
    credentials = sts_client.assume_role(RoleArn=ROLE_ARN,
                                             RoleSessionName='RedsfhitDataAPI')
    accesskey = credentials['Credentials']['AccessKeyId']
    secretkey = credentials['Credentials']['SecretAccessKey']
    session_token = credentials['Credentials']['SessionToken']
    session = Session(aws_access_key_id=accesskey,
                      aws_secret_access_key=secretkey,
                      aws_session_token=session_token)
    return session


def lambda_handler(event, context):
    # Redshiftにクエリを投げる。非同期なのですぐ返ってくる
    session = create_session()
    data_client = session.client('redshift-data')
    result = data_client.execute_statement(
        ClusterIdentifier=CLUSTER_NAME,
        Database=DATABASE_NAME,
        DbUser=DB_USER,
        Sql=sql,
    )

    # 実行IDを取得
    id = result['Id']

    # クエリが終わるのを待つ
    statement = ''
    status = ''
    while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED':
        statement = data_client.describe_statement(Id=id)
        status = statement['Status']
        time.sleep(1)

    # 結果の表示
    if status == 'FINISHED':
        if int(statement['ResultSize']) > 0:
            # select文等なら戻り値を表示
            statement = data_client.get_statement_result(Id=id)
            print(json.dumps(statement['Records']))
        else:
            # 戻り値がないものはFINISHだけ出力して終わり
            print('QUERY FINSHED')
    elif status == 'FAILED':
        # 失敗時
        print('QUERY FAILED\n{}'.format(statement))
    elif status == 'ABORTED':
        # ユーザによる停止時
        print('QUERY ABORTED: The query run was stopped by the user.')

SQL文で挿入したデータは、結果が分かりやすいよう、以下のように"species"キーのバリューが"dummy"であるJSONを用意しました。

{
    "sepal_length":10.1,
    "sepal_width":7.5,
    "petal_length":2.4,
    "petal_width":1.2,
    "species":"dummy"
}

半構造化データのRedshiftへのロードには「INSERTまたはUPDATEを使用する方法」と「COPYを使用する方法」がありますが、今回は簡単のため、前者で行いました。

Lambdaを実行し、Database Managerからテーブルを参照すると、IDが151のデータが追加されていることが確認できました。

Data APIからJSONを挿入した結果

マテリアライズドビューのリフレッシュ

続いて2つ目のLambdaを作成します。この関数はマテリアライズドビューのリフレッシュを行います。マテリアライズドビューは、ソーステーブルの変更を反映させるために、リフレッシュする必要があります。

1つ目のLambdaにデプロイしたコードと同じものを用意し、SQL文の内容を以下のように修正します。修正したコードを2つ目のLambdaにデプロイします。

(略)

# 実行するSQLを設定
sql = '''
    REFRESH MATERIALIZED VIEW iris_materialized_view;
'''

(略)

Lambdaを実行すると、マテリアライズドビューにも新しく追加したレコードが反映されました。 リフレッシュ後のマテリアライズドビュー

Tableauでの可視化

最後に、新しく追加したレコードを含めて、Tableauで可視化してみます。

Tableau Desktopを起動し、Redshiftのiris_materialized_viewマテリアライズドビューに接続します。 マテリアライズドビューに接続する

iris_materialized_viewからデータを読み取って、散布図を描いてみます。

151番目のデータとして、○のマーカーが表示されており、Lambdaから投入したデータが可視化できていることが分かりました。

データ投入後の可視化

まとめ

DataAPIから、SUPER型のカラムにJSONを格納することができました。また、マテリアライズドビューをリフレッシュすることで、Tableauから追加したデータを参照できました。

今回はかなり簡易的な構成で行っていて、実際には、

  • どのくらいまとめてLambdaからデータを投入するか
  • どのくらいの頻度でマテリアライズドビューをリフレッシュするか

など、まだまだ検討することがたくさんありそうでした。

以上、同じような検討をされている方の参考になれば幸いです。