こんにちは、CX 事業本部 Delivery 部の若槻です。
AWS Glue は、データの ETL 処理をサーバーレスで簡単に実装できるマネージドサービスです。
今回は、AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。
やってみた
Glue ジョブコード
src/glue-job.py
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import current_timestamp
args = getResolvedOptions(
sys.argv,
[
'JOB_NAME',
'SOURCE_GLUE_TABLE_NAME',
'TARGET_GLUE_TABLE_NAME',
'GLUE_DATABASE_NAME'
]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
timestamp = current_timestamp()
logger = glueContext.get_logger()
logger.info(f"Timestamp={timestamp}")
# データ読み込み
GlueTableData = (
glueContext.create_dynamic_frame.from_catalog(
database=args['GLUE_DATABASE_NAME'],
table_name=args['SOURCE_GLUE_TABLE_NAME'],
transformation_ctx='SourceData',
)
)
# 読み込みデータがない場合はジョブを終了する
if GlueTableData.count() == 0:
logger.info("No data found in the source table. Exiting the job.")
job.commit()
GlueTableDataDF = GlueTableData.toDF()
GlueTableDataDF.show()
# データ加工
GlueTableDataDF = GlueTableDataDF.withColumn('timestamp', timestamp)
GlueTableDataDF.show()
GlueTableData = DynamicFrame.fromDF(
GlueTableDataDF,
glueContext,
'CreatedDynamicFrame'
)
# データ書き込み
glueContext.write_dynamic_frame.from_catalog(
frame=GlueTableData,
database=args['GLUE_DATABASE_NAME'],
table_name=args['TARGET_GLUE_TABLE_NAME'],
transformation_ctx='TargetData',
additional_options={
'enableUpdateCatalog': True,
'partitionKeys': [
'year', 'month', 'day'
]
}
)
# パーティション作成
spark.sql(f"MSCK REPAIR TABLE {args['GLUE_DATABASE_NAME']}.{args['TARGET_GLUE_TABLE_NAME']}")
job.commit()
- Glue ジョブでは PySpark を使用して処理を記述します。
- Glue テーブルからのデータの読み取りは
create_dynamic_frame.from_catalog
を使用します。- 今回は使用していませんが、
push_down_predicate
オプションを使うとパーティションキーで読み取りデータをフィルターできます。
- 今回は使用していませんが、
- Glue テーブルへのデータの書き込みは
write_dynamic_frame.from_catalog
を使用します。 write_dynamic_frame.from_catalog
ではパーティションの自動作成は行われないので、データ書込み後にMSCK REPAIR TABLE
クエリを合わせて実行しています。
CDK コード
lib/cdk-sample-stack.ts
import {
aws_s3,
RemovalPolicy,
Stack,
StackProps,
CfnOutput,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
export class CdkSampleStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// Glue データベース
const glueDatabase = new glue_alpha.Database(this, 'GlueDatabase');
// データソース S3 バケット
const dataSourceBucket = new aws_s3.Bucket(this, 'DataSourceBucket', {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// データソース Glue テーブル
const sourceGlueTable = new glue_alpha.S3Table(this, 'SourceGlueTable', {
database: glueDatabase,
bucket: dataSourceBucket,
s3Prefix: 'data/',
dataFormat: glue_alpha.DataFormat.PARQUET,
partitionKeys: [
{
name: 'year',
type: glue_alpha.Schema.STRING,
},
{
name: 'month',
type: glue_alpha.Schema.STRING,
},
{
name: 'day',
type: glue_alpha.Schema.STRING,
},
],
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'airTemperature',
type: glue_alpha.Schema.DOUBLE,
},
],
});
// データターゲット S3 バケット
const dataTargetBucket = new aws_s3.Bucket(this, 'DataTargetBucket', {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// データターゲット Glue テーブル
const targetGlueTable = new glue_alpha.S3Table(this, 'TargetGlueTable', {
database: glueDatabase,
bucket: dataTargetBucket,
s3Prefix: 'data/',
dataFormat: glue_alpha.DataFormat.PARQUET,
partitionKeys: [
{
name: 'year',
type: glue_alpha.Schema.STRING,
},
{
name: 'month',
type: glue_alpha.Schema.STRING,
},
{
name: 'day',
type: glue_alpha.Schema.STRING,
},
],
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'airTemperature',
type: glue_alpha.Schema.DOUBLE,
},
{
name: 'timestamp',
type: glue_alpha.Schema.TIMESTAMP,
},
],
});
// Glue ジョブ
const glueJob = new glue_alpha.Job(this, 'GlueJob', {
executable: glue_alpha.JobExecutable.pythonEtl({
glueVersion: glue_alpha.GlueVersion.V4_0,
pythonVersion: glue_alpha.PythonVersion.THREE,
script: glue_alpha.Code.fromAsset('src/glue-job.py'),
}),
defaultArguments: {
'--job-bookmark-option': 'job-bookmark-disable',
'--enable-glue-datacatalog': 'true',
'--GLUE_DATABASE_NAME': glueDatabase.databaseName,
'--SOURCE_GLUE_TABLE_NAME': sourceGlueTable.tableName,
'--TARGET_GLUE_TABLE_NAME': targetGlueTable.tableName,
},
continuousLogging: { enabled: true },
enableProfilingMetrics: true,
});
// Glue ジョブの権限設定
sourceGlueTable.grantRead(glueJob);
targetGlueTable.grantWrite(glueJob);
// 物理 ID 確認用
new CfnOutput(this, 'GlueDatabseName', {
value: glueDatabase.databaseName,
});
new CfnOutput(this, 'SourceGlueTableName', {
value: sourceGlueTable.tableName,
});
new CfnOutput(this, 'TargetGlueTableName', {
value: targetGlueTable.tableName,
});
new CfnOutput(this, 'GlueJobName', {
value: glueJob.jobName,
});
}
}
- AWS CDK のコードは TypeScript で記述しています。
- AWS Glue の L2 Construct Class は現在は
@aws-cdk/aws-glue-alpha
でのみサポートされているので使用しています。 - 読み取り元および書き込み先の Glue テーブルの Glue ジョブへの権限付与は
grantRead
およびgrantWrite
メソッドが便利です。
動作確認
読み取り元テーブルに Athena でデータを作成します。
INSERT INTO cdksamplestackgluedatabase6a27012d.cdksamplestacksourcegluetable88243330
(year, month, day, id, airtemperature)
VALUES
('2023', '10', '22', 'd001', 10.05),
('2023', '10', '23', 'd001', 5),
('2023', '10', '23', 'd002', 1.1)
データが作成されました。
SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacksourcegluetable88243330"
id | airtemperature | year | month | day |
---|---|---|---|---|
d001 | 10.05 | 2023 | 10 | 22 |
d001 | 5.0 | 2023 | 10 | 23 |
d002 | 1.1 | 2023 | 10 | 23 |
Glue ジョブを実行します。
JobRunId=$(
aws glue start-job-run --job-name ${GLUE_JOB_NAME} \
--query JobRunId \
--output text
)
2,3 分待つと、ジョブ実行が成功しました。
$ aws glue get-job-run --job-name ${GLUE_JOB_NAME} --run-id ${JobRunId} --query 'JobRun.JobRunState'
"SUCCEEDED"
書き込み先テーブルを Athena でクエリすると、Glue ジョブによりデータが書き込まれていることが確認できました。
SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacktargetgluetable88a583f6"
id | airtemperature | timestamp | year | month | day |
---|---|---|---|---|---|
d001 | 5.0 | 2023-10-21 17:53:41.528 | 2023 | 10 | 23 |
d001 | 10.05 | 2023-10-21 17:53:41.528 | 2023 | 10 | 22 |
d002 | 1.1 | 2023-10-21 17:53:41.528 | 2023 | 10 | 23 |
おわりに
AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。
よくあるユースケースだと思うので参考になれば幸いです。
参考
以上