この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の若槻です。
AWS Glueでは、ジョブブックマークという機能を使用することにより、最後のジョブ実行以降の増分データのみを処理対象とすることが可能です。
今回は、ジョブブックマークが有効なGlueジョブで、前回のジョブ実行で処理済みのパーティションやS3オブジェクトに増分データが追加された場合の次回のGlueジョブでの動作を確認してみました。
確認すること
具体的は、AWSドキュメントに記載されている下記仕様通りの動作となるかを実際に確認してみます。
S3 に保存されている処理対象のファイルを識別するため、ジョブブックマークはファイル名ではなくオブジェクトの最終変更時刻を確認します。ジョブが最後に実行されてから入力オブジェクトが変更された場合、ジョブが再度実行されるときに入力オブジェクトが再処理されます。
AWS Glue データカタログ内の特定のパーティションの作成タイムスタンプが、ジョブブックマークによってキャプチャされた最後のジョブ実行のタイムスタンプよりも古い場合、パーティションがスキップされます。
確認してみた
環境作成
動作確認環境を作成します。
CloudFormationスタック
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Resources:
DevicesRawDataBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsGlueDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: devices_data_analystics
RawDataGlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
TableInput:
Name: devices_raw_data
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: false
serialization.encoding: utf-8
EXTERNAL: true
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Columns:
- Name: device_id
Type: string
- Name: timestamp
Type: bigint
- Name: state
Type: boolean
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Location: !Sub s3://${DevicesRawDataBucket}/raw-data
SerdeInfo:
Parameters:
paths: "device_id, timestamp, state"
SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
IntegratedDataGlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
TableInput:
Name: devices_integrated_data
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: false
serialization.encoding: utf-8
EXTERNAL: true
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Columns:
- Name: device_id
Type: string
- Name: timestamp
Type: bigint
- Name: state
Type: boolean
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Location: !Sub s3://${DevicesDataAnalyticsBucket}/integrated-data
SerdeInfo:
Parameters:
paths: "device_id, timestamp, state, year, month, day"
SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
ExecuteETLJobRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Principal:
Service:
- glue.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: devices-data-etl-glue-job-policy
PolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Action:
- glue:StartJobRun
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl
-
Effect: Allow
Action:
- glue:GetPartition
- glue:GetPartitions
- glue:GetTable
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase}
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${RawDataGlueTable}
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${IntegratedDataGlueTable}
-
Effect: Allow
Action:
- glue:GetJobBookmark
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${RawDataGlueTable}
-
Effect: Allow
Action:
- s3:ListBucket
- s3:GetBucketLocation
Resource:
- arn:aws:s3:::*
-
Effect: Allow
Action:
- logs:CreateLogStream
- logs:CreateLogGroup
- logs:PutLogEvents
Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/*
-
Effect: Allow
Action:
- s3:GetObject
Resource:
- !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/*
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
-
Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/*
-
Effect: Allow
Action:
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/integrated-data/*
DevicesDataETLGlueJob:
Type: AWS::Glue::Job
Properties:
Name: devices-data-etl
Command:
Name: glueetl
PythonVersion: 3
ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
DefaultArguments:
--job-language: python
--job-bookmark-option: job-bookmark-enable
--TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir
--GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase}
--SRC_GLUE_TABLE_NAME: !Sub ${RawDataGlueTable}
--DEST_GLUE_TABLE_NAME: !Sub ${IntegratedDataGlueTable}
GlueVersion: 2.0
ExecutionProperty:
MaxConcurrentRuns: 1
MaxRetries: 0
Role: !Ref ExecuteETLJobRole
データソースとなるGlueデータカタログのリソース定義です。パーティションキーはyear
、month
、day
となります。
template.yaml
RawDataGlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
TableInput:
Name: devices_raw_data
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: false
serialization.encoding: utf-8
EXTERNAL: true
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Columns:
- Name: device_id
Type: string
- Name: timestamp
Type: bigint
- Name: state
Type: boolean
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Location: !Sub s3://${DevicesRawDataBucket}/raw-data
SerdeInfo:
Parameters:
paths: "device_id, timestamp, state"
SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
Glueジョブのリソース定義です。ジョブブックマークを有効にする場合はjob-bookmark-option
をjob-bookmark-enable
とする必要があります。
template.yaml
DevicesDataETLGlueJob:
Type: AWS::Glue::Job
Properties:
Name: devices-data-etl
Command:
Name: glueetl
PythonVersion: 3
ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
DefaultArguments:
--job-language: python
--job-bookmark-option: job-bookmark-enable
--TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir
--GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase}
--SRC_GLUE_TABLE_NAME: !Sub ${RawDataGlueTable}
--DEST_GLUE_TABLE_NAME: !Sub ${IntegratedDataGlueTable}
GlueVersion: 2.0
ExecutionProperty:
MaxConcurrentRuns: 1
MaxRetries: 0
Role: !Ref ExecuteETLJobRole
Glueジョブスクリプト
GlueジョブのPySparkスクリプトは下記のようになります。今回は処理内容ではなく処理対象のデータが何であるかを確認したいので、データソースから取得したデータを変更を加えずデータターゲットに書き込むだけの処理内容となっています。
devices-data-etl.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(
sys.argv,
[
'JOB_NAME',
'GLUE_DATABASE_NAME',
'SRC_GLUE_TABLE_NAME',
'DEST_GLUE_TABLE_NAME'
]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['SRC_GLUE_TABLE_NAME'],
transformation_ctx = 'datasource'
).toDF()
df.show()
dyf = DynamicFrame.fromDF(df,
glueContext,
'integrated_data'
)
glueContext.write_dynamic_frame.from_catalog(
frame = dyf,
database = args['GLUE_DATABASE_NAME'],
table_name = args['DEST_GLUE_TABLE_NAME'],
transformation_ctx = 'datasink'
)
job.commit()
データソースからのデータ取得時にジョブブックマークを使用する場合は、create_dynamic_frame.from_catalog()
でtransformation_ctx
オプションを指定します。
For job bookmarks to work properly, enable the job bookmark parameter and set the transformation_ctx parameter. If you don't pass in the transformation_ctx parameter, then job bookmarks are not enabled for a dynamic frame or a table used in the method.
devices-data-etl.py
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['SRC_GLUE_TABLE_NAME'],
transformation_ctx = 'datasource'
).toDF()
デプロイ
以降のコマンド実行で使用する変数を定義します。
% AWS_REGION=ap-northeast-1
% ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account")
% RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION}
% DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION}
CloudFormationスタックをデプロイします。
% aws cloudformation deploy \
--template-file template.yaml \
--stack-name devices-data-analytics-stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
GlueジョブのスクリプトをS3バケットにアップロードします。
% aws s3 cp devices-data-etl.py \
${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py
動作確認
確認1:新規パーティション・新規オブジェクト
Glueジョブで未処理の新規パーティションに新規オブジェクトとしてデータが作成された場合のジョブ実行時の動作を確認してみます。(後続の確認2,3と比較のため)
データが記載されたオブジェクトraw-data-A.json
をS3バケットのパーティションパスに作成します。
raw-data-1.json
{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data-1.json \
${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=06/raw-data-A.json
データカタログのパーティションを更新します。
% aws athena start-query-execution \
--query-string "MSCK REPAIR TABLE ${GLUE_DATABASE_NAME}.${RAW_DATA_GLUE_TABLE_NAME}" \
--work-group primary
ジョブを実行します。
% aws glue start-job-run --job-name devices-data-etl
スクリプト内でのdf.show()
の出力を見ると、下記のように追加したデータが過不足なく取得できていることが分かります。
>>> df.show()
+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021| 01| 06|
+---------+----------+-----+----+-----+---+
データターゲットのデータカタログからデータを取得してみます。
SELECT * FROM "devices_data_analystics"."devices_integrated_data"
データソースから取得されたデータがデータターゲットにロードされています。
確認2:処理済みパーティション・新規オブジェクト
前回のGlueジョブで処理済みのパーティションに新規オブジェクトとしてデータが作成された場合のジョブ実行時の動作を確認してみます。
データが記載されたオブジェクトraw-data-B.json
をS3バケットの前回と同じパーティションパスに作成します。
raw-data-2.json
{"device_id": "e36b7dfa", "timestamp": 1609375822, "state": true}
% aws s3 cp raw-data-2.json \
${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=06/raw-data-B.json
ジョブを実行します。
% aws glue start-job-run --job-name devices-data-etl
スクリプト内でのdf.show()
の出力を見ると、下記のように追加したデータが過不足なく取得できていることが分かります。
>>> df.show()
+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| e36b7dfa|1609375822| true|2021| 01| 06|
+---------+----------+-----+----+-----+---+
データターゲットのデータカタログからデータを取得してみます。
SELECT * FROM "devices_data_analystics"."devices_integrated_data"
データソースから取得された追加のデータ(device_id=e36b7dfa
)がデータターゲットにロードされています。
確認3:処理済みパーティション・処理済みオブジェクト
前回のGlueジョブで処理済みのオブジェクトに追記によりデータが追加された場合のジョブ実行時の動作を確認してみます。
前回処理したものと同じ名前で、2行目にデータが追記されたオブジェクトraw-data-B.json
を、S3バケットの前回と同じパーティションパスに作成します。
raw-data-3.json
{"device_id": "e36b7dfa", "timestamp": 1609375822, "state": true}
{"device_id": "7d4215d0", "timestamp": 1609497057, "state": false}
% aws s3 cp raw-data-3.json \
${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=06/raw-data-B.json
ジョブを実行します。
% aws glue start-job-run --job-name devices-data-etl
スクリプト内でのdf.show()
の出力を見ると、下記のように今回追記したデータに加えて、前回処理したデータが重複して取得されていることが分かります。
>>> df.show()
+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| e36b7dfa|1609375822| true|2021| 01| 06|
| 7d4215d0|1609497057|false|2021| 01| 06|
+---------+----------+-----+----+-----+---+
データターゲットのデータカタログからデータを取得してみます。
SELECT * FROM "devices_data_analystics"."devices_integrated_data"
今回はデータターゲットには追加のデータがロードされていませんでした。
この動作は、ジョブブックマークはデータソースからのデータ取得時だけでなく、データターゲットへのデータロード時にも適用させることができるためです。
データターゲットへのデータロード時にジョブブックマークを使用する場合も、write_dynamic_frame.from_catalog()
でtransformation_ctx
オプションを指定します。
devices-data-etl.py
glueContext.write_dynamic_frame.from_catalog(
frame = dyf,
database = args['GLUE_DATABASE_NAME'],
table_name = args['DEST_GLUE_TABLE_NAME'],
transformation_ctx = 'datasink'
)
まとめ
検証結果をまとめると下記の通りとなりました。(データソースからのデータ取得、データターゲットへのデータロードのいずれでもジョブブックマークを有効にした場合)冒頭で示したAWSドキュメント通りの仕様となりました。
- 前回までのジョブで処理済みのパーティション内に作成されたオブジェクトのデータは、データソースからの取得、データターゲットへのロードの対象となる。
- 前回までのジョブで処理済みのオブジェクトにデータが追記された場合は、そのオブジェクトのデータはデータソースからの取得の対象となる(ジョブ内でのデータの重複が発生する)が、データターゲットへのロードの対象とはならない(データの欠損が発生する)。
ちなみに、Amazon Kinesis Data FirehoseによりS3バケットへ配信されたデータをGlueジョブのデータソースとする場合は、レコードが重複なく保持されているバッファごとにランダムな名前のオブジェクトが作成されるので、2.の仕様を懸念する必要はなさそうです。
The frequency of data delivery to Amazon S3 is determined by the Amazon S3 Buffer size and Buffer interval value that you configured for your delivery stream. Kinesis Data Firehose buffers incoming data before it delivers it to Amazon S3.
The Amazon S3 object name follows the pattern DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-dd-HH-MM-SS-RandomString,
おわりに
ジョブブックマークが有効なGlueジョブで、前回のジョブ実行で処理済みのパーティションやS3オブジェクトに増分データが追加された際の動作を確認してみました。
このようにGlueのジョブブックマークはデータの処理履歴を裏側でよろしく管理してくれてとても便利なのでGlueジョブを使う際は是非とも有効活用したいですね。
参考
以上