この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の若槻です。
AWS Glueでは、Pushdown Predicatesを使うことによりジョブでデータソースからデータを取得する際にパーティションキーによるフィルターを行うことができます。
- Managing Partitions for ETL Output in AWS Glue - AWS Glue
- AWS Glue の Pushdown Predicates を用いてすべてのファイルを読み込むことなく、パーティションをプレフィルタリングする | Developers.IO
このPushdown PredicatesはHive標準の機能なのですが、複数条件や数値の大小など様々な条件を指定することが可能です。
そこでふと「Glueテーブルでstring型として定義したパーティションキーをPushdown Predicatesでday <= 18
やmonth > 1
のような数値の大小の条件でフィルターできるのか?」というのが気になったので確認してみました。
やってみた
まず最初に以降のコマンド実行で使用する変数を定義しておきます。
% AWS_REGION=ap-northeast-1
% ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account")
% RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION}
% DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION}
% GLUE_DATABASE_NAME=devices_data_analystics
% RAW_DATA_GLUE_TABLE_NAME=devices_raw_data
環境作成
CloudFormationスタック
CloudFormationスタックのテンプレートです。
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Resources:
DevicesRawDataBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}
DevicesDataAnalyticsGlueDatabase:
Type: AWS::Glue::Database
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: devices_data_analystics
DevicesRawDataGlueTable:
Type: AWS::Glue::Table
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
TableInput:
Name: devices_raw_data
TableType: EXTERNAL_TABLE
Parameters:
has_encrypted_data: false
serialization.encoding: utf-8
EXTERNAL: true
StorageDescriptor:
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Columns:
- Name: device_id
Type: string
- Name: timestamp
Type: bigint
- Name: state
Type: boolean
InputFormat: org.apache.hadoop.mapred.TextInputFormat
Location: !Sub s3://${DevicesRawDataBucket}/raw-data
SerdeInfo:
Parameters:
paths: "device_id, timestamp, state"
SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
PartitionKeys:
- Name: year
Type: string
- Name: month
Type: string
- Name: day
Type: string
ExecuteDevicesDataETLGlueJobRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Principal:
Service:
- glue.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: devices-data-etl-glue-job-policy
PolicyDocument:
Version: 2012-10-17
Statement:
-
Effect: Allow
Action:
- glue:StartJobRun
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl
-
Effect: Allow
Action:
- glue:GetPartition
- glue:GetPartitions
- glue:GetTable
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase}
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesRawDataGlueTable}
-
Effect: Allow
Action:
- s3:ListBucket
- s3:GetBucketLocation
Resource:
- arn:aws:s3:::*
-
Effect: Allow
Action:
- logs:CreateLogStream
- logs:CreateLogGroup
- logs:PutLogEvents
Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/*
-
Effect: Allow
Action:
- s3:GetObject
Resource:
- !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/*
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
-
Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
Resource:
- !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/*
DevicesDataETLGlueJob:
Type: AWS::Glue::Job
Properties:
Name: devices-data-etl
Command:
Name: glueetl
PythonVersion: 3
ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
DefaultArguments:
--job-language: python
--job-bookmark-option: job-bookmark-enable
--TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir
--GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase}
--RAW_DATA_GLUE_TABLE_NAME: !Sub ${DevicesRawDataGlueTable}
GlueVersion: 2.0
ExecutionProperty:
MaxConcurrentRuns: 1
MaxRetries: 0
Role: !Ref ExecuteDevicesDataETLGlueJobRole
データソースとなるGlueテーブルのリソース定義DevicesRawDataGlueTable
で、Properties.TableInput.PartitionKeys
にて、year
、month
、day
の3つのパーティションキーをstring
型で定義しています。
スタックをデプロイします。
% aws cloudformation deploy \
--template-file template.yaml \
--stack-name devices-data-analytics-stack \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset
Glueジョブスクリプト
データソースからデータを取得するだけのPySparkスクリプトです。
devices-data-etl.py
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(
sys.argv,
[
'JOB_NAME',
'GLUE_DATABASE_NAME',
'RAW_DATA_GLUE_TABLE_NAME'
]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print('1. フィルターなし')
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['RAW_DATA_GLUE_TABLE_NAME']
).toDF()
df.show()
print('2. day >= 17')
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
push_down_predicate = 'day >= 18'
).toDF()
df.show()
print('3. day < 17')
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
push_down_predicate = 'day < 17'
).toDF()
df.show()
print('4. month >= 1')
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
push_down_predicate = 'month >= 1'
).toDF()
df.show()
print('5. month >= 2')
df = glueContext.create_dynamic_frame.from_catalog(
database = args['GLUE_DATABASE_NAME'],
table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
push_down_predicate = 'month >= 2'
).toDF()
df.show()
job.commit()
create_dynamic_frame.from_catalog()
にて以下のフィルターなしおよび4通りの数値の大小の条件のPushdown Predicatesによりデータソースからのデータ取得を試すようにしています。
- フィルターなし
day >= 17
day < 17
month >= 1
month >= 2
スクリプトをS3バケットにアップロードします。
% aws s3 cp devices-data-etl.py \
${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py
動作確認
次のパーティションパスを持つオブジェクトを、データソースのS3ロケーションにそれぞれ作成します。
year=2021/month=01/day=17
year=2021/month=01/day=18
raw-data.json
{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data.json \
${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=17/raw-data.json
% aws s3 cp raw-data.json \
${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=18/raw-data.json
作成したオブジェクトのパーティションを作成します。
% aws athena start-query-execution \
--query-string "MSCK REPAIR TABLE ${GLUE_TABLE_NAME}" \
--work-group ${ATHENA_WORK_GROUP_NAME} \
--query-execution-context \
Database=${GLUE_DATABASE_NAME},Catalog=AwsDataCatalog
ジョブを実行します。
% JobRunId=$(
aws glue start-job-run --job-name ${ETL_GLUE_JOB_NAME} \
--query JobRunId \
--output text
)
ジョブのスクリプト中でデータソースからの取得データをdf.show()
で出力するようにしています。CloudWatch Logsへの出力結果を取得してみます。
% aws logs get-log-events \
--log-group-name /aws-glue/jobs/output \
--log-stream-name ${JobRunId} \
--query 'events[].[message]' \
--output text
1. フィルターなし
+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021| 01| 17|
| 3ff9c44a|1609348014| true|2021| 01| 18|
+---------+----------+-----+----+-----+---+
2. day >= 17
+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021| 01| 18|
+---------+----------+-----+----+-----+---+
3. day < 17
++
||
++
++
4. month >= 1
+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021| 01| 17|
| 3ff9c44a|1609348014| true|2021| 01| 18|
+---------+----------+-----+----+-----+---+
5. month >= 2
++
||
++
++
パーティションキーがstring型で、また0埋めの有無に関わらずとも、Pushdown Predicatesの条件で指定した通り、パーティションキーがint型であるかのように動作してフィルターされるようです。
おわりに
Glueテーブルでstring型として定義したパーティションキーをPushdown Predicatesの数値の大小の条件でフィルターできるのか確認してみました。
これにより例えば「Amazon Kinesis Data Firehoseが作成するパーティション値は既定で0埋めとなるため、Glueテーブルでのパーティションキーの型指定はstring型とするが、Glueジョブ内ではPushdown Predicatesの数値の大小の条件でフィルターをしたい」なんて場合に対応可能となることが確認できました。
参考
以上