この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の若槻です。
AWS Glueでは、パーティション分割を行うことによりデータの整理や効率的なクエリ実行を行うことが可能です。
今回は、クローラーとジョブを組み合わせて、パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するETLフローを作ってみました。
作るもの
- データソースに追加されたデータをもとにGlueテーブルにパーティションを追加するクローラー
- パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するジョブ
これらクローラーとジョブで下記のようなワークフローを作成します。まずクローラーを実行し、正常に実行が完了したらジョブが実行されるようにします。
やってみた
まず最初に以降のコマンド実行で使用する変数を定義しておきます。
% AWS_REGION=ap-northeast-1
% ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account")
% RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION}
% DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION}
% GLUE_DATABASE_NAME=devices_data_analystics
% RAW_DATA_GLUE_TABLE_NAME=devices_raw_data
% INTEGRATED_DATA_GLUE_TABLE_NAME=devices_integrated_data
% WORKFLOW_NAME=devices-analytics
% ATHENA_WORK_GROUP_NAME=devices-data-analytics
環境構築
動作確認環境を作成します。
CloudFormationスタック
CloudFormationスタックのテンプレートです。(長いため折りたたんでいます。)
クリックで展開
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Resources:
DevicesRawDataBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsAthenaWorkGroup:
Type: AWS::Athena::WorkGroup
Properties:
Name: devices-data-analytics
WorkGroupConfiguration:
ResultConfiguration:
OutputLocation: !Sub s3://${DevicesRawDataBucket}/query-result
EnforceWorkGroupConfiguration: true
PublishCloudWatchMetricsEnabled: true
DevicesDataAnalyticsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsGlueDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: devices_data_analystics
DevicesRawDataGlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
TableInput:
Name: devices_raw_data
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: false
serialization.encoding: utf-8
EXTERNAL: true
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Columns:
- Name: device_id
Type: string
- Name: timestamp
Type: bigint
- Name: state
Type: boolean
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Location: !Sub s3://${DevicesRawDataBucket}/raw-data
SerdeInfo:
Parameters:
paths: "device_id, timestamp, state"
SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
DevicesIntegratedDataGlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
TableInput:
Name: devices_integrated_data
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: false
serialization.encoding: utf-8
EXTERNAL: true
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Columns:
- Name: device_id
Type: string
- Name: timestamp
Type: bigint
- Name: state
Type: boolean
- Name: partition_date
Type: string
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Location: !Sub s3://${DevicesDataAnalyticsBucket}/integrated-data
SerdeInfo:
Parameters:
paths: "device_id, timestamp, state, partition_date"
SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
ExecuteDevicesDataETLGlueJobRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Principal:
Service:
- glue.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: devices-data-etl-glue-job-policy
PolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Action:
- glue:StartJobRun
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl
-
Effect: Allow
Action:
- glue:GetPartition
- glue:GetPartitions
- glue:GetTable
- glue:BatchCreatePartition
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase}
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesRawDataGlueTable}
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesIntegratedDataGlueTable}
-
Effect: Allow
Action:
- s3:ListBucket
- s3:GetBucketLocation
Resource:
- arn:aws:s3:::*
-
Effect: Allow
Action:
- logs:CreateLogStream
- logs:CreateLogGroup
- logs:PutLogEvents
Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/*
-
Effect: Allow
Action:
- s3:GetObject
Resource:
- !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/*
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
-
Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/*
-
Effect: Allow
Action:
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/integrated-data/*
DevicesDataETLGlueJob:
Type: AWS::Glue::Job
Properties:
Name: devices-data-etl
Command:
Name: glueetl
PythonVersion: 3
ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
DefaultArguments:
--job-language: python
--job-bookmark-option: job-bookmark-enable
--TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir
--GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase}
--RAW_DATA_GLUE_TABLE_NAME: !Sub ${DevicesRawDataGlueTable}
--INTEGRATED_DATA_GLUE_TABLE_NAME: !Sub ${DevicesIntegratedDataGlueTable}
GlueVersion: 2.0
ExecutionProperty:
MaxConcurrentRuns: 1
MaxRetries: 0
Role: !Ref ExecuteDevicesDataETLGlueJobRole
DevicesDataAnalyticsGlueCrawler:
Type: AWS::Glue::Crawler
Properties:
Role: !Sub arn:aws:iam::${AWS::AccountId}:role/service-role/AWSGlueServiceRole-DefaultRole
Targets:
CatalogTargets:
- DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
Tables:
- !Ref DevicesRawDataGlueTable
SchemaChangePolicy:
DeleteBehavior: LOG
DevicesAnalyticsGlueWorkflow:
Type: AWS::Glue::Workflow
Properties:
Name: devices-analytics
DevicesDataAnalyticsGlueCrawlerTrigger:
Type: AWS::Glue::Trigger
Properties:
WorkflowName: !Ref DevicesAnalyticsGlueWorkflow
Type: ON_DEMAND
Actions:
- CrawlerName: !Ref DevicesDataAnalyticsGlueCrawler
DevicesDataETLGlueJobTrigger:
Type: AWS::Glue::Trigger
Properties:
WorkflowName: !Ref DevicesAnalyticsGlueWorkflow
Type: CONDITIONAL
Actions:
- JobName: !Ref DevicesDataETLGlueJob
Predicate:
Conditions:
- LogicalOperator: EQUALS
CrawlerName: !Ref DevicesDataAnalyticsGlueCrawler
CrawlState: SUCCEEDED
StartOnCreation: true
データターゲットへの書き込み時にGlueテーブルにパーティションを作成したい場合はジョブの実行ロールでglue:BatchCreatePartition
アクションを許可する必要があるため、リソース定義ExecuteDevicesDataETLGlueJobRole
で許可しています。
スタックをデプロイします。
% aws cloudformation deploy \
--template-file template.yaml \
--stack-name devices-data-analytics-stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
Glueジョブスクリプト
データソースから取得したデータに対して、パーティションスキーマの値を使用したpartition_date
列を追加し、データターゲットに書き込むPySparkスクリプトです。(長いため折りたたんでいます。)
クリックで展開
devices-data-etl.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import concat, lit
args = getResolvedOptions(
sys.argv,
[
'JOB_NAME',
'GLUE_DATABASE_NAME',
'RAW_DATA_GLUE_TABLE_NAME',
'INTEGRATED_DATA_GLUE_TABLE_NAME'
]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
transformation_ctx = 'datasource'
).toDF()
df = df.withColumn('partition_date',
concat(
df.year,
lit('/'),
df.month,
lit('/'),
df.day
)
)
dyf = DynamicFrame.fromDF(
df,
glueContext,
'integrated_data'
)
additionalOptions = {
"enableUpdateCatalog": True
}
additionalOptions['partitionKeys'] = [
'year', 'month', 'day'
]
glueContext.write_dynamic_frame.from_catalog(
frame = dyf,
database = args['GLUE_DATABASE_NAME'],
table_name = args['INTEGRATED_DATA_GLUE_TABLE_NAME'],
transformation_ctx = 'datasink',
additional_options=additionalOptions
)
job.commit()
データターゲットへの書き込み時に指定のパーティションを作成する場合は、write_dynamic_frame.from_catalog()
でadditional_options
を下記のように指定します。
devices-data-etl.py
additionalOptions = {
"enableUpdateCatalog": True
}
additionalOptions['partitionKeys'] = [
'year', 'month', 'day'
]
glueContext.write_dynamic_frame.from_catalog(
frame = dyf,
database = args['GLUE_DATABASE_NAME'],
table_name = args['INTEGRATED_DATA_GLUE_TABLE_NAME'],
transformation_ctx = 'datasink',
additional_options=additionalOptions
)
スクリプトをS3バケットにアップロードします。
% aws s3 cp devices-data-etl.py \
${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py
データソースへのデータ作成
データソースの場所となるS3バケットのパスにデータが記載されたパーティションパスを持つオブジェクトを作成します。
raw-data.json
{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data.json \
${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=13/raw-data.json
この時点ではデータソースのGlueテーブルにパーティションは一つも作成されていません。
% aws glue get-partitions \
--database-name ${GLUE_DATABASE_NAME} \
--table-name ${RAW_DATA_GLUE_TABLE_NAME}
{
"Partitions": []
}
動作確認
Glueワークフローを実行します。
% RunId=$(
aws glue start-workflow-run --name ${WORKFLOW_NAME} \
--query RunId \
--output text
)
ワークフローの実行のStatistics.SucceededActions
が2
となれば、すべてのアクションの実行が正常に完了しています。
% aws glue get-workflow-run \
--name ${WORKFLOW_NAME} \
--run-id ${RunId} \
--query Run.Statistics
{
"TotalActions": 2,
"TimeoutActions": 0,
"FailedActions": 0,
"StoppedActions": 0,
"SucceededActions": 2,
"RunningActions": 0
}
データターゲットのGlueテーブルのパーティションを取得してみると、データソースと同じパーティションが作成されています。
% aws glue get-partitions \
--database-name ${GLUE_DATABASE_NAME} \
--table-name ${INTEGRATED_DATA_GLUE_TABLE_NAME} \
--query 'Partitions[0].Values'
[
"2021",
"01",
"13"
]
データターゲットからS3オブジェクトを取得してみると、ちゃんとパーティション分割されてオブジェクトが作成されています。
% aws s3 ls ${DATA_ANALYTICS_BUCKET}/integrated-data --recursive
2021-01-14 19:55:09 90 integrated-data/year=2021/month=01/day=13/run-datasink-4-part-r-00000
AthenaでクエリSELECT * FROM ${RAW_DATA_GLUE_TABLE_NAME}
を実行してデータターゲットのデータを取得してみます。
% QueryExecutionId=$( \
aws athena start-query-execution \
--query-string "SELECT * FROM ${INTEGRATED_DATA_GLUE_TABLE_NAME}" \
--work-group ${ATHENA_WORK_GROUP_NAME} \
--query-execution-context Database=${GLUE_DATABASE_NAME},Catalog=AwsDataCatalog \
--query QueryExecutionId \
--output text \
)
クエリの実行結果を取得すると、データターゲットにGlueジョブで加工されたデータが作成されていることが確認できました。
% aws athena get-query-results \
--query-execution-id $QueryExecutionId \
--query ResultSet.Rows
[
{
"Data": [
{
"VarCharValue": "device_id"
},
{
"VarCharValue": "timestamp"
},
{
"VarCharValue": "state"
},
{
"VarCharValue": "partition_date"
},
{
"VarCharValue": "year"
},
{
"VarCharValue": "month"
},
{
"VarCharValue": "day"
}
]
},
{
"Data": [
{
"VarCharValue": "3ff9c44a"
},
{
"VarCharValue": "1609348014"
},
{
"VarCharValue": "true"
},
{
"VarCharValue": "2021/01/13"
},
{
"VarCharValue": "2021"
},
{
"VarCharValue": "01"
},
{
"VarCharValue": "13"
}
]
}
]
おわりに
クローラーとジョブを組み合わせて、パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するETLフローを作ってみました。
今回はETLワークフローを手動実行しましたが、cronベースのトリガーで日次バッチとして実行させることももちろん可能です。これを基本形にいろいろ応用させてみたいですね。
参考
- Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換) - Qiita
- AWS Glueを用いてパフォーマンス向上やコスト最適化するカラム名ありパーティションのデータに変換するETLコードを作成する | Developers.IO
- AWS::Glue::Trigger - AWS CloudFormation
- get-workflow-run — AWS CLI 1.18.214 Command Reference
- ls — AWS CLI 1.18.214 Command Reference
以上