[AWS Glue]クローラーとジョブを組み合わせて、パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するETLフローを作ってみた

[AWS Glue]クローラーとジョブを組み合わせて、パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するETLフローを作ってみた

Clock Icon2021.01.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部の若槻です。

AWS Glueでは、パーティション分割を行うことによりデータの整理や効率的なクエリ実行を行うことが可能です。

今回は、クローラーとジョブを組み合わせて、パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するETLフローを作ってみました。

作るもの

  • データソースに追加されたデータをもとにGlueテーブルにパーティションを追加するクローラー
  • パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するジョブ

これらクローラーとジョブで下記のようなワークフローを作成します。まずクローラーを実行し、正常に実行が完了したらジョブが実行されるようにします。

やってみた

まず最初に以降のコマンド実行で使用する変数を定義しておきます。

% AWS_REGION=ap-northeast-1
% ACCOUNT_ID=$(aws sts get-caller-identity | jq -r ".Account")
% RAW_DATA_BUCKET=s3://devices-raw-data-${ACCOUNT_ID}-${AWS_REGION}
% DATA_ANALYTICS_BUCKET=s3://devices-data-analytics-${ACCOUNT_ID}-${AWS_REGION}
% GLUE_DATABASE_NAME=devices_data_analystics
% RAW_DATA_GLUE_TABLE_NAME=devices_raw_data
% INTEGRATED_DATA_GLUE_TABLE_NAME=devices_integrated_data
% WORKFLOW_NAME=devices-analytics
% ATHENA_WORK_GROUP_NAME=devices-data-analytics

環境構築

動作確認環境を作成します。

CloudFormationスタック

CloudFormationスタックのテンプレートです。(長いため折りたたんでいます。)

クリックで展開
AWSTemplateFormatVersion: '2010-09-09'

Resources:
  DevicesRawDataBucket:
    Type: AWS::S3::Bucket
    Properties: 
      BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region}

  DevicesDataAnalyticsAthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: devices-data-analytics
      WorkGroupConfiguration:
        ResultConfiguration:
          OutputLocation: !Sub s3://${DevicesRawDataBucket}/query-result
        EnforceWorkGroupConfiguration: true
        PublishCloudWatchMetricsEnabled: true

  DevicesDataAnalyticsBucket:
    Type: AWS::S3::Bucket
    Properties: 
      BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region}

  DevicesDataAnalyticsGlueDatabase:
    Type: AWS::Glue::Database
    Properties: 
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: devices_data_analystics

  DevicesRawDataGlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
      TableInput:
        Name: devices_raw_data
        TableType: EXTERNAL_TABLE
        Parameters:
          has_encrypted_data: false
          serialization.encoding: utf-8
          EXTERNAL: true
        StorageDescriptor:
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Columns:
            - Name: device_id
              Type: string
            - Name: timestamp
              Type: bigint
            - Name: state
              Type: boolean
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          Location: !Sub s3://${DevicesRawDataBucket}/raw-data
          SerdeInfo:
            Parameters:
              paths: "device_id, timestamp, state"
            SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
        PartitionKeys:
          - Name: year
            Type: string
          - Name: month
            Type: string
          - Name: day
            Type: string

  DevicesIntegratedDataGlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
      TableInput:
        Name: devices_integrated_data
        TableType: EXTERNAL_TABLE
        Parameters:
          has_encrypted_data: false
          serialization.encoding: utf-8
          EXTERNAL: true
        StorageDescriptor:
          OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Columns:
            - Name: device_id
              Type: string
            - Name: timestamp
              Type: bigint
            - Name: state
              Type: boolean
            - Name: partition_date
              Type: string
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          Location: !Sub s3://${DevicesDataAnalyticsBucket}/integrated-data
          SerdeInfo:
            Parameters:
              paths: "device_id, timestamp, state, partition_date"
            SerializationLibrary: org.apache.hive.hcatalog.data.JsonSerDe
        PartitionKeys:
          - Name: year
            Type: string
          - Name: month
            Type: string
          - Name: day
            Type: string

  ExecuteDevicesDataETLGlueJobRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          -
            Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: devices-data-etl-glue-job-policy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              -
                Effect: Allow
                Action:
                  - glue:StartJobRun
                Resource:
                 - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/devices-data-etl
              -
                Effect: Allow
                Action:
                  - glue:GetPartition
                  - glue:GetPartitions
                  - glue:GetTable
                  - glue:BatchCreatePartition
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DevicesDataAnalyticsGlueDatabase}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesRawDataGlueTable}
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DevicesDataAnalyticsGlueDatabase}/${DevicesIntegratedDataGlueTable}
              -
                Effect: Allow
                Action:
                  - s3:ListBucket
                  - s3:GetBucketLocation
                Resource: 
                  - arn:aws:s3:::*
              -
                Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:CreateLogGroup
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws-glue/jobs/*
              -
                Effect: Allow
                Action:
                  - s3:GetObject
                Resource:
                  - !Sub arn:aws:s3:::${DevicesRawDataBucket}/raw-data/*
                  - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
              -
                Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                Resource:
                  - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/glue-job-temp-dir/*
              -
                Effect: Allow
                Action:
                  - s3:PutObject
                Resource:
                  - !Sub arn:aws:s3:::${DevicesDataAnalyticsBucket}/integrated-data/*

  DevicesDataETLGlueJob:
    Type: AWS::Glue::Job
    Properties:
      Name: devices-data-etl
      Command:
        Name: glueetl
        PythonVersion: 3
        ScriptLocation: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-script/devices-data-etl.py
      DefaultArguments:
        --job-language: python
        --job-bookmark-option: job-bookmark-enable
        --TempDir: !Sub s3://${DevicesDataAnalyticsBucket}/glue-job-temp-dir
        --GLUE_DATABASE_NAME: !Sub ${DevicesDataAnalyticsGlueDatabase}
        --RAW_DATA_GLUE_TABLE_NAME: !Sub ${DevicesRawDataGlueTable}
        --INTEGRATED_DATA_GLUE_TABLE_NAME: !Sub ${DevicesIntegratedDataGlueTable}
      GlueVersion: 2.0
      ExecutionProperty:
        MaxConcurrentRuns: 1
      MaxRetries: 0
      Role: !Ref ExecuteDevicesDataETLGlueJobRole

  DevicesDataAnalyticsGlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Role: !Sub arn:aws:iam::${AWS::AccountId}:role/service-role/AWSGlueServiceRole-DefaultRole
      Targets:
        CatalogTargets:
          - DatabaseName: !Ref DevicesDataAnalyticsGlueDatabase
            Tables:
              - !Ref DevicesRawDataGlueTable
      SchemaChangePolicy:
        DeleteBehavior: LOG

  DevicesAnalyticsGlueWorkflow:
    Type: AWS::Glue::Workflow
    Properties: 
      Name: devices-analytics

  DevicesDataAnalyticsGlueCrawlerTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      WorkflowName: !Ref DevicesAnalyticsGlueWorkflow
      Type: ON_DEMAND
      Actions:
        - CrawlerName: !Ref DevicesDataAnalyticsGlueCrawler

  DevicesDataETLGlueJobTrigger:
    Type: AWS::Glue::Trigger
    Properties:
      WorkflowName: !Ref DevicesAnalyticsGlueWorkflow
      Type: CONDITIONAL
      Actions:
        - JobName: !Ref DevicesDataETLGlueJob
      Predicate:
        Conditions:
          - LogicalOperator: EQUALS
            CrawlerName: !Ref DevicesDataAnalyticsGlueCrawler
            CrawlState: SUCCEEDED
      StartOnCreation: true

データターゲットへの書き込み時にGlueテーブルにパーティションを作成したい場合はジョブの実行ロールでglue:BatchCreatePartitionアクションを許可する必要があるため、リソース定義ExecuteDevicesDataETLGlueJobRoleで許可しています。

スタックをデプロイします。

% aws cloudformation deploy \
  --template-file template.yaml \
  --stack-name devices-data-analytics-stack \
  --capabilities CAPABILITY_NAMED_IAM \
  --no-fail-on-empty-changeset

Glueジョブスクリプト

データソースから取得したデータに対して、パーティションスキーマの値を使用したpartition_date列を追加し、データターゲットに書き込むPySparkスクリプトです。(長いため折りたたんでいます。)

クリックで展開
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import concat, lit

args = getResolvedOptions(
  sys.argv,
  [
    'JOB_NAME',
    'GLUE_DATABASE_NAME',
    'RAW_DATA_GLUE_TABLE_NAME',
    'INTEGRATED_DATA_GLUE_TABLE_NAME'
  ]
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = glueContext.create_dynamic_frame.from_catalog(
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['RAW_DATA_GLUE_TABLE_NAME'],
  transformation_ctx = 'datasource'
).toDF()

df = df.withColumn('partition_date',
  concat(
    df.year,
    lit('/'),
    df.month,
    lit('/'),
    df.day
  )
)

dyf = DynamicFrame.fromDF(
  df,
  glueContext,
  'integrated_data'
)

additionalOptions = {
  "enableUpdateCatalog": True
}

additionalOptions['partitionKeys'] = [
  'year', 'month', 'day'
]

glueContext.write_dynamic_frame.from_catalog(
  frame = dyf,
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['INTEGRATED_DATA_GLUE_TABLE_NAME'],
  transformation_ctx = 'datasink',
  additional_options=additionalOptions
)

job.commit()

データターゲットへの書き込み時に指定のパーティションを作成する場合は、write_dynamic_frame.from_catalog()additional_optionsを下記のように指定します。

additionalOptions = {
  "enableUpdateCatalog": True
}

additionalOptions['partitionKeys'] = [
  'year', 'month', 'day'
]

glueContext.write_dynamic_frame.from_catalog(
  frame = dyf,
  database = args['GLUE_DATABASE_NAME'],
  table_name = args['INTEGRATED_DATA_GLUE_TABLE_NAME'],
  transformation_ctx = 'datasink',
  additional_options=additionalOptions
)

スクリプトをS3バケットにアップロードします。

% aws s3 cp devices-data-etl.py \
  ${DATA_ANALYTICS_BUCKET}/glue-job-script/devices-data-etl.py

データソースへのデータ作成

データソースの場所となるS3バケットのパスにデータが記載されたパーティションパスを持つオブジェクトを作成します。

{"device_id": "3ff9c44a", "timestamp": 1609348014, "state": true}
% aws s3 cp raw-data.json \
  ${RAW_DATA_BUCKET}/raw-data/year=2021/month=01/day=13/raw-data.json

この時点ではデータソースのGlueテーブルにパーティションは一つも作成されていません。

% aws glue get-partitions \
  --database-name ${GLUE_DATABASE_NAME} \
  --table-name ${RAW_DATA_GLUE_TABLE_NAME}
{
    "Partitions": []
}

動作確認

Glueワークフローを実行します。

% RunId=$(
  aws glue start-workflow-run --name ${WORKFLOW_NAME} \
    --query RunId \
    --output text
)

ワークフローの実行のStatistics.SucceededActions2となれば、すべてのアクションの実行が正常に完了しています。

% aws glue get-workflow-run \
  --name ${WORKFLOW_NAME} \
  --run-id ${RunId} \
  --query Run.Statistics

{
    "TotalActions": 2,
    "TimeoutActions": 0,
    "FailedActions": 0,
    "StoppedActions": 0,
    "SucceededActions": 2,
    "RunningActions": 0
}

データターゲットのGlueテーブルのパーティションを取得してみると、データソースと同じパーティションが作成されています。

% aws glue get-partitions \
  --database-name ${GLUE_DATABASE_NAME} \
  --table-name ${INTEGRATED_DATA_GLUE_TABLE_NAME} \
  --query 'Partitions[0].Values'

[
    "2021",
    "01",
    "13"
]

データターゲットからS3オブジェクトを取得してみると、ちゃんとパーティション分割されてオブジェクトが作成されています。

% aws s3 ls ${DATA_ANALYTICS_BUCKET}/integrated-data --recursive

2021-01-14 19:55:09         90 integrated-data/year=2021/month=01/day=13/run-datasink-4-part-r-00000

AthenaでクエリSELECT * FROM ${RAW_DATA_GLUE_TABLE_NAME}を実行してデータターゲットのデータを取得してみます。

% QueryExecutionId=$( \
  aws athena start-query-execution \
    --query-string "SELECT * FROM ${INTEGRATED_DATA_GLUE_TABLE_NAME}" \
    --work-group ${ATHENA_WORK_GROUP_NAME} \
    --query-execution-context Database=${GLUE_DATABASE_NAME},Catalog=AwsDataCatalog \
    --query QueryExecutionId \
    --output text \
)

クエリの実行結果を取得すると、データターゲットにGlueジョブで加工されたデータが作成されていることが確認できました。

% aws athena get-query-results \
  --query-execution-id $QueryExecutionId \
  --query ResultSet.Rows

[
    {
        "Data": [
            {
                "VarCharValue": "device_id"
            },
            {
                "VarCharValue": "timestamp"
            },
            {
                "VarCharValue": "state"
            },
            {
                "VarCharValue": "partition_date"
            },
            {
                "VarCharValue": "year"
            },
            {
                "VarCharValue": "month"
            },
            {
                "VarCharValue": "day"
            }
        ]
    },
    {
        "Data": [
            {
                "VarCharValue": "3ff9c44a"
            },
            {
                "VarCharValue": "1609348014"
            },
            {
                "VarCharValue": "true"
            },
            {
                "VarCharValue": "2021/01/13"
            },
            {
                "VarCharValue": "2021"
            },
            {
                "VarCharValue": "01"
            },
            {
                "VarCharValue": "13"
            }
        ]
    }
]

おわりに

クローラーとジョブを組み合わせて、パーティション分割されたデータソースをパーティション分割したデータターゲットに追加するETLフローを作ってみました。

今回はETLワークフローを手動実行しましたが、cronベースのトリガーで日次バッチとして実行させることももちろん可能です。これを基本形にいろいろ応用させてみたいですね。

参考

以上

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.