[新機能] AWS Glue Visual ETL がネイティブ Amazon Redshift 機能をサポートしたので試してみました!

2023.05.03

データアナリティクス事業本部のコンサルティングチームの石川です。AWS Glue Visual ETL がネイティブ Amazon Redshift 機能をサポートってなんだろう?、と最初思ったのですが、re:Invent2022で発表されたAmazon Redshift integration for Apache Sparkのことでした。当時、すぐに試したのですがうまくできなかったので自分にとってはリベンジブログとなります。

新しいネイティブAmazon Redshift機能とは

冒頭でも触れましたが、re:Invent2022で発表されたAmazon Redshift integration for Apache Sparkのことです。よりGlueに限定するならAmazon Redshift integration for AWS Glue Visual ETLという方が適切ではないかと感じます。つまり、従来よりも高機能かつ簡単にRedshiftのデータを入出力できるGlue APIが追加され、さらに追加に伴いAWS Glue StudioからRedshift専用の入力と出力のノードが提供され、GUIでサクッと利用できるようになりました。

従来の方式:Glue data catalog table

従来でもRedshiftのデータを入出力することが可能でしたが、RedshiftへのGlue connectionを作成した後、Glue crawlerでそのGlue テーブルを作成します。Glue テーブルからDyamicFrameを作成する手順が必要でした。

新方式(推奨):Direct data connection

新機能では、RedshiftへのGlue connectionを作成して直接データを入力や出力ができるように手順が簡素化されました。さらにAWS Glue Studioでは、入力と出力に下記の便利なオプションが追加されました。

入力オプション

入力オプションは、Choose a single tableEnter a custom queryです。読み込んだデータは、DyamicFrameとして作成されます。

  • Choose a single table
    • スキーマとテーブルを指定してデータを読み込む
  • Enter a custom query
    • 任意のSELECTステートメントからデータを読み込む

出力オプション

出力オプションは、APPEND (insert) to target tableMERGE data into target table - previewTRUNCATE target tableDROP and recreate target tableです。DyamicFrameを下記のいずれかの方法で、Redshiftのテーブルに書き込みます。

  • APPEND (insert) to target table
    • テーブルにレコードを追加、もしくはキーに基づきレコードをUPSERTする
  • MERGE data into target table - preview
    • 条件に基づいてテーブルのカラムを更新またはレコードを追加する
    • Choose keys and simple actions
    • 突き合わせするキーを指定する
    • Enter custom MERGE statement
    • 任意のMERGEステートメントでデータを書き込む
  • TRUNCATE target table
    • デーブルのデータを削除(TRUNCATE)した後、上記のAPPENDする(いわゆる洗い替え)
  • DROP and recreate target
    • テーブルを削除した後、ソースデータからスキーマを持つテーブルを作成する

Visual ETLでネイティブAmazon Redshift機能を試す!

Glue connectionの作成

AWS Glue StudioのコンソールでGlueからRedshiftへ接続するGlue connectionという接続設定を作成します。

Create connectionのダイアログが表示されます。今回の接続先は、Redshift Serverlessを用意しました。しかし、Connection TypeにAmazon Redshiftを選択するとProvisioned Clusterしか選択できませんでしたので、Connection TypeにJDBCを選択して、マネジメントコンソールに表示されているJDBC URLを入力しています。(補足:Connection TypeにAmazon Redshiftを選択しても、設定はJDBCの設定が自動生成されます。)

Redshift ServerlessをPublic Accessを許可してもGlueからネットワーク接続できなかったため、Network optionsでVPCを経由するように設定しています。

設定したセキュリティグループは、Redshiftのポート(5439)のみの許可では足りず、全てのTCPポートの許可が必要なため、Redshift用の自己参照のセキュリティグループを作成して付与しています。

従来のGlue connectionの設定画面では、接続を検証する機能が存在したのですが、Glue StudioのGlue Connectionの設定画面では、無くなってしまい正直不便です。Legacy Pagesにもアクセスできなくなってしまったので、ぜひ追加してほしいです。

ジョブの作成

AWS Glue Studioのコンソールからblank canvasのジョブを作成します。

ノードを追加する

右上の青い丸をクリックすると、ノード追加用のダイアログが表示されます。Enter valueにredshiftと入力すると、新しいRedshift専用の入力と出力のノードが選択できます。上が入力用、下が出力用です。この2つを追加します。

Enter valueにdupと入力すると、重複レコードを排除するノードが選択できます。今回は、入力と出力の間に重複排除する機能を持ったノードを追加します。

3つのノードを配置するとこのような形で表示されます。以下の流れの処理を作成します。

  • Data source - Amazon Redshift ノード
    • Redshiftから3つのカラムのデータのみ取得する
    • ただし、重複したレコードが存在する
  • Transform - Drop Duplicates ノード
    • 全てのカラムが重複したレコードを排除する
  • Data target - Amazon Redshift ノード
    • ma_listテーブルにデータをMERGEする
    • useridをキーにUPSERTする

Data source - Amazon Redshift ノードの設定

Redshiftからデータを取得して、DynamicFrameを作成します。

Redshift access typeはDirect data connection - recommended)を選択、Redshift sourceはEnter custom queryを選択して、データ取得用のSELECTステートメントを入力します。ここでは意図的に重複レコードを10レコード追加しています。SELECTステートメントを入力もしくは更新した場合、必ず[Infer schema]ボタンを押してください。

Transform - Drop Duplicates ノードの設定

DynamicFrameを入力して重複レコードを削除したDynamicFrameを作成します。(内部的には、SparkのdropDuplicates()を呼んでます。)

Drop DuplicatesはMatch entire rows (Distinct)全てのカラムが重複したレコードを排除します。

Data target - Amazon Redshift ノードの設定

DynamicFrameをRedshiftのテーブルに書き込みます。

Redshift access typeはDirect data connection - recommended)を選択、Handling of data and target tableはMERGE data into target table - previewを選択、Matching keysにuseridを選択します。

自動生成されたスクリプト(Locked)

Redshiftへの出力は、preactionsでTargetテーブルやデータのstagingテーブル(一時保管先のテーブル)を作成しています。postactionsでは、stagingテーブルの内容をTargetテーブルに反映しています。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
from pyspark.sql import functions as SqlFuncs

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Amazon Redshift
AmazonRedshift_node1683030838670 = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "sampleQuery": "(select userid, username, email from users) union all (select userid, username, email from users limit 10)",
        "redshiftTmpDir": "s3://aws-glue-assets-123456789012-ap-northeast-1/temporary/",
        "useConnectionProperties": "true",
        "connectionName": "blog-pub-redshift",
        "aws_iam_user": "arn:aws:iam::123456789012:role/AWSGlueServiceRoleDefault",
    },
    transformation_ctx="AmazonRedshift_node1683030838670",
)

# Script generated for node Drop Duplicates
DropDuplicates_node1683032382054 = DynamicFrame.fromDF(
    AmazonRedshift_node1683030838670.toDF().dropDuplicates(),
    glueContext,
    "DropDuplicates_node1683032382054",
)

# Script generated for node Amazon Redshift
AmazonRedshift_node1683033046255 = glueContext.write_dynamic_frame.from_options(
    frame=DropDuplicates_node1683032382054,
    connection_type="redshift",
    connection_options={
        "postactions": "BEGIN; MERGE INTO public.ma_list USING public.ma_list_temp_6af7d8 ON ma_list.userid = ma_list_temp_6af7d8.userid WHEN MATCHED THEN UPDATE SET userid = ma_list_temp_6af7d8.userid, username = ma_list_temp_6af7d8.username, email = ma_list_temp_6af7d8.email WHEN NOT MATCHED THEN INSERT VALUES (ma_list_temp_6af7d8.userid, ma_list_temp_6af7d8.username, ma_list_temp_6af7d8.email); DROP TABLE public.ma_list_temp_6af7d8; END;",
        "redshiftTmpDir": "s3://aws-glue-assets-123456789012-ap-northeast-1/temporary/",
        "useConnectionProperties": "true",
        "dbtable": "public.ma_list_temp_6af7d8",
        "connectionName": "blog-pub-redshift",
        "preactions": "CREATE TABLE IF NOT EXISTS public.ma_list (userid INTEGER, username VARCHAR, email VARCHAR); DROP TABLE IF EXISTS public.ma_list_temp_6af7d8; CREATE TABLE public.ma_list_temp_6af7d8 (userid INTEGER, username VARCHAR, email VARCHAR);",
        "aws_iam_user": "arn:aws:iam::123456789012:role/AWSGlueServiceRoleDefault",
    },
    transformation_ctx="AmazonRedshift_node1683033046255",
)

job.commit()

ジョブの実行

DPUはG.1Xが2つ、Glue version 3.0にて実行、正常終了しました。

入力のレコード

Data source - Amazon Redshift の入力テーブルであるusersテーブルのレコード数は49990レコードです。重複した10レコードを追加して50000レコードをDrop Duplicatesノードに渡しています。

出力のレコード

Data target - Amazon Redshift の出力テーブルであるma_listテーブルのレコード数は49990レコードです。重複した10レコードはDrop Duplicatesノードで削除されたことが確認できます。

Redshift上のクエリの確認

では、Redshiftにどのようなクエリが発生したのかをsys_query_historyシステムテーブルから確認します。

  • Glueのスクリプト上には存在しないUNLOADステートメントにsampleQueryで渡したデータ取得用のSELECTステートメントが利用され、データファイルとマニフェストファイルがS3に出力される
    • S3上のファイルのデータの重複排除はGlueのDynamicFrame上で変換した後、変換済みCSVデータはS3に保存される
  • Targetテーブル(ma_list)の(CREATE TEMP TABLEではない)一時テーブルを作成して、COPYコマンドで変換済みデータをロードする
    • 一時テーブルの文字列カラムはVARCHAR(MAX)で作成した後VARCHARで作り直したりしており、最大データ長なども加味してTargetテーブルを作成している可能性がある
  • Handling of data and target tableに基づき、一時テーブルのレコードをTargetテーブルに反映する
    • 下記の例では、MERGEステートメントでUPSERTしている
    • 一時テーブルも削除する
No query_text
1 UNLOAD ('SELECT "userid", "username", "email" FROM ((select userid, username, email from users) union all (select userid, username, email from users limit 10)) ') TO 's3://aws-glue-assets-123456789012-ap-northeast-1/temporary/7373547e-f18e-43a2-8420-2da481d2f9b1/' WITH CREDENTIALS '' ESCAPE MANIFEST
2 SELECT character_value, version() FROM INFORMATION_SCHEMA.SQL_IMPLEMENTATION_INFO WHERE implementation_info_id = '17' or implementation_info_id = '18'
3 SET application_name = 'JDBC-1.2.12.1017'
4 SELECT supported_value FROM INFORMATION_SCHEMA.SQL_SIZING WHERE sizing_id = 34 or sizing_id = 30 or sizing_id = 31 or sizing_id = 10005 or sizing_id = 32 or sizing_id = 35 or sizing_id = 107 or sizing_id = 97 or sizing_id = 99 or sizing_id = 100 or sizing_id = 101
5 BEGIN;
6 CREATE TABLE IF NOT EXISTS public.ma_list_temp_6af7d8 (userid INTEGER, username VARCHAR(MAX), email VARCHAR(MAX)) DISTSTYLE EVEN
7 CREATE TABLE IF NOT EXISTS public.ma_list (userid INTEGER, username VARCHAR, email VARCHAR)
8 DROP TABLE IF EXISTS public.ma_list_temp_6af7d8
9 CREATE TABLE public.ma_list_temp_6af7d8 (userid INTEGER, username VARCHAR, email VARCHAR)
10 COPY public.ma_list_temp_6af7d8 FROM 's3://aws-glue-assets-123456789012-ap-northeast-1/temporary/7d4f2d00-600a-4ffd-98e4-0a94f850a626/manifest.json' CREDENTIALS '' FORMAT AS CSV NULL AS '@NULL@' manifest
11 BEGIN
12 MERGE INTO public.ma_list USING public.ma_list_temp_6af7d8 ON ma_list.userid = ma_list_temp_6af7d8.userid WHEN MATCHED THEN UPDATE SET userid = ma_list_temp_6af7d8.userid, username = ma_list_temp_6af7d8.username, email = ma_list_temp_6af7d8.email WHEN NOT MATCHED THEN INSERT VALUES (ma_list_temp_6af7d8.userid, ma_list_temp_6af7d8.username, ma_list_temp_6af7d8.email)
13 DROP TABLE public.ma_list_temp_6af7d8
14 END
15 BEGIN;
16 COMMIT;

最後に

AWS Glueに対して、Sparkの大規模並列分散処理基盤というイメージがあり、ETLの中でもTransformを意識する場面が多くありました。しかし、直近のre:Invent2022で発表されたGlue Data Qualityでデータ検証、Sparkによる高度なデータ変換(Visual Transformの追加、Custom Transform拡張)、最後にGlueのSpark統合によるRedshiftへロード(Append、Upsert、洗い替えなど)をGUIのみで実現できるようになりました。

今回のアップデートでGlue StudioによるビジュアルETLパイプラインのストーリーがイメージできるようになり、これまでPySparkやScalaのコードを書くことを敬遠していたお客様にもGUIでETLをご利用いただける場面が増えたのではないかと手応えを感じました。

合わせて読みたい