Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する

Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する

Clock Icon2023.10.21 18:23

こんにちは、CX 事業本部 Delivery 部の若槻です。

AWS Glue は、データの ETL 処理をサーバーレスで簡単に実装できるマネージドサービスです。

今回は、AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。

やってみた

Glue ジョブコード

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import current_timestamp

args = getResolvedOptions(
  sys.argv,
  [
    'JOB_NAME',
    'SOURCE_GLUE_TABLE_NAME',
    'TARGET_GLUE_TABLE_NAME',
    'GLUE_DATABASE_NAME'
  ]
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

timestamp = current_timestamp()

logger = glueContext.get_logger()
logger.info(f"Timestamp={timestamp}")

# データ読み込み
GlueTableData = (
    glueContext.create_dynamic_frame.from_catalog(
        database=args['GLUE_DATABASE_NAME'],
        table_name=args['SOURCE_GLUE_TABLE_NAME'],
        transformation_ctx='SourceData',
    )
)

# 読み込みデータがない場合はジョブを終了する
if GlueTableData.count() == 0:
    logger.info("No data found in the source table. Exiting the job.")
    job.commit()

GlueTableDataDF = GlueTableData.toDF()
GlueTableDataDF.show()

# データ加工
GlueTableDataDF = GlueTableDataDF.withColumn('timestamp', timestamp)
GlueTableDataDF.show()

GlueTableData = DynamicFrame.fromDF(
  GlueTableDataDF,
  glueContext,
  'CreatedDynamicFrame'
)

# データ書き込み
glueContext.write_dynamic_frame.from_catalog(
    frame=GlueTableData,
    database=args['GLUE_DATABASE_NAME'],
    table_name=args['TARGET_GLUE_TABLE_NAME'],
    transformation_ctx='TargetData',
    additional_options={
      'enableUpdateCatalog': True,
      'partitionKeys': [
        'year', 'month', 'day'
      ]
    }
)

# パーティション作成
spark.sql(f"MSCK REPAIR TABLE {args['GLUE_DATABASE_NAME']}.{args['TARGET_GLUE_TABLE_NAME']}")

job.commit()

  • Glue ジョブでは PySpark を使用して処理を記述します。
  • Glue テーブルからのデータの読み取りは create_dynamic_frame.from_catalog を使用します。
    • 今回は使用していませんが、push_down_predicate オプションを使うとパーティションキーで読み取りデータをフィルターできます。
  • Glue テーブルへのデータの書き込みは write_dynamic_frame.from_catalog を使用します。
  • write_dynamic_frame.from_catalog ではパーティションの自動作成は行われないので、データ書込み後に MSCK REPAIR TABLE クエリを合わせて実行しています。

CDK コード

import {
  aws_s3,
  RemovalPolicy,
  Stack,
  StackProps,
  CfnOutput,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as glue_alpha from '@aws-cdk/aws-glue-alpha';

export class CdkSampleStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // Glue データベース
    const glueDatabase = new glue_alpha.Database(this, 'GlueDatabase');

    // データソース S3 バケット
    const dataSourceBucket = new aws_s3.Bucket(this, 'DataSourceBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // データソース Glue テーブル
    const sourceGlueTable = new glue_alpha.S3Table(this, 'SourceGlueTable', {
      database: glueDatabase,
      bucket: dataSourceBucket,
      s3Prefix: 'data/',
      dataFormat: glue_alpha.DataFormat.PARQUET,
      partitionKeys: [
        {
          name: 'year',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'month',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'day',
          type: glue_alpha.Schema.STRING,
        },
      ],
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'airTemperature',
          type: glue_alpha.Schema.DOUBLE,
        },
      ],
    });

    // データターゲット S3 バケット
    const dataTargetBucket = new aws_s3.Bucket(this, 'DataTargetBucket', {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    // データターゲット Glue テーブル
    const targetGlueTable = new glue_alpha.S3Table(this, 'TargetGlueTable', {
      database: glueDatabase,
      bucket: dataTargetBucket,
      s3Prefix: 'data/',
      dataFormat: glue_alpha.DataFormat.PARQUET,
      partitionKeys: [
        {
          name: 'year',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'month',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'day',
          type: glue_alpha.Schema.STRING,
        },
      ],
      columns: [
        {
          name: 'id',
          type: glue_alpha.Schema.STRING,
        },
        {
          name: 'airTemperature',
          type: glue_alpha.Schema.DOUBLE,
        },
        {
          name: 'timestamp',
          type: glue_alpha.Schema.TIMESTAMP,
        },
      ],
    });

    // Glue ジョブ
    const glueJob = new glue_alpha.Job(this, 'GlueJob', {
      executable: glue_alpha.JobExecutable.pythonEtl({
        glueVersion: glue_alpha.GlueVersion.V4_0,
        pythonVersion: glue_alpha.PythonVersion.THREE,
        script: glue_alpha.Code.fromAsset('src/glue-job.py'),
      }),
      defaultArguments: {
        '--job-bookmark-option': 'job-bookmark-disable',
        '--enable-glue-datacatalog': 'true',
        '--GLUE_DATABASE_NAME': glueDatabase.databaseName,
        '--SOURCE_GLUE_TABLE_NAME': sourceGlueTable.tableName,
        '--TARGET_GLUE_TABLE_NAME': targetGlueTable.tableName,
      },
      continuousLogging: { enabled: true },
      enableProfilingMetrics: true,
    });

    // Glue ジョブの権限設定
    sourceGlueTable.grantRead(glueJob);
    targetGlueTable.grantWrite(glueJob);

    // 物理 ID 確認用
    new CfnOutput(this, 'GlueDatabseName', {
      value: glueDatabase.databaseName,
    });
    new CfnOutput(this, 'SourceGlueTableName', {
      value: sourceGlueTable.tableName,
    });
    new CfnOutput(this, 'TargetGlueTableName', {
      value: targetGlueTable.tableName,
    });
    new CfnOutput(this, 'GlueJobName', {
      value: glueJob.jobName,
    });
  }
}

  • AWS CDK のコードは TypeScript で記述しています。
  • AWS Glue の L2 Construct Class は現在は @aws-cdk/aws-glue-alpha でのみサポートされているので使用しています。
  • 読み取り元および書き込み先の Glue テーブルの Glue ジョブへの権限付与は grantRead および grantWrite メソッドが便利です。

動作確認

読み取り元テーブルに Athena でデータを作成します。

INSERT INTO cdksamplestackgluedatabase6a27012d.cdksamplestacksourcegluetable88243330
    (year, month, day, id, airtemperature)
VALUES
    ('2023', '10', '22', 'd001', 10.05),
    ('2023', '10', '23', 'd001', 5),
    ('2023', '10', '23', 'd002', 1.1)

データが作成されました。

SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacksourcegluetable88243330"

id airtemperature year month day
d001 10.05 2023 10 22
d001 5.0 2023 10 23
d002 1.1 2023 10 23

Glue ジョブを実行します。

JobRunId=$(
  aws glue start-job-run --job-name ${GLUE_JOB_NAME} \
    --query JobRunId \
    --output text
)

2,3 分待つと、ジョブ実行が成功しました。

$ aws glue get-job-run --job-name ${GLUE_JOB_NAME} --run-id ${JobRunId} --query 'JobRun.JobRunState'
"SUCCEEDED"

書き込み先テーブルを Athena でクエリすると、Glue ジョブによりデータが書き込まれていることが確認できました。

SELECT * FROM "cdksamplestackgluedatabase6a27012d"."cdksamplestacktargetgluetable88a583f6"

id airtemperature timestamp year month day
d001 5.0 2023-10-21 17:53:41.528 2023 10 23
d001 10.05 2023-10-21 17:53:41.528 2023 10 22
d002 1.1 2023-10-21 17:53:41.528 2023 10 23

おわりに

AWS Glue テーブルに対してデータを読み取り/書き込みする Glue ジョブを AWS CDK で作成する方法を確認してみました。

よくあるユースケースだと思うので参考になれば幸いです。

参考

以上

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.