Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する

2023.10.22

こんにちは、CX 事業本部 Delivery 部の若槻です。

AWS Glue は、データの ETL 処理をサーバーレスで簡単に実装できるマネージドサービスです。

今回は、AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。

やってみた

Glue ジョブコード

src/glue-job.py

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import current_timestamp

args = getResolvedOptions(
  sys.argv,
  [
    'JOB_NAME',
    'SOURCE_GLUE_TABLE_NAME',
    'TARGET_GLUE_TABLE_NAME',
    'GLUE_DATABASE_NAME'
  ]
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

timestamp = current_timestamp()

logger = glueContext.get_logger()
logger.info(f"Timestamp={timestamp}")

# データ読み込み
GlueTableData = (
    glueContext.create_dynamic_frame.from_catalog(
        database=args['GLUE_DATABASE_NAME'],
        table_name=args['SOURCE_GLUE_TABLE_NAME'],
        transformation_ctx='SourceData',
    )
)

# 読み込みデータがない場合はジョブを終了する
if GlueTableData.count() == 0:
    logger.info("No data found in the source table. Exiting the job.")
    job.commit()

GlueTableDataDF = GlueTableData.toDF()
GlueTableDataDF.show()

# データ加工
GlueTableDataDF = GlueTableDataDF.withColumn('timestamp', timestamp)
GlueTableDataDF.show()

GlueTableData = DynamicFrame.fromDF(
  GlueTableDataDF,
  glueContext,
  'CreatedDynamicFrame'
)

# データ書き込み
glueContext.write_dynamic_frame.from_catalog(
    frame=GlueTableData,
    database=args['GLUE_DATABASE_NAME'],
    table_name=args['TARGET_GLUE_TABLE_NAME'],
    transformation_ctx='TargetData',
    additional_options={
      'enableUpdateCatalog': True,
      'partitionKeys': [
        'year', 'month', 'day'
      ]
    }
)

# パーティション作成
spark.sql(f"MSCK REPAIR TABLE {args['GLUE_DATABASE_NAME']}.{args['TARGET_GLUE_TABLE_NAME']}")

job.commit()

  • Glue ジョブでは PySpark を使用して処理を記述します。
  • Glue テーブルからのデータの読み取りは create_dynamic_frame.from_catalog を使用します。
    • 今回は使用していませんが、push_down_predicate オプションを使うとパーティションキーで読み取りデータをフィルターできます。
  • Glue テーブルへのデータの書き込みは write_dynamic_frame.from_catalog を使用します。
  • write_dynamic_frame.from_catalog ではパーティションの自動作成は行われないので、データ書込み後に MSCK REPAIR TABLE クエリを合わせて実行しています。

CDK コード

lib/cdk-sample-stack.ts

import {
  aws_s3,
  RemovalPolicy,
  Stack,
  StackProps,
  CfnOutput,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // Glue データベース
    const glueDatabase = new glue_alpha.Database(this, 'GlueDatabase');

    // データソース S3 バケット
    const dataSourceBucket = new aws_s3.Bucket(this, 'DataSourceBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // データソース Glue テーブル
    const sourceGlueTable = new glue_alpha.S3Table(this, 'SourceGlueTable', {
      database: glueDatabase,
      bucket: dataSourceBucket,
      s3Prefix: 'data/',
      dataFormat: glue_alpha.DataFormat.PARQUET,
      partitionKeys: [
        {
          name: 'year',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'month',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'day',
          type: glue_alpha.Schema.STRING,
        },
      ],
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'airTemperature',
          type: glue_alpha.Schema.DOUBLE,
        },
      ],
    });

    // データターゲット S3 バケット
    const dataTargetBucket = new aws_s3.Bucket(this, 'DataTargetBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // データターゲット Glue テーブル
    const targetGlueTable = new glue_alpha.S3Table(this, 'TargetGlueTable', {
      database: glueDatabase,
      bucket: dataTargetBucket,
      s3Prefix: 'data/',
      dataFormat: glue_alpha.DataFormat.PARQUET,
      partitionKeys: [
        {
          name: 'year',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'month',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'day',
          type: glue_alpha.Schema.STRING,
        },
      ],
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'airTemperature',
          type: glue_alpha.Schema.DOUBLE,
        },
        {
          name: 'timestamp',
          type: glue_alpha.Schema.TIMESTAMP,
        },
      ],
    });

    // Glue ジョブ
    const glueJob = new glue_alpha.Job(this, 'GlueJob', {
      executable: glue_alpha.JobExecutable.pythonEtl({
        glueVersion: glue_alpha.GlueVersion.V4_0,
        pythonVersion: glue_alpha.PythonVersion.THREE,
        script: glue_alpha.Code.fromAsset('src/glue-job.py'),
      }),
      defaultArguments: {
        '--job-bookmark-option': 'job-bookmark-disable',
        '--enable-glue-datacatalog': 'true',
        '--GLUE_DATABASE_NAME': glueDatabase.databaseName,
        '--SOURCE_GLUE_TABLE_NAME': sourceGlueTable.tableName,
        '--TARGET_GLUE_TABLE_NAME': targetGlueTable.tableName,
      },
      continuousLogging: { enabled: true },
      enableProfilingMetrics: true,
    });

    // Glue ジョブの権限設定
    sourceGlueTable.grantRead(glueJob);
    targetGlueTable.grantWrite(glueJob);

    // 物理 ID 確認用
    new CfnOutput(this, 'GlueDatabseName', {
      value: glueDatabase.databaseName,
    });
    new CfnOutput(this, 'SourceGlueTableName', {
      value: sourceGlueTable.tableName,
    });
    new CfnOutput(this, 'TargetGlueTableName', {
      value: targetGlueTable.tableName,
    });
    new CfnOutput(this, 'GlueJobName', {
      value: glueJob.jobName,
    });
  }
}

  • AWS CDK のコードは TypeScript で記述しています。
  • AWS Glue の L2 Construct Class は現在は @aws-cdk/aws-glue-alpha でのみサポートされているので使用しています。
  • 読み取り元および書き込み先の Glue テーブルの Glue ジョブへの権限付与は grantRead および grantWrite メソッドが便利です。

動作確認

読み取り元テーブルに Athena でデータを作成します。

INSERT INTO cdksamplestackgluedatabase6a27012d.cdksamplestacksourcegluetable88243330
    (year, month, day, id, airtemperature)
VALUES
    ('2023', '10', '22', 'd001', 10.05),
    ('2023', '10', '23', 'd001', 5),
    ('2023', '10', '23', 'd002', 1.1)

データが作成されました。

SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacksourcegluetable88243330"

id airtemperature year month day
d001 10.05 2023 10 22
d001 5.0 2023 10 23
d002 1.1 2023 10 23

Glue ジョブを実行します。

JobRunId=$(
  aws glue start-job-run --job-name ${GLUE_JOB_NAME} \
    --query JobRunId \
    --output text
)

2,3 分待つと、ジョブ実行が成功しました。

$ aws glue get-job-run --job-name ${GLUE_JOB_NAME} --run-id ${JobRunId} --query 'JobRun.JobRunState'
"SUCCEEDED"

書き込み先テーブルを Athena でクエリすると、Glue ジョブによりデータが書き込まれていることが確認できました。

SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacktargetgluetable88a583f6"

id airtemperature timestamp year month day
d001 5.0 2023-10-21 17:53:41.528 2023 10 23
d001 10.05 2023-10-21 17:53:41.528 2023 10 22
d002 1.1 2023-10-21 17:53:41.528 2023 10 23

おわりに

AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。

よくあるユースケースだと思うので参考になれば幸いです。

参考

以上