ちょっと話題の記事

[アップデート] 1行たりともコードは書かない!AWS GlueでストリーミングETLが可能になりました

ストリーミングデータの ETL も AWS Glue にまるっとお任せしてしまいましょう!
2020.04.30

先日のアップデートで AWS Glue がストリーミング ETL をサポートするようになりました!

なにが嬉しいのか

まず、AWS Glue?ナニソレ?という方は、是非、以下の再入門ブログをお読みください。

AWS Glue はフルマネージドでサーバーレスな ETL(抽出/変換/ロード)サービスですが、これまでデータソースとしては S3、RDS、Redshift・・・etc といったデータストレージのみが対象でした。そのため、Kinesis Data Streams や Apache Kafka といったストリーミングデータに対して Glue を組み込んだ ETL 処理は出来ませんでした。

従来であれば、例えば以下のように Lambda や Kinesis Data Firehose と Glue カタログを連携して Parquet 変換したり

以下のように Kinesis Data Analytics を介して Apache Flink でストリーミング ETL を行う必要がありました。

今回のアップデートが刺さりそうなド本命だと、EMR を使った Spark Streaming ですね。

いずれにおいても、Lambda、Apache Flink、Spark Streaming の部分はユーザー側でコードを書く必要がありましたが、Glue がストリーミング ETL をサポートしたことでユーザーが 1 行たりともコードを書くことなく実装できるようになりました。こんなイメージですね。

ETL 処理をまるっとマネージドサービスに預けることで、ユーザーは本来の目的である分析処理のほうに注力できるようになります。

料金

Apache Spark タイプの ETL ジョブの場合、「DPU 時間あたり $0.44、毎秒課金(最小時間は 10 分」の課金が発生します。(最新の料金情報が公式ページを参照ください)

ストリーミング処理であることを考慮すると ETL ジョブは常時稼働しているかと思いますので、それなりにコストが掛かる点はご注意ください。基本的には Kinesis Data Analytics や EMR(Spark Streaming)の置き換え検討になるかと思います。コストを考慮すると、処理が回るのであれば依然として Lambda による ETL も検討いただくのが良いでしょう。

やってみる

それではいつものとおり、さっそくやってみましょう。

今回は Amazon Kinesis Data Streams をストリーミングソースとして検証しました。サンプルのデータストリームは Kinesis Data Analytics で用意されている株価のティッカー情報を使用しました。以下のようなデータが流れてきます。

{"TICKER_SYMBOL":"CVB","SECTOR":"TECHNOLOGY","CHANGE":0.88,"PRICE":53.7},
{"TICKER_SYMBOL":"WMT","SECTOR":"RETAIL","CHANGE":3.48,"PRICE":73.99},
{"TICKER_SYMBOL":"PPL","SECTOR":"HEALTHCARE","CHANGE":1.95,"PRICE":32.58},
{"TICKER_SYMBOL":"DEG","SECTOR":"ENERGY","CHANGE":-0.18,"PRICE":5.63},
{"TICKER_SYMBOL":"AZL","SECTOR":"HEALTHCARE","CHANGE":0.51,"PRICE":17.52},
{"TICKER_SYMBOL":"QXZ","SECTOR":"FINANCIAL","CHANGE":2.04,"PRICE":221.5},
{"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY","CHANGE":1.21,"PRICE":135.25},
{"TICKER_SYMBOL":"KIN","SECTOR":"ENERGY","CHANGE":0.32,"PRICE":5.37},
{"TICKER_SYMBOL":"VVS","SECTOR":"ENERGY","CHANGE":-0.49,"PRICE":14.78},
{"TICKER_SYMBOL":"WSB","SECTOR":"RETAIL","CHANGE":0.39,"PRICE":9.13}

ストリーミングソースのデータカタログテーブル作成

AWS Glue コンソールを開き、データベースがない場合は[データベースの追加]をクリックし、新規にデータベースを作成します。

次に、[テーブルの追加]-[手動でのテーブル追加]をクリックします。

任意のテーブル名を入力し、データベースを指定します。

データストアの追加に Kinesis および Kafka が指定できるようになっています。今回は Kinesis を選択し、Stream name および Kinesis source URL を入力します。Kinesis source URL はリージョン毎のエンドポイント URL を入力します。

データ形式は JSON を指定します。

ストリーミングデータのスキーマを定義します。ここで列を指定しない場合、ストリームの処理時に無視されます。今回は指定していませんが必要に応じてパーティションキーも指定できます。

定義内容を確認し、完了をクリックします。

ETL ジョブ作成

次に、ETL ジョブを作成します。事前に Glue ジョブに割り当てる IAM ロールを作成しておきます。IAMロールには AWSGlueServiceRole および AWSGlueServiceRole の IAM ポリシーをアタッチしています。

[ジョブの追加]をクリック。

任意の名前を入力、IAM ロールを指定します。タイプは Spark Streaming を選択し、次へ。

データソースには先程作成したテーブルを選択します。

データターゲットを選択します。今回は Parquet 形式に変換したものを S3 で受け取るため、データターゲットでテーブルを作成する を指定しました。ターゲットパスの S3 バケットには aws-glue- で始まるバケットを指定していますが、これは AWSGlueServiceRole で許可されているバケット名に従ってください。

ソースストリーム列とターゲット列のマッピングをします。列を入れ替えたり、列名や型を変更したり、不用な列を削除したり、コードを書くことなく、提案されたスクリプトを使用してレコードを取り込むことができます。

ジョブを保存してスクリプトを編集する をクリックすると、以下のように自動生成されたスクリプトが確認できます。

[ジョブの実行]をクリックし、しばらくするとターゲットで指定した S3 バケットに Parquet ファイルが出力されはじめます。

動作確認

以下のような Glue クローラを作成し、Amazon Athena でクエリしてみましょう。クローラ構成では checkpoint を除外するように checkpoint/** と指定しています。

Amazon Athena を開き Glue で作成されたデータベースとテーブルを指定し、テーブルのプレビューを実行します。

ターゲット列のマッピングで指定したように列の入れ替えや、change 列の削除が正常に動作していることが確認できましたね!簡単ではありますが、検証は以上です!

さいごに

これまで Amazon Kinesis Data Streams などのストリーミングデータソースを ETL 処理するには、自前で Lambda 関数を書いたり、Amazon Kinesis Data Analytics を介して Apache Flink を利用する方法がありましたが、いずれもコードを準備する必要がありました。

AWS Glue がストリーミング ETL に対応したことでコードを 1行たりとも書くことなく、ストリーミング ETL を実装することが可能となりましたね!

また、今回は Glue より提案されたスクリプトをそのまま受け入れて利用していますが、提案されたスクリプトをベースにユーザー側でカスタマイズすることも可能です。必要に応じて使い分けください!

以上!大阪オフィスの丸毛(@marumo1981)でした!