
Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX 事業本部 Delivery 部の若槻です。
AWS Glue は、データの ETL 処理をサーバーレスで簡単に実装できるマネージドサービスです。
今回は、AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。
やってみた
Glue ジョブコード
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import current_timestamp
args = getResolvedOptions(
sys.argv,
[
'JOB_NAME',
'SOURCE_GLUE_TABLE_NAME',
'TARGET_GLUE_TABLE_NAME',
'GLUE_DATABASE_NAME'
]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
timestamp = current_timestamp()
logger = glueContext.get_logger()
logger.info(f"Timestamp={timestamp}")
# データ読み込み
GlueTableData = (
glueContext.create_dynamic_frame.from_catalog(
database=args['GLUE_DATABASE_NAME'],
table_name=args['SOURCE_GLUE_TABLE_NAME'],
transformation_ctx='SourceData',
)
)
# 読み込みデータがない場合はジョブを終了する
if GlueTableData.count() == 0:
logger.info("No data found in the source table. Exiting the job.")
job.commit()
GlueTableDataDF = GlueTableData.toDF()
GlueTableDataDF.show()
# データ加工
GlueTableDataDF = GlueTableDataDF.withColumn('timestamp', timestamp)
GlueTableDataDF.show()
GlueTableData = DynamicFrame.fromDF(
GlueTableDataDF,
glueContext,
'CreatedDynamicFrame'
)
# データ書き込み
glueContext.write_dynamic_frame.from_catalog(
frame=GlueTableData,
database=args['GLUE_DATABASE_NAME'],
table_name=args['TARGET_GLUE_TABLE_NAME'],
transformation_ctx='TargetData',
additional_options={
'enableUpdateCatalog': True,
'partitionKeys': [
'year', 'month', 'day'
]
}
)
# パーティション作成
spark.sql(f"MSCK REPAIR TABLE {args['GLUE_DATABASE_NAME']}.{args['TARGET_GLUE_TABLE_NAME']}")
job.commit()
- Glue ジョブでは PySpark を使用して処理を記述します。
- Glue テーブルからのデータの読み取りは
create_dynamic_frame.from_catalog
を使用します。- 今回は使用していませんが、
push_down_predicate
オプションを使うとパーティションキーで読み取りデータをフィルターできます。
- 今回は使用していませんが、
- Glue テーブルへのデータの書き込みは
write_dynamic_frame.from_catalog
を使用します。 write_dynamic_frame.from_catalog
ではパーティションの自動作成は行われないので、データ書込み後にMSCK REPAIR TABLE
クエリを合わせて実行しています。
CDK コード
import {
aws_s3,
RemovalPolicy,
Stack,
StackProps,
CfnOutput,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';
export class CdkSampleStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// Glue データベース
const glueDatabase = new glue_alpha.Database(this, 'GlueDatabase');
// データソース S3 バケット
const dataSourceBucket = new aws_s3.Bucket(this, 'DataSourceBucket', {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// データソース Glue テーブル
const sourceGlueTable = new glue_alpha.S3Table(this, 'SourceGlueTable', {
database: glueDatabase,
bucket: dataSourceBucket,
s3Prefix: 'data/',
dataFormat: glue_alpha.DataFormat.PARQUET,
partitionKeys: [
{
name: 'year',
type: glue_alpha.Schema.STRING,
},
{
name: 'month',
type: glue_alpha.Schema.STRING,
},
{
name: 'day',
type: glue_alpha.Schema.STRING,
},
],
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'airTemperature',
type: glue_alpha.Schema.DOUBLE,
},
],
});
// データターゲット S3 バケット
const dataTargetBucket = new aws_s3.Bucket(this, 'DataTargetBucket', {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
// データターゲット Glue テーブル
const targetGlueTable = new glue_alpha.S3Table(this, 'TargetGlueTable', {
database: glueDatabase,
bucket: dataTargetBucket,
s3Prefix: 'data/',
dataFormat: glue_alpha.DataFormat.PARQUET,
partitionKeys: [
{
name: 'year',
type: glue_alpha.Schema.STRING,
},
{
name: 'month',
type: glue_alpha.Schema.STRING,
},
{
name: 'day',
type: glue_alpha.Schema.STRING,
},
],
columns: [
{
name: 'id',
type: glue_alpha.Schema.STRING,
},
{
name: 'airTemperature',
type: glue_alpha.Schema.DOUBLE,
},
{
name: 'timestamp',
type: glue_alpha.Schema.TIMESTAMP,
},
],
});
// Glue ジョブ
const glueJob = new glue_alpha.Job(this, 'GlueJob', {
executable: glue_alpha.JobExecutable.pythonEtl({
glueVersion: glue_alpha.GlueVersion.V4_0,
pythonVersion: glue_alpha.PythonVersion.THREE,
script: glue_alpha.Code.fromAsset('src/glue-job.py'),
}),
defaultArguments: {
'--job-bookmark-option': 'job-bookmark-disable',
'--enable-glue-datacatalog': 'true',
'--GLUE_DATABASE_NAME': glueDatabase.databaseName,
'--SOURCE_GLUE_TABLE_NAME': sourceGlueTable.tableName,
'--TARGET_GLUE_TABLE_NAME': targetGlueTable.tableName,
},
continuousLogging: { enabled: true },
enableProfilingMetrics: true,
});
// Glue ジョブの権限設定
sourceGlueTable.grantRead(glueJob);
targetGlueTable.grantWrite(glueJob);
// 物理 ID 確認用
new CfnOutput(this, 'GlueDatabseName', {
value: glueDatabase.databaseName,
});
new CfnOutput(this, 'SourceGlueTableName', {
value: sourceGlueTable.tableName,
});
new CfnOutput(this, 'TargetGlueTableName', {
value: targetGlueTable.tableName,
});
new CfnOutput(this, 'GlueJobName', {
value: glueJob.jobName,
});
}
}
- AWS CDK のコードは TypeScript で記述しています。
- AWS Glue の L2 Construct Class は現在は
@aws-cdk/aws-glue-alpha
でのみサポートされているので使用しています。 - 読み取り元および書き込み先の Glue テーブルの Glue ジョブへの権限付与は
grantRead
およびgrantWrite
メソッドが便利です。
動作確認
読み取り元テーブルに Athena でデータを作成します。
INSERT INTO cdksamplestackgluedatabase6a27012d.cdksamplestacksourcegluetable88243330
(year, month, day, id, airtemperature)
VALUES
('2023', '10', '22', 'd001', 10.05),
('2023', '10', '23', 'd001', 5),
('2023', '10', '23', 'd002', 1.1)
データが作成されました。
SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacksourcegluetable88243330"
id | airtemperature | year | month | day |
---|---|---|---|---|
d001 | 10.05 | 2023 | 10 | 22 |
d001 | 5.0 | 2023 | 10 | 23 |
d002 | 1.1 | 2023 | 10 | 23 |
Glue ジョブを実行します。
JobRunId=$(
aws glue start-job-run --job-name ${GLUE_JOB_NAME} \
--query JobRunId \
--output text
)
2,3 分待つと、ジョブ実行が成功しました。
$ aws glue get-job-run --job-name ${GLUE_JOB_NAME} --run-id ${JobRunId} --query 'JobRun.JobRunState'
"SUCCEEDED"
書き込み先テーブルを Athena でクエリすると、Glue ジョブによりデータが書き込まれていることが確認できました。
SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacktargetgluetable88a583f6"
id | airtemperature | timestamp | year | month | day |
---|---|---|---|---|---|
d001 | 5.0 | 2023-10-21 17:53:41.528 | 2023 | 10 | 23 |
d001 | 10.05 | 2023-10-21 17:53:41.528 | 2023 | 10 | 22 |
d002 | 1.1 | 2023-10-21 17:53:41.528 | 2023 | 10 | 23 |
おわりに
AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。
よくあるユースケースだと思うので参考になれば幸いです。
参考
以上