[AWS Glue]string型のパーティションキーをPushdown Predicatesの数値の大小の条件でフィルターできるのか確認してみた
こんにちは、CX事業本部の若槻です。
AWS Glueでは、Pushdown Predicatesを使うことによりジョブでデータソースからデータを取得する際にパーティションキーによるフィルターを行うことができます。
- Managing Partitions for ETL Output in AWS Glue - AWS Glue
- AWS Glue の Pushdown Predicates を用いてすべてのファイルを読み込むことなく、パーティションをプレフィルタリングする | Developers.IO
このPushdown PredicatesはHive標準の機能なのですが、複数条件や数値の大小など様々な条件を指定することが可能です。
そこでふと「Glueテーブルでstring型として定義したパーティションキーをPushdown Predicatesでday <= 18
やmonth > 1
のような数値の大小の条件でフィルターできるのか?」というのが気になったので確認してみました。
やってみた
まず最初に以降のコマンド実行で使用する変数を定義しておきます。
% AWS_REGION=ap-northeast-1 % ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account") % RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION} % DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION} % GLUE_DATABASE_NAME=devices_data_analystics % RAW_DATA_GLUE_TABLE_NAME=devices_raw_data
環境作成
CloudFormationスタック
CloudFormationスタックのテンプレートです。
AWSTemplateFormatVersion: '2010-09-09' Resources: DevicesRawDataBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region} DevicesDataAnalyticsBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region} DevicesDataAnalyticsGlueDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: devices_data_analystics DevicesRawDataGlueTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase TableInput: Name: devices_raw_data TableType: EXTERNAL_TABLE Parameters: has_encrypted_data: false serialization.encoding: utf-8 EXTERNAL: true StorageDescriptor: OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat Columns: - Name: device_id Type: string - Name: timestamp Type: bigint - Name: state Type: boolean InputFormat: org.apache.hadoop.mapred.TextInputFormat Location: !Sub s3://${DevicesRawDataBucket}/raw-data SerdeInfo: Parameters: paths: "device_id, timestamp, state" SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe PartitionKeys: - Name: year Type: string - Name: month Type: string - Name: day Type: string ExecuteDevicesDataETLGlueJobRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - glue.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: devices-data-etl-glue-job-policy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - glue:StartJobRun Resource: - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl - Effect: Allow Action: - glue:GetPartition - glue:GetPartitions - glue:GetTable Resource: - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase} - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesRawDataGlueTable} - Effect: Allow Action: - s3:ListBucket - s3:GetBucketLocation Resource: - arn:aws:s3:::* - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/* - Effect: Allow Action: - s3:GetObject Resource: - !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/* - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py - Effect: Allow Action: - s3:GetObject - s3:PutObject Resource: - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/* DevicesDataETLGlueJob: Type: AWS::Glue::Job Properties: Name: devices-data-etl Command: Name: glueetl PythonVersion: 3 ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py DefaultArguments: --job-language: python --job-bookmark-option: job-bookmark-enable --TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir --GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase} --RAW_DATA_GLUE_TABLE_NAME: !Sub ${DevicesRawDataGlueTable} GlueVersion: 2.0 ExecutionProperty: MaxConcurrentRuns: 1 MaxRetries: 0 Role: !Ref ExecuteDevicesDataETLGlueJobRole
データソースとなるGlueテーブルのリソース定義DevicesRawDataGlueTable
で、Properties.TableInput.PartitionKeys
にて、year
、month
、day
の3つのパーティションキーをstring
型で定義しています。
スタックをデプロイします。
% aws cloudformation deploy \ --template-file template.yaml \ --stack-name devices-data-analytics-stack \ --capabilities CAPABILITY_NAMED_IAM \ --no-fail-on-empty-changeset
Glueジョブスクリプト
データソースからデータを取得するだけのPySparkスクリプトです。
import sys from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions( sys.argv, [ 'JOB_NAME', 'GLUE_DATABASE_NAME', 'RAW_DATA_GLUE_TABLE_NAME' ] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) print('1. フィルターなし') df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['RAW_DATA_GLUE_TABLE_NAME'] ).toDF() df.show() print('2. day >= 17') df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['RAW_DATA_GLUE_TABLE_NAME'], push_down_predicate = 'day >= 18' ).toDF() df.show() print('3. day < 17') df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['RAW_DATA_GLUE_TABLE_NAME'], push_down_predicate = 'day < 17' ).toDF() df.show() print('4. month >= 1') df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['RAW_DATA_GLUE_TABLE_NAME'], push_down_predicate = 'month >= 1' ).toDF() df.show() print('5. month >= 2') df = glueContext.create_dynamic_frame.from_catalog( database = args['GLUE_DATABASE_NAME'], table_name = args['RAW_DATA_GLUE_TABLE_NAME'], push_down_predicate = 'month >= 2' ).toDF() df.show() job.commit()
create_dynamic_frame.from_catalog()
にて以下のフィルターなしおよび4通りの数値の大小の条件のPushdown Predicatesによりデータソースからのデータ取得を試すようにしています。
- フィルターなし
day >= 17
day < 17
month >= 1
month >= 2
スクリプトをS3バケットにアップロードします。
% aws s3 cp devices-data-etl.py \ ${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py
動作確認
次のパーティションパスを持つオブジェクトを、データソースのS3ロケーションにそれぞれ作成します。
year=2021/month=01/day=17
year=2021/month=01/day=18
{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data.json \ ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=17/raw-data.json % aws s3 cp raw-data.json \ ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=18/raw-data.json
作成したオブジェクトのパーティションを作成します。
% aws athena start-query-execution \ --query-string "MSCK REPAIR TABLE ${GLUE_TABLE_NAME}" \ --work-group ${ATHENA_WORK_GROUP_NAME} \ --query-execution-context \ Database=${GLUE_DATABASE_NAME},Catalog=AwsDataCatalog
ジョブを実行します。
% JobRunId=$( aws glue start-job-run --job-name ${ETL_GLUE_JOB_NAME} \ --query JobRunId \ --output text )
ジョブのスクリプト中でデータソースからの取得データをdf.show()
で出力するようにしています。CloudWatch Logsへの出力結果を取得してみます。
% aws logs get-log-events \ --log-group-name /aws-glue/jobs/output \ --log-stream-name ${JobRunId} \ --query 'events[].[message]' \ --output text 1. フィルターなし +---------+----------+-----+----+-----+---+ |device_id| timestamp|state|year|month|day| +---------+----------+-----+----+-----+---+ | 3ff9c44a|1609348014| true|2021| 01| 17| | 3ff9c44a|1609348014| true|2021| 01| 18| +---------+----------+-----+----+-----+---+ 2. day >= 17 +---------+----------+-----+----+-----+---+ |device_id| timestamp|state|year|month|day| +---------+----------+-----+----+-----+---+ | 3ff9c44a|1609348014| true|2021| 01| 18| +---------+----------+-----+----+-----+---+ 3. day < 17 ++ || ++ ++ 4. month >= 1 +---------+----------+-----+----+-----+---+ |device_id| timestamp|state|year|month|day| +---------+----------+-----+----+-----+---+ | 3ff9c44a|1609348014| true|2021| 01| 17| | 3ff9c44a|1609348014| true|2021| 01| 18| +---------+----------+-----+----+-----+---+ 5. month >= 2 ++ || ++ ++
パーティションキーがstring型で、また0埋めの有無に関わらずとも、Pushdown Predicatesの条件で指定した通り、パーティションキーがint型であるかのように動作してフィルターされるようです。
おわりに
Glueテーブルでstring型として定義したパーティションキーをPushdown Predicatesの数値の大小の条件でフィルターできるのか確認してみました。
これにより例えば「Amazon Kinesis Data Firehoseが作成するパーティション値は既定で0埋めとなるため、Glueテーブルでのパーティションキーの型指定はstring型とするが、Glueジョブ内ではPushdown Predicatesの数値の大小の条件でフィルターをしたい」なんて場合に対応可能となることが確認できました。
参考
以上