[AWS Glue]string型のパーティションキーをPushdown Predicatesの数値の大小の条件でフィルターできるのか確認してみた

[AWS Glue]string型のパーティションキーをPushdown Predicatesの数値の大小の条件でフィルターできるのか確認してみた

Clock Icon2021.01.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部の若槻です。

AWS Glueでは、Pushdown Predicatesを使うことによりジョブでデータソースからデータを取得する際にパーティションキーによるフィルターを行うことができます。

このPushdown PredicatesはHive標準の機能なのですが、複数条件や数値の大小など様々な条件を指定することが可能です。

そこでふと「Glueテーブルでstring型として定義したパーティションキーをPushdown Predicatesでday <= 18month > 1のような数値の大小の条件でフィルターできるのか?」というのが気になったので確認してみました。

やってみた

まず最初に以降のコマンド実行で使用する変数を定義しておきます。

% AWS_REGION=ap-northeast-1
% ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account")
% RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION}
% DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION}
% GLUE_DATABASE_NAME=devices_data_analystics
% RAW_DATA_GLUE_TABLE_NAME=devices_raw_data

環境作成

CloudFormationスタック

CloudFormationスタックのテンプレートです。

AWSTemplateFormatVersion: '2010-09-09'

Resources:
  DevicesRawDataBucket:
    Type: AWS::S3::Bucket
    Properties: 
      BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}

  DevicesDataAnalyticsBucket:
    Type: AWS::S3::Bucket
    Properties: 
      BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}

  DevicesDataAnalyticsGlueDatabase:
    Type: AWS::Glue::Database
    Properties: 
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: devices_data_analystics

  DevicesRawDataGlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
      TableInput:
        Name: devices_raw_data
        TableType: EXTERNAL_TABLE
        Parameters:
          has_encrypted_data: false
          serialization.encoding: utf-8
          EXTERNAL: true
        StorageDescriptor:
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Columns:
            - Name: device_id
              Type: string
            - Name: timestamp
              Type: bigint
            - Name: state
              Type: boolean
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          Location: !Sub s3://${DevicesRawDataBucket}/raw-data
          SerdeInfo:
            Parameters:
              paths: "device_id, timestamp, state"
            SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
        PartitionKeys:
          - Name: year
            Type: string
          - Name: month
            Type: string
          - Name: day
            Type: string

  ExecuteDevicesDataETLGlueJobRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          -
            Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: devices-data-etl-glue-job-policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              -
                Effect: Allow
                Action:
                  - glue:StartJobRun
                Resource:
                 - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl
              -
                Effect: Allow
                Action:
                  - glue:GetPartition
                  - glue:GetPartitions
                  - glue:GetTable
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesRawDataGlueTable}
              -
                Effect: Allow
                Action:
                  - s3:ListBucket
                  - s3:GetBucketLocation
                Resource: 
                  - arn:aws:s3:::*
              -
                Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:CreateLogGroup
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/*
              -
                Effect: Allow
                Action:
                  - s3:GetObject
                Resource:
                  - !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/*
                  - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
              -
                Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                Resource:
                  - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/*

  DevicesDataETLGlueJob:
    Type: AWS::Glue::Job
    Properties:
      Name: devices-data-etl
      Command:
        Name: glueetl
        PythonVersion: 3
        ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
      DefaultArguments:
        --job-language: python
        --job-bookmark-option: job-bookmark-enable
        --TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir
        --GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase}
        --RAW_DATA_GLUE_TABLE_NAME: !Sub ${DevicesRawDataGlueTable}
      GlueVersion: 2.0
      ExecutionProperty:
        MaxConcurrentRuns: 1
      MaxRetries: 0
      Role: !Ref ExecuteDevicesDataETLGlueJobRole

データソースとなるGlueテーブルのリソース定義DevicesRawDataGlueTableで、Properties.TableInput.PartitionKeysにて、yearmonthdayの3つのパーティションキーをstring型で定義しています。

スタックをデプロイします。

% aws cloudformation deploy \
  --template-file template.yaml \
  --stack-name devices-data-analytics-stack \
  --capabilities CAPABILITY_NAMED_IAM \
  --no-fail-on-empty-changeset

Glueジョブスクリプト

データソースからデータを取得するだけのPySparkスクリプトです。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(
  sys.argv,
  [
    'JOB_NAME',
    'GLUE_DATABASE_NAME',
    'RAW_DATA_GLUE_TABLE_NAME'
  ]
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print('1. フィルターなし')
df = glueContext.create_dynamic_frame.from_catalog(
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['RAW_DATA_GLUE_TABLE_NAME']
).toDF()
df.show()

print('2. day >= 17')
df = glueContext.create_dynamic_frame.from_catalog(
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
  push_down_predicate = 'day >= 18'
).toDF()
df.show()

print('3. day < 17')
df = glueContext.create_dynamic_frame.from_catalog(
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
  push_down_predicate = 'day < 17'
).toDF()
df.show()

print('4. month >= 1')
df = glueContext.create_dynamic_frame.from_catalog(
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
  push_down_predicate = 'month >= 1'
).toDF()
df.show()

print('5. month >= 2')
df = glueContext.create_dynamic_frame.from_catalog(
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
  push_down_predicate = 'month >= 2'
).toDF()
df.show()

job.commit()

create_dynamic_frame.from_catalog()にて以下のフィルターなしおよび4通りの数値の大小の条件のPushdown Predicatesによりデータソースからのデータ取得を試すようにしています。

  1. フィルターなし
  2. day >= 17
  3. day < 17
  4. month >= 1
  5. month >= 2

スクリプトをS3バケットにアップロードします。

% aws s3 cp devices-data-etl.py \
  ${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py

動作確認

次のパーティションパスを持つオブジェクトを、データソースのS3ロケーションにそれぞれ作成します。

  • year=2021/month=01/day=17
  • year=2021/month=01/day=18
{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data.json \
  ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=17/raw-data.json
% aws s3 cp raw-data.json \
  ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=18/raw-data.json

作成したオブジェクトのパーティションを作成します。

% aws athena start-query-execution \
  --query-string "MSCK REPAIR TABLE ${GLUE_TABLE_NAME}" \
  --work-group ${ATHENA_WORK_GROUP_NAME} \
  --query-execution-context \
    Database=${GLUE_DATABASE_NAME},Catalog=AwsDataCatalog

ジョブを実行します。

% JobRunId=$(
  aws glue start-job-run --job-name ${ETL_GLUE_JOB_NAME} \
    --query JobRunId \
    --output text
)

ジョブのスクリプト中でデータソースからの取得データをdf.show()で出力するようにしています。CloudWatch Logsへの出力結果を取得してみます。

% aws logs get-log-events \
  --log-group-name /aws-glue/jobs/output \
  --log-stream-name ${JobRunId} \
  --query 'events[].[message]' \
  --output text

1. フィルターなし

+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021|   01| 17|
| 3ff9c44a|1609348014| true|2021|   01| 18|
+---------+----------+-----+----+-----+---+

2. day >= 17

+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021|   01| 18|
+---------+----------+-----+----+-----+---+

3. day < 17

++
||
++
++

4. month >= 1

+---------+----------+-----+----+-----+---+
|device_id| timestamp|state|year|month|day|
+---------+----------+-----+----+-----+---+
| 3ff9c44a|1609348014| true|2021|   01| 17|
| 3ff9c44a|1609348014| true|2021|   01| 18|
+---------+----------+-----+----+-----+---+

5. month >= 2

++
||
++
++

パーティションキーがstring型で、また0埋めの有無に関わらずとも、Pushdown Predicatesの条件で指定した通り、パーティションキーがint型であるかのように動作してフィルターされるようです。

おわりに

Glueテーブルでstring型として定義したパーティションキーをPushdown Predicatesの数値の大小の条件でフィルターできるのか確認してみました。

これにより例えば「Amazon Kinesis Data Firehoseが作成するパーティション値は既定で0埋めとなるため、Glueテーブルでのパーティションキーの型指定はstring型とするが、Glueジョブ内ではPushdown Predicatesの数値の大小の条件でフィルターをしたい」なんて場合に対応可能となることが確認できました。

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.