Glue JobでData CatalogのデータをETL変換してS3に出力する方法
はじめに
かつまたです。本ブログは以下ブログでGlue Data Catalogに登録したCSVファイルのデータに対して、Glue jobを使用してデータ変換を行い、S3に出力する手順をご紹介します。
やってみた
ジョブ作成
-
「ETL jobs」→「Visual ETL」を選択し、新規ジョブを作成します。 「Job details」タブにおいて、Job名を指定し、IAMロールには前回ブログでGlueの実行ロールとして作成した 「AWSGlueServiceRole-katsumata」を指定します。
-
今回はスクリプトを直接記述して処理方法を指定しました。
Scriptタブに以下のコードを記述して「Save」を押下します。
Python例
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Glue contextの初期化
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
# データソースの設定
datasource = glueContext.create_dynamic_frame.from_catalog(
database="katsumata_s3_subject", # クローラーで指定したデータベース名
table_name="cm_katsumata" # クローラが作成したテーブル名
)
# データの表示
print("元のデータ:")
datasource.show()
# 年齢が25以上のレコードをフィルタリング
filtered_data = Filter.apply(
frame=datasource,
f=lambda x: x["age"] >= 25
)
# フィルタリング結果の表示
print("フィルタリング後のデータ:")
filtered_data.show()
# 結果をS3に書き出し
output_path = "s3://cm-katsumata/" # 保存先S3バケット名
glueContext.write_dynamic_frame.from_options(
frame=filtered_data,
connection_type="s3",
connection_options={"path": output_path},
format="csv"
)
print("処理が完了しました!")
ジョブ実行
-
Jobが作成完了したら、「Run」からjobを実行します。
-
「Runs」タブからJobの実行ステータスが「Succeeded」となっていることを確認します。
結果確認(s3への出力とCloudwatch Logs)
S3への出力とCloudwatch Logsへの実行結果から実行を確認してみます。
- 「Runs」タブの「Run details」でのCloudwatch LogsのOutput Logsからログ結果を確認します。
スクリプトで記述した通りに正常に実行されていることが確認できました。
- 保存先のS3バケットを確認します。結果のオブジェクトが作成されていることが確認できます。
スクリプト通り、ageが25以上の要素が抽出されていることを確認できました。
おわりに
ご覧いただきありがとうございました。AWS Glueを使用することで、簡単にETLジョブを作成し、データ変換処理を実行できることが確認できました。Visual ETLを使用すれば、コードを自身で記述せずにGUIベースでジョブを構築することも可能です。
アノテーション株式会社について
アノテーション株式会社はクラスメソッドグループのオペレーション専門特化企業です。サポート・運用・開発保守・情シス・バックオフィスの専門チームが、最新 IT テクノロジー、高い技術力、蓄積されたノウハウをフル活用し、お客様の課題解決を行っています。当社は様々な職種でメンバーを募集しています。「オペレーション・エクセレンス」と「らしく働く、らしく生きる」を共に実現するカルチャー・しくみ・働き方にご興味がある方は、アノテーション株式会社 採用サイトをぜひご覧ください。