DynamoDB から S3 への定期的なエクスポートの仕組みを AWS Glue と Step Functions を使用して実装してみた

DynamoDB から S3 への定期的なエクスポートの仕組みを AWS Glue と Step Functions を使用して実装してみた

DynamoDB から S3 へのエクスポートを定期的に実行する仕組みを AWS ブログで紹介されている CloudFormation テンプレートとスクリプトを使用して実装してみました
Clock Icon2020.09.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

コンバンハ、千葉(幸)です。

DynamoDB テーブルの中身を S3 バケットにエクスポートしたい、という場合があるかと思います。S3 にエクスポートしたものに対して、例えば Athena を利用して解析をかけたい、といったケースです。

AWS Glue や AWS Step Functions を利用して、定期的にエクスポートを行う仕組みについて以下のブログで紹介されているので、試してみました。 CloudFormation テンプレートや スクリプトが用意されているので、一通り流すだけでできます。

全体像としては以下のイメージです。


2020/11/6追記

執筆時点より一年以上前のブログを参考にしていることもあり、サービスアップデートによりバッドプラクティスになってしまっている箇所があります。社内でフィードバックを受けましたので、数カ所で補足をします。

  • Data Pipeline, EMR, Glue の比較
  • Step Functions の Glue Workflow への置き換え
  • Glue ETL スクリプトの 各種バージョン
  • Glue ETL スクリプトのDynamoDB 読み取り並列度

目次

DynamoDB から S3 へのバックアップのパターン

やってみる前に、全体像のおさらいをしておきます。

DynamoDB の中身を S3 にバックアップ・エクスポートする手法について、以下にまとまっています。

DynamoDB 標準機能によるバックアップ

ユーザー側で作り込みをしなくとも、DynamoDB では標準のバックアップの仕組みが用意されています。

  • オンデマンドバックアップ
  • ポイントインタイムリカバリ(継続的なバックアップ)

これらの機能により取得されたバックアップは S3 に保存されますが、ユーザーが該当 S3 にアクセスすることはできません。

ユーザー側での操作による DynamoDB バックアップ(エクスポート)

2020/11/06追記:Data Pipeline, EMR, Glue の比較内容について全体的に修正を行いました

ユーザーがアクセスできる S3 バケットへ DynamoDB テーブルをエクスポートする手段として、以下が紹介されています。

  • Data Pipeline
    • メリット:最も簡単。AWS リソースの使用をできる限り少なく抑えながら1回限りのバックアップを作成する際に適する
    • デメリット:他の方法と比較してカスマイズ性が乏しい。直近でのサービスアップデートが行われていない
  • Amazon EMR
    • メリット:詳細な設定が可能
    • デメリット:Amazon EMR に対する習熟が必要
  • AWS Glue
    • メリット:Athena など他のサービスでも利用できる自動的かつ継続的なバックアップを行う際のベストプラクティス
    • デメリット:AWS Glue に対する知見が必要。 Data Pipeline より料金がかかる

今回やってみるのは、一番最後の AWS Glue を使用するパターンです。

Glue はサーバーレスであるため、インフラのメンテナンスが必要ない点、高速に起動する点(Glue 2.0の場合)にメリットがあります。Data Pipeline は直近でのサービスアップデートはほとんど行われていないため、これから採用を検討する際には Glue を選択いただくのがよいかと思います。

参考までに、 Data Pipeline の使用イメージについては、以下エントリをご参照ください。

やってみた

今回は以下のステップに分けて実行していきます。基本的には提供されている CloudFormation テンプレートを使用します。

  1. DynamoDB テーブルの作成
  2. 共通スタックの作成
  3. テーブルエクスポートスタックの作成
  4. DynamoDB から S3 へのエクスポートの実行
  5. エクスポート後の内容の確認

1. DynamoDB テーブルの作成

まずはエクスポート元となる DynamoDB を準備します。すでに適当な DynamoDB がある場合は、このステップは飛ばして問題ありません。

今回は、以下のエントリで紹介されている手順に則り作成します。

ブログで紹介されている、以下のテンプレートを用いて CloudFormation スタックをデプロイします。

(ブログのボタンを押下して CloudFormation の画面に遷移した場合、バージニア北部が選択されている状態のため、必要に応じて別のリージョンに切り替えます。以降の手順でも同様です。)

Description: An example DynamoDB table for storing Reviews on books
Resources:
  ReviewsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: Reviews
      KeySchema:
        - AttributeName: User
          KeyType: HASH
        - AttributeName: Book
          KeyType: RANGE
      AttributeDefinitions:
      - AttributeName: User
        AttributeType: S
      - AttributeName: Book
        AttributeType: S
      ProvisionedThroughput:
       ReadCapacityUnits: 5
       WriteCapacityUnits: 5

ここで作成されるリソースは以下です。

論理 ID タイプ リソース名
ReviewsTable AWS::DynamoDB::Table Reviews

デプロイされた結果を確認すると。このようになっています。

AWS CLI でも確認しておきます。

% aws dynamodb describe-table --table-name Reviews
{
    "Table": {
        "AttributeDefinitions": [
            {
                "AttributeName": "Book",
                "AttributeType": "S"
            },
            {
                "AttributeName": "User",
                "AttributeType": "S"
            }
        ],
        "TableName": "Reviews",
        "KeySchema": [
            {
                "AttributeName": "User",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "Book",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": "2020-09-11T20:41:41.764000+09:00",
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Reviews",
        "TableId": "4db9731f-1934-4964-abcc-da9de06207c5"
    }
}

テーブルにアイテムを追加する

現状テーブルは空のままなので、アイテムを追加します。

[ 項目 ]-> [項目の作成 ]を選択し……

[ Text ]を選択し、内容を入力して[保存]を押下します。

なお、ここで入力するのは以下の値です。

{
  "User": "Tristan",
  "Book": "Harry Potter and the Philosopher's Stone",
  "Rating": 5,
  "Review": "A thrilling journey through the world of Hogwarts",
  "Author": "J.K.Rowling"
}

これをもう一度行い、2 項目を登録した状態にします。

2 回目に登録した内容は以下です。

{
  "User": "Adeline",
  "Book": "Harry Potter and the Sorcerer's Stone",
  "Rating": 4,
  "Review": "Harry is pretty brave, but Hermione is the clear hero",
  "Author": "J.K.Rowling"
}

これで DynamoDB の準備ができました。

2. 共通スタックの作成

冒頭のブログの一つ目のテンプレートを用いて実装します。ここで作成されるリソースは以下です。

論理 ID タイプ リソース名
DynamoDBExportStateMachine AWS::StepFunctions::StateMachine DynamoDBExportAndAthenaLoad
StateExecutionRole AWS::IAM::Role AWSBigDataBlog-GlueDynamoExport-StateExecutionRole-<ランダム文字列>
StartGlueCrawler AWS::Lambda::Function AWSBigDataBlog-GlueDynamoExportTa-StartGlueCrawler-<ランダム文字列>
GetGlueCrawlerStatus AWS::Lambda::Function AWSBigDataBlog-GlueDynamoExpo-GetGlueCrawlerStatus-<ランダム文字列>
CreateCurrentView AWS::Lambda::Function AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-<ランダム文字列>
GetCurrentViewStatus AWS::Lambda::Function AWSBigDataBlog-GlueDynamoExpo-GetCurrentViewStatus-<ランダム文字列>
LambdaRole AWS::IAM::Role AWSBigDataBlog-GlueDynamoExportTableCom-LambdaRole-<ランダム文字列>
DynamoDBExportsBucket AWS::S3::Bucket dynamodb-exports-<アカウント番号>-<リージョン>
EventTriggerRole AWS::IAM::Role AWSBigDataBlog-GlueDynamoExportTa-EventTriggerRole-<ランダム文字列>
GlueCrawlerAndJobRole AWS::IAM::Role AWSGlueServiceRoleDefault

念のためテンプレートも載せておきます。

折り畳み
Description: Template for setting up common infrastructure for exporting DynamoDB tables to S3 with Step Functions
Resources:
  DynamoDBExportsBucket:
    Type: "AWS::S3::Bucket"
    Properties:
      BucketName:
        Fn::Sub: "dynamodb-exports-${AWS::AccountId}-${AWS::Region}"

  GlueCrawlerAndJobRole:
    Type: AWS::IAM::Role
    Properties:
      # Referenced the following documentation while creating this IAM Role
      # http://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
      RoleName: AWSGlueServiceRoleDefault
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service: glue.amazonaws.com
          Action: sts:AssumeRole
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/AmazonS3FullAccess
      - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      - arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess
      - arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess

  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/AWSLambdaExecute
      - arn:aws:iam::aws:policy/AmazonAthenaFullAccess
      Policies:
      - PolicyName: GlueJobTrigger
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
          - Action:
            - glue:StartCrawler
            - glue:GetCrawler
            - glue:GetCrawlerMetrics
            - athena:StartQueryExecution
            - s3:ListBucket
            Effect: Allow
            Resource: "*"

  EventTriggerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - events.amazonaws.com
      Policies:
      - PolicyName: GlueJobTrigger
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
          - Action:
            - states:StartExecution
            Effect: Allow
            Resource: "*"

  StateExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - states.amazonaws.com
      Policies:
      - PolicyName: GlueJobTrigger
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
          - Action:
            - "lambda:InvokeFunction"
            - "glue:StartJobRun"
            - "glue:GetJobRun"
            - "glue:GetJobRuns"
            - "glue:BatchStopJobRun"
            Effect: Allow
            Resource: "*"

  DynamoDBExportStateMachine:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      StateMachineName: DynamoDBExportAndAthenaLoad
      RoleArn:
        Fn::GetAtt:
        - StateExecutionRole
        - Arn
      DefinitionString:
        Fn::Sub:
        - |-
            {
              "StartAt": "StartJobRun",
              "States": {
                "StartJobRun": {
                  "Type": "Task",
                  "ResultPath": "$.glueresult",
                  "Resource": "arn:aws:states:::glue:startJobRun.sync",
                  "Parameters": {
                    "JobName.$": "$.glue_job_name",
                    "Arguments": {
                      "--table_name.$": "$.table_name",
                      "--read_percentage.$": "$.read_percentage",
                      "--output_prefix.$": "$.output_prefix",
                      "--output_format.$": "$.output_format"
                    }
                  },
                  "Next": "Start Crawler",
                  "Catch": [
                    {
                      "ErrorEquals": [
                        "States.ALL"
                      ],
                      "Next": "Export Failed"
                    }
                  ]
                },
                "Export Failed": {
                  "Type": "Fail",
                  "Cause": "One or more steps could not complete successfully",
                  "Error": "ExportFailed"
                },
                "Start Crawler": {
                  "Type": "Task",
                  "Resource": "${StartGlueCrawler}",
                  "Next": "Wait for Crawler",
                  "Retry": [
                    {
                      "ErrorEquals": ["Lambda.ServiceException"],
                      "IntervalSeconds": 3,
                      "MaxAttempts": 3,
                      "BackoffRate": 1.5
                    }
                  ]
                },
                "Wait for Crawler": {
                  "Type": "Wait",
                  "Seconds": 60,
                  "Next": "Get Crawler Status"
                },
                "Get Crawler Status": {
                  "Type": "Task",
                  "Resource": "${GetGlueCrawlerStatus}",
                  "Next": "Crawler Finished?"
                },
                "Crawler Finished?": {
                  "Type": "Choice",
                  "Choices": [
                    {
                      "Variable": "$.glue_crawler_status",
                      "StringEquals": "SUCCEEDED",
                      "Next": "Create Current View"
                    },
                    {
                      "Variable": "$.glue_crawler_status",
                      "StringEquals": "CANCELLED",
                      "Next": "Export Failed"
                    },
                    {
                      "Variable": "$.glue_crawler_status",
                      "StringEquals": "FAILED",
                      "Next": "Export Failed"
                    }
                  ],
                  "Default": "Wait for Crawler"
                },
                "Create Current View": {
                  "Type": "Task",
                  "Resource": "${CreateCurrentView}",
                  "Next": "Wait for Current View",
                  "Retry": [
                    {
                      "ErrorEquals": ["Lambda.ServiceException"],
                      "IntervalSeconds": 3,
                      "MaxAttempts": 3,
                      "BackoffRate": 1.5
                    }
                  ]
                },
                "Wait for Current View": {
                  "Type": "Wait",
                  "Seconds": 5,
                  "Next": "Get Current View Status"
                },
                "Get Current View Status": {
                  "Type": "Task",
                  "Resource": "${GetCurrentViewStatus}",
                  "Next": "Create Current View Finished?",
                  "Retry": [
                    {
                      "ErrorEquals": ["Lambda.ServiceException"],
                      "IntervalSeconds": 3,
                      "MaxAttempts": 3,
                      "BackoffRate": 1.5
                    }
                  ]
                },
                "Create Current View Finished?": {
                  "Type": "Choice",
                  "Choices": [
                    {
                      "Variable": "$.athena_query_status",
                      "StringEquals": "SUCCEEDED",
                      "Next": "Done"
                    },
                    {
                      "Variable": "$.athena_query_status",
                      "StringEquals": "FAILED",
                      "Next": "Export Failed"
                    },
                    {
                      "Variable": "$.athena_query_status",
                      "StringEquals": "CANCELLED",
                      "Next": "Export Failed"
                    }
                  ],
                  "Default": "Wait for Current View"
                },
                "Done": {
                  "Type": "Pass",
                  "End": true
                }
              }
            }
        - StartGlueCrawler:
            Fn::GetAtt:
            - StartGlueCrawler
            - Arn
          GetGlueCrawlerStatus:
            Fn::GetAtt:
            - GetGlueCrawlerStatus
            - Arn
          CreateCurrentView:
            Fn::GetAtt:
            - CreateCurrentView
            - Arn
          GetCurrentViewStatus:
            Fn::GetAtt:
            - GetCurrentViewStatus
            - Arn

  StartGlueCrawler:
    Type: AWS::Lambda::Function
    Properties:
      Handler: "index.crawler_trigger"
      Runtime: python3.6
      Timeout: "60"
      Role:
        Fn::GetAtt:
        - LambdaRole
        - Arn
      Code:
        ZipFile:
          |-
            import logging
            import datetime

            import boto3

            LOGGER = logging.getLogger()
            LOGGER.setLevel(logging.INFO)

            KEY_CRAWLER_NAME = 'crawler_name'

            GLUE_CLIENT = boto3.client('glue')

            def crawler_trigger(event, context):
              crawler_name = event[KEY_CRAWLER_NAME]
              GLUE_CLIENT.start_crawler(
                Name=crawler_name
              )
              return event

  GetGlueCrawlerStatus:
    Type: AWS::Lambda::Function
    Properties:
      Handler: "index.lambda_handler"
      Runtime: python3.6
      Role:
        Fn::GetAtt:
        - LambdaRole
        - Arn
      Code:
        ZipFile:
          |-
            import json
            import logging
            import datetime

            from dateutil.parser import parse

            import boto3

            LOGGER = logging.getLogger()
            LOGGER.setLevel(logging.INFO)

            GLUE_CLIENT = boto3.client('glue')
            KEY_GLUE_CRAWLER_NAME = "crawler_name"
            KEY_GLUE_CRAWLER_STATUS = "glue_crawler_status"
            KEY_STARTED_ON = "StartedOn"
            KEY_GLUE_RESULT = "glueresult"

            def lambda_handler(event, context):
                crawler_name = event[KEY_GLUE_CRAWLER_NAME]

                resp = GLUE_CLIENT.get_crawler(Name=crawler_name)
                print(resp)

                # In the case of an early exit make sure the key is present but the value
                # is a no op
                event[KEY_GLUE_CRAWLER_STATUS] = ""

                last_crawl = resp['Crawler'].get('LastCrawl')
                if last_crawl is None:
                    return event

                last_crawl_start = last_crawl.get('StartTime')
                # Timestamp is stored as milliseconds since the epoch
                glue_job_start = datetime.datetime.fromtimestamp(event[KEY_GLUE_RESULT][KEY_STARTED_ON] / 1000)
                # If the last crawl is before the snapshot_timestamp then we're still waiting
                # for the current run we've initiated to finish
                if last_crawl_start is None or last_crawl_start.timestamp() < glue_job_start.timestamp():
                    return event

                event[KEY_GLUE_CRAWLER_STATUS] = last_crawl['Status']

                return event

  CreateCurrentView:
    Type: AWS::Lambda::Function
    Properties:
      Handler: "index.create_current_view_trigger"
      Runtime: python3.6
      Timeout: 30
      Role:
        Fn::GetAtt:
        - LambdaRole
        - Arn
      Code:
        ZipFile:
          |-
            import logging
            import os
            import boto3

            LOGGER = logging.getLogger()
            LOGGER.setLevel(logging.INFO)

            KEY_TABLE_NAME = "table_name"
            SNAPHSHOT_TIMESTAMP = "snapshot_timestamp"
            KEY_OUTPUT_PREFIX = "output_prefix"

            ATHENA_CLIENT = boto3.client('athena')
            S3_CLIENT = boto3.client('s3')

            def create_current_view_trigger(event, context):
              table_name = event[KEY_TABLE_NAME].lower().replace("-", "_")
              output_prefix = event[KEY_OUTPUT_PREFIX]
              snapshot_timestamp = get_snapshot_timestamp(output_prefix)
              
              query = f"CREATE OR REPLACE VIEW \"dynamodb_exports\".\"{table_name}\" AS SELECT * FROM \"dynamodb_exports\".\"snapshots_{table_name}\" WHERE {SNAPHSHOT_TIMESTAMP} = '{snapshot_timestamp}';"
              LOGGER.info(query)

              # Athena always has permissions to write to the bucket of the following structure
              account_id = get_account_id(context)
              region = os.environ['AWS_REGION']
              output = f"s3://aws-athena-query-results-{account_id}-{region}"

              # It's not uncommon for the start_query_execution response to return a successful response
              # and then for the query to fail asynchronously. Check the Athena Query History to double check.
              resp = ATHENA_CLIENT.start_query_execution(
                  QueryString=query,
                  ResultConfiguration={
                      'OutputLocation': output
                  }
              )
              LOGGER.info(resp)
              event['athena_query_execution_id'] = resp['QueryExecutionId']

              return event

            # AWS Account ID is not available in the environment, but you can get it breaking a part the Lambda
            # arn
            def get_account_id(context):
                return context.invoked_function_arn.split(":")[4]

            def get_snapshot_timestamp(output_prefix):
              split = output_prefix.split("/")
              bucket = split[2]
              fmt = split[3]
              table = split[4]

              # The trailing slash is really important as it allows us to query S3 for the CommonPrefixes
              # which can be thought of as the next "folder" after a given prefix.
              prefix = f"{fmt}/{table}/"

              # Not sure if there can be so many prefixes that there's a continuation token. For now
              # assuming that there is not
              resp = S3_CLIENT.list_objects_v2(
                Bucket=bucket,
                Delimiter='/',
                Prefix=prefix
              )

              # Each prefix will look like 'parquet/Reviews/snapshot_timestamp=2019-01-11T06:15/'
              prefixes = [x['Prefix'].split('/')[2].split('=')[1] for x in resp['CommonPrefixes']]
              
              # There's no documentation on what order the CommonPrefixes come on so assume none, but we can 
              # assume they're lexographically sortable give how the snapshot timestamp is stored
              prefixes.sort(reverse=True)

              # If the state machine has gotten this far there will always be at least one prefix
              return prefixes[0]

  GetCurrentViewStatus:
    Type: AWS::Lambda::Function
    Properties:
      Handler: "index.lambda_handler"
      Runtime: python3.6
      Role:
        Fn::GetAtt:
        - LambdaRole
        - Arn
      Code:
        ZipFile:
          |-
            import logging
            import os
            import boto3

            LOGGER = logging.getLogger()
            LOGGER.setLevel(logging.INFO)

            ATHENA_CLIENT = boto3.client('athena')

            def lambda_handler(event, context):
                query_id = event['athena_query_execution_id']
                resp = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_id)

                event['athena_query_status'] = resp['QueryExecution']['Status']['State']

                return event

Outputs:
  DynamoDBExportsBucket:
    Description: S3 bucket for DynamoDB exports
    Export:
      Name: DynamoDBExportsBucket
    Value:
      Ref: DynamoDBExportsBucket
  GlueCrawlerAndJobRole:
    Description: IAM Role for Glue Crawlers and Jobs
    Export:
      Name: GlueCrawlerAndJobRole
    Value:
      Ref: GlueCrawlerAndJobRole
  EventTriggerRole:
    Description: IAM Role for CloudWatch Events
    Export:
      Name: EventTriggerRole
    Value:
      Fn::GetAtt:
      - EventTriggerRole
      - Arn
  ExportStateMachineArn:
    Description: ARN for the Export State Machine
    Export:
      Name: ExportStateMachineArn
    Value:
      Ref: DynamoDBExportStateMachine
2020/11/06追記:Step Functions の Glue Workflow への置き換えについて

(追記ここから)

現在は、Glue ジョブやクローラーの依存関係や定期実行は Glue Workflow ですべて管理できるようになっています。

Glue Workflow に置き換えることで、本エントリの構成における Step Functions, Lambda関数、CloudWatch Events ルールはいずれも不要になり、シンプルな構成かつ料金も下がります。

Glue Workflow については以下をあわせてご参照ください。

(追記ここまで)

3. テーブルエクスポートスタックの作成

冒頭のブログの二つ目のテンプレートを用いて実装します。ここで作成されるリソースは以下です。

論理 ID タイプ リソース名
ExportConverterGlueJob AWS::Glue::Job ReviewsExportTo<フォーマット名>
GlueCrawler AWS::Glue::Crawler ReviewscsvCrawler
Trigger AWS::Events::Rule AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-<ランダム文字列>

なお、このテンプレートでは、デプロイ時に指定するパラメータが定義されています。

  • MaxConsumedReadThroughput:読み取り容量の消費割合の上限
  • OutputFormat:アウトプットのフォーマット
  • TableName:エクスポート対象の DynamoDB テーブル名

今回は csv フォーマットでエクスポートしてみます。

こちらもテンプレートを載せておきます。

折り畳み
Description: Export a DynamoDB table to S3 with a Glue Job
Parameters:
  TableName:
    Type: String
    Description: DynamoDB Table Name to export
  MaxConsumedReadThroughput:
    Type: Number
    Description: The maximum amount of Read Capacity Units the export is allowed to consumed expressed as a percentage
    MinValue: 0.001
    MaxValue: 1.0
    Default: 0.25
  OutputFormat:
    Type: String
    Description: Output format of the export. One of avro, csv, json, orc, parquet, or xml.
    AllowedValues:
    - avro
    - csv
    - json
    - orc
    - parquet
    - xml
Resources:
  Trigger:
    Type: "AWS::Events::Rule"
    Properties:
      Description:
        Fn::Sub:
        - "Start Export of ${Name} every night at midnight"
        - Name:
            Ref: TableName
      Targets:
      - Id: StepFunctions
        Arn:
          Fn::ImportValue: ExportStateMachineArn
        RoleArn:
          Fn::ImportValue: EventTriggerRole
        Input:
          Fn::Sub:
          - |-
              {
                "glue_job_name": "${GlueJobName}",
                "output_prefix": "${OutputPrefix}",
                "table_name": "${TableName}",
                "read_percentage": "${ReadPercentage}",
                "crawler_name": "${CrawlerName}",
                "output_format": "${OutputFormat}"
              }
          - GlueJobName:
              Fn::Sub:
              - "${Name}ExportTo${OutputFormat}"
              - Name:
                  Ref: TableName
                OutputFormat:
                  Ref: OutputFormat
            OutputPrefix:
              Fn::Sub:
              - "s3://${Bucket}/${OutputFormat}/${TableName}"
              - Bucket:
                  Fn::ImportValue: DynamoDBExportsBucket
                TableName:
                  Ref: TableName
                OutputFormat:
                  Ref: OutputFormat
            OutputFormat:
              Ref: OutputFormat
            ReadPercentage:
              Ref: MaxConsumedReadThroughput
            CrawlerName:
              Fn::Sub:
              - "${Name}${OutputFormat}Crawler"
              - Name:
                  Ref: TableName
                OutputFormat:
                  Ref: OutputFormat
      ScheduleExpression: "cron(0 10 * * ? *)"
      State: DISABLED

  # Export table to S3 in the parquet format
  ExportConverterGlueJob:
    Type: "AWS::Glue::Job"
    Properties:
      Name:
        Fn::Sub:
        - "${Name}ExportTo${OutputFormat}"
        - Name:
            Ref: TableName
          OutputFormat:
            Ref: OutputFormat
      Role:
        Fn::ImportValue: GlueCrawlerAndJobRole
      MaxRetries: 3
      Description:
        Fn::Sub:
        - Exports a DynamoDB table to ${OutputFormat}
        - OutputFormat:
            Ref: OutputFormat
      Command:
        # DO NOT CHANGE NAME. CloudFormation docs are wrong. Use Glue API docs:
        # http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html#aws-glue-api-jobs-job-JobCommand
        Name: "glueetl"
        ScriptLocation: "s3://aws-bigdata-blog/artifacts/using-glue-to-access-dynamodb-tables/export-dynamodb-table.py"
      AllocatedCapacity: 10
      ExecutionProperty:
        MaxConcurrentRuns: 3
      DefaultArguments:
        "--TempDir":
          Fn::Sub:
          - "s3://${Bucket}/glue-temp-dir"
          - Bucket:
              Fn::ImportValue: DynamoDBExportsBucket

  GlueCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Role:
        Fn::ImportValue: GlueCrawlerAndJobRole
      Name:
        Fn::Sub:
        - "${Name}${OutputFormat}Crawler"
        - Name:
            Ref: TableName
          OutputFormat:
            Ref: OutputFormat
      Description:
        Fn::Sub:
        - "Add new partitions and handle schema updates to the ${Name} table"
        - Name:
            Ref: TableName
      SchemaChangePolicy:
        UpdateBehavior: UPDATE_IN_DATABASE
        DeleteBehavior: DELETE_FROM_DATABASE
      DatabaseName: "dynamodb_exports"
      TablePrefix: "snapshots_"
      Targets:
        S3Targets:
        - Path:
            Fn::Sub:
            - "s3://${Bucket}/${OutputFormat}/${TableName}"
            - Bucket:
                Fn::ImportValue: DynamoDBExportsBucket
              TableName:
                Ref: TableName
              OutputFormat:
                Ref: OutputFormat

デプロイされたリソースの確認

次のステップに進む前に、2. と 3. のステップで作成された各種リソースを確認してみます。

Step Functions ステートマシン

このように作成されています。

AWS CLI での確認。

% aws stepfunctions describe-state-machine --state-machine-arn arn:aws:states:ap-northeast-1:000000000000:stateMachine:DynamoDBExportAndAthenaLoad
{
    "stateMachineArn": "arn:aws:states:ap-northeast-1:000000000000:stateMachine:DynamoDBExportAndAthenaLoad",
    "name": "DynamoDBExportAndAthenaLoad",
    "status": "ACTIVE",
    "definition": "{\n 略 \n}",
    "roleArn": "arn:aws:iam::000000000000:role/AWSBigDataBlog-GlueDynamoExport-StateExecutionRole-CHS5BEKC9VBC",
    "type": "STANDARD",
    "creationDate": "2020-09-11T21:02:48.654000+09:00",
    "loggingConfiguration": {
        "level": "OFF",
        "includeExecutionData": false
    }
}

定義の内訳はこのようになっています。

折り畳み
{
  "StartAt": "StartJobRun",
  "States": {
    "StartJobRun": {
      "Type": "Task",
      "ResultPath": "$.glueresult",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName.$": "$.glue_job_name",
        "Arguments": {
          "--table_name.$": "$.table_name",
          "--read_percentage.$": "$.read_percentage",
          "--output_prefix.$": "$.output_prefix",
          "--output_format.$": "$.output_format"
        }
      },
      "Next": "Start Crawler",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Export Failed"
        }
      ]
    },
    "Export Failed": {
      "Type": "Fail",
      "Cause": "One or more steps could not complete successfully",
      "Error": "ExportFailed"
    },
    "Start Crawler": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExportTa-StartGlueCrawler-LHEP9BGFS2NZ",
      "Next": "Wait for Crawler",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException"],
          "IntervalSeconds": 3,
          "MaxAttempts": 3,
          "BackoffRate": 1.5
        }
      ]
    },
    "Wait for Crawler": {
      "Type": "Wait",
      "Seconds": 60,
      "Next": "Get Crawler Status"
    },
    "Get Crawler Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExpo-GetGlueCrawlerStatus-1D6OO6MQMQ4YJ",
      "Next": "Crawler Finished?"
    },
    "Crawler Finished?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.glue_crawler_status",
          "StringEquals": "SUCCEEDED",
          "Next": "Create Current View"
        },
        {
          "Variable": "$.glue_crawler_status",
          "StringEquals": "CANCELLED",
          "Next": "Export Failed"
        },
        {
          "Variable": "$.glue_crawler_status",
          "StringEquals": "FAILED",
          "Next": "Export Failed"
        }
      ],
      "Default": "Wait for Crawler"
    },
    "Create Current View": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-K1KBFSLTS6G1",
      "Next": "Wait for Current View",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException"],
          "IntervalSeconds": 3,
          "MaxAttempts": 3,
          "BackoffRate": 1.5
        }
      ]
    },
    "Wait for Current View": {
      "Type": "Wait",
      "Seconds": 5,
      "Next": "Get Current View Status"
    },
    "Get Current View Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExpo-GetCurrentViewStatus-SOWZAONHY5MJ",
      "Next": "Create Current View Finished?",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException"],
          "IntervalSeconds": 3,
          "MaxAttempts": 3,
          "BackoffRate": 1.5
        }
      ]
    },
    "Create Current View Finished?": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.athena_query_status",
          "StringEquals": "SUCCEEDED",
          "Next": "Done"
        },
        {
          "Variable": "$.athena_query_status",
          "StringEquals": "FAILED",
          "Next": "Export Failed"
        },
        {
          "Variable": "$.athena_query_status",
          "StringEquals": "CANCELLED",
          "Next": "Export Failed"
        }
      ],
      "Default": "Wait for Current View"
    },
    "Done": {
      "Type": "Pass",
      "End": true
    }
  }
}

ステートマシンでは、各ステップで Lambda 関数が呼び出されています。

それぞれのコードを確認しておきます。

Lambda 関数①(Start Crawler)

import logging
import datetime

import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

KEY_CRAWLER_NAME = 'crawler_name'

GLUE_CLIENT = boto3.client('glue')

def crawler_trigger(event, context):
  crawler_name = event[KEY_CRAWLER_NAME]
  GLUE_CLIENT.start_crawler(
    Name=crawler_name
  )
  return event

Lambda 関数②(Get Crawler Status)

import json
import logging
import datetime

from dateutil.parser import parse

import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

GLUE_CLIENT = boto3.client('glue')
KEY_GLUE_CRAWLER_NAME = "crawler_name"
KEY_GLUE_CRAWLER_STATUS = "glue_crawler_status"
KEY_STARTED_ON = "StartedOn"
KEY_GLUE_RESULT = "glueresult"

def lambda_handler(event, context):
    crawler_name = event[KEY_GLUE_CRAWLER_NAME]

    resp = GLUE_CLIENT.get_crawler(Name=crawler_name)
    print(resp)

    # In the case of an early exit make sure the key is present but the value
    # is a no op
    event[KEY_GLUE_CRAWLER_STATUS] = ""

    last_crawl = resp['Crawler'].get('LastCrawl')
    if last_crawl is None:
        return event

    last_crawl_start = last_crawl.get('StartTime')
    # Timestamp is stored as milliseconds since the epoch
    glue_job_start = datetime.datetime.fromtimestamp(event[KEY_GLUE_RESULT][KEY_STARTED_ON] / 1000)
    # If the last crawl is before the snapshot_timestamp then we're still waiting
    # for the current run we've initiated to finish
    if last_crawl_start is None or last_crawl_start.timestamp() < glue_job_start.timestamp():
        return event

    event[KEY_GLUE_CRAWLER_STATUS] = last_crawl['Status']

    return event

Lambda 関数③(Create Current View)

import logging
import os
import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

KEY_TABLE_NAME = "table_name"
SNAPHSHOT_TIMESTAMP = "snapshot_timestamp"
KEY_OUTPUT_PREFIX = "output_prefix"

ATHENA_CLIENT = boto3.client('athena')
S3_CLIENT = boto3.client('s3')

def create_current_view_trigger(event, context):
  table_name = event[KEY_TABLE_NAME].lower().replace("-", "_")
  output_prefix = event[KEY_OUTPUT_PREFIX]
  snapshot_timestamp = get_snapshot_timestamp(output_prefix)
  
  query = f"CREATE OR REPLACE VIEW \"dynamodb_exports\".\"{table_name}\" AS SELECT * FROM \"dynamodb_exports\".\"snapshots_{table_name}\" WHERE {SNAPHSHOT_TIMESTAMP} = '{snapshot_timestamp}';"
  LOGGER.info(query)

  # Athena always has permissions to write to the bucket of the following structure
  account_id = get_account_id(context)
  region = os.environ['AWS_REGION']
  output = f"s3://aws-athena-query-results-{account_id}-{region}"

  # It's not uncommon for the start_query_execution response to return a successful response
  # and then for the query to fail asynchronously. Check the Athena Query History to double check.
  resp = ATHENA_CLIENT.start_query_execution(
      QueryString=query,
      ResultConfiguration={
          'OutputLocation': output
      }
  )
  LOGGER.info(resp)
  event['athena_query_execution_id'] = resp['QueryExecutionId']

  return event

# AWS Account ID is not available in the environment, but you can get it breaking a part the Lambda
# arn
def get_account_id(context):
    return context.invoked_function_arn.split(":")[4]

def get_snapshot_timestamp(output_prefix):
  split = output_prefix.split("/")
  bucket = split[2]
  fmt = split[3]
  table = split[4]

  # The trailing slash is really important as it allows us to query S3 for the CommonPrefixes
  # which can be thought of as the next "folder" after a given prefix.
  prefix = f"{fmt}/{table}/"

  # Not sure if there can be so many prefixes that there's a continuation token. For now
  # assuming that there is not
  resp = S3_CLIENT.list_objects_v2(
    Bucket=bucket,
    Delimiter='/',
    Prefix=prefix
  )

  # Each prefix will look like 'parquet/Reviews/snapshot_timestamp=2019-01-11T06:15/'
  prefixes = [x['Prefix'].split('/')[2].split('=')[1] for x in resp['CommonPrefixes']]
  
  # There's no documentation on what order the CommonPrefixes come on so assume none, but we can 
  # assume they're lexographically sortable give how the snapshot timestamp is stored
  prefixes.sort(reverse=True)

  # If the state machine has gotten this far there will always be at least one prefix
  return prefixes[0]

Lambda 関数④(Get Current View Status)

import logging
import os
import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

ATHENA_CLIENT = boto3.client('athena')

def lambda_handler(event, context):
    query_id = event['athena_query_execution_id']
    resp = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_id)

    event['athena_query_status'] = resp['QueryExecution']['Status']['State']

    return event

Glue クローラ

% aws glue get-crawler --name ReviewscsvCrawler
{
    "Crawler": {
        "Name": "ReviewscsvCrawler",
        "Role": "AWSGlueServiceRoleDefault",
        "Targets": {
            "S3Targets": [
                {
                    "Path": "s3://dynamodb-exports-000000000000-ap-northeast-1/csv/Reviews",
                    "Exclusions": []
                }
            ],
            "JdbcTargets": [],
            "DynamoDBTargets": [],
            "CatalogTargets": []
        },
        "DatabaseName": "dynamodb_exports",
        "Description": "Add new partitions and handle schema updates to the Reviews table",
        "Classifiers": [],
        "SchemaChangePolicy": {
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "DELETE_FROM_DATABASE"
        },
        "State": "READY",
        "TablePrefix": "snapshots_",
        "CrawlElapsedTime": 0,
        "CreationTime": "2020-09-11T21:15:27+09:00",
        "LastUpdated": "2020-09-11T21:15:27+09:00",
        "Version": 1
    }
}

Glue ジョブ

外部の S3 にあるスクリプトを呼び出しています。

% aws glue get-job --job-name ReviewsExportTocsv
{
    "Job": {
        "Name": "ReviewsExportTocsv",
        "Description": "Exports a DynamoDB table to csv",
        "Role": "AWSGlueServiceRoleDefault",
        "CreatedOn": "2020-09-11T21:15:26.598000+09:00",
        "LastModifiedOn": "2020-09-11T21:15:26.598000+09:00",
        "ExecutionProperty": {
            "MaxConcurrentRuns": 3
        },
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": "s3://aws-bigdata-blog/artifacts/using-glue-to-access-dynamodb-tables/export-dynamodb-table.py",
            "PythonVersion": "2"
        },
        "DefaultArguments": {
            "--TempDir": "s3://dynamodb-exports-000000000000-ap-northeast-1/glue-temp-dir"
        },
        "MaxRetries": 3,
        "AllocatedCapacity": 10,
        "Timeout": 2880,
        "MaxCapacity": 10.0
    }
}
2020/11/06追記:Glue ETL スクリプトの 各種バージョンについて

(追記ここから)

ここでは、以下のバージョン・設定となっています。

  • PythonVersion:2
  • GlueVersion:指定なし(0.9)
  • WorkerType:指定なし(Standard)
  • NumberOfWorkers:指定なし(代わりに MaxCapacity=10)

2020年11月時点としては、以下の構成に置き換えることをおすすめします。特に、GlueVersion 2.0 に変更することを強く推奨します。

  • PythonVersion:3
  • GlueVersion:2.0
  • WorkerType:G.1X もしくは G.2X
  • NumberOfWorkers:10

(追記ここまで)

このスクリプトの内訳は、冒頭のブログに記載のある以下が該当します。

import sys
import datetime
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

ARG_TABLE_NAME = "table_name"
ARG_READ_PERCENT = "read_percentage"
ARG_OUTPUT = "output_prefix"
ARG_FORMAT = "output_format"

PARTITION = "snapshot_timestamp"

args = getResolvedOptions(sys.argv,
  [
    'JOB_NAME',
    ARG_TABLE_NAME,
    ARG_READ_PERCENT,
    ARG_OUTPUT,
    ARG_FORMAT
  ]
)

table_name = args[ARG_TABLE_NAME]
read = args[ARG_READ_PERCENT]
output_prefix = args[ARG_OUTPUT]
fmt = args[ARG_FORMAT]

print("Table name:", table_name)
print("Read percentage:", read)
print("Output prefix:", output_prefix)
print("Format:", fmt)

date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
output = "%s/%s=%s" % (output_prefix, PARTITION, date_str)

sc = SparkContext()
glueContext = GlueContext(sc)

table = glueContext.create_dynamic_frame.from_options(
  "dynamodb",
  connection_options={
    "dynamodb.input.tableName": table_name,
    "dynamodb.throughput.read.percent": read
  }
)

glueContext.write_dynamic_frame.from_options(
  frame=table,
  connection_type="s3",
  connection_options={
    "path": output
  },
  format=fmt,
  transformation_ctx="datasink"
)
2020/11/06追記:Glue ETL スクリプトのDynamoDB 読み取り並列度について

(追記ここから)

ここではdynamodb.splitsというパラメータが有効化されていないため、読み取りが並列化されていません。今回の構成では Glue クラスターのリソースとして DPU = 10 が割り当てられていますが、そのほとんどが活用されないということになります。パラメータを明示的に指定することをお勧めします。

(追記ここまで)

Events ルール

今回は Amazon EventBridge の画面から確認しましたが、CloudWatch の画面からも確認できます。

CloudFormation からデプロイした際には定期実行が無効に設定されています。この仕組みを用いて定期実行させたい場合には、有効化する必要があります。

また、後続の手順で手動実行を行います。その際には、ステートマシンに引き渡すインプットをここからコピーしたものを用います。

AWS CLI での確認結果です。

% aws events describe-rule --name AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY
{
    "Name": "AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY",
    "Arn": "arn:aws:events:ap-northeast-1:000000000000:rule/AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY",
    "ScheduleExpression": "cron(0 10 * * ? *)",
    "State": "DISABLED",
    "Description": "Start Export of Reviews every night at midnight",
    "EventBusName": "default"
}

4. DynamoDB から S3 へのエクスポートの実行

リソースの作成が完了したため、実際に実行してみます。

ステートマシンの画面より、[ 実行の開始 ] を押下します。

先ほどコピーしたインプットを貼り付け、実行します。

実行の状況は以下のようにグラフィカルに確認できます。正常に終了するのを待ちます。

5. エクスポート後の内容の確認

ステートマシンの実行が正常に終了すると、 Glue テーブルが 2つ作成されています。

一つ目のテーブル。

AWS CLI でも見てみます。

% aws glue get-table --database-name dynamodb_exports --name reviews
{
    "Table": {
        "Name": "reviews",
        "DatabaseName": "dynamodb_exports",
        "CreateTime": "2020-09-11T22:44:08+09:00",
        "UpdateTime": "2020-09-11T22:44:08+09:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "rating",
                    "Type": "bigint"
                },
                {
                    "Name": "review",
                    "Type": "string"
                },
                {
                    "Name": "book",
                    "Type": "string"
                },
                {
                    "Name": "author",
                    "Type": "string"
                },
                {
                    "Name": "user",
                    "Type": "string"
                },
                {
                    "Name": "snapshot_timestamp",
                    "Type": "string"
                }
            ],
            "Location": "",
            "Compressed": false,
            "NumberOfBuckets": 0,
            "SerdeInfo": {},
            "SortColumns": [],
            "StoredAsSubDirectories": false
        },
        "PartitionKeys": [],
        "ViewOriginalText": "/* Presto View: eyJvcmlnaW5hbFNxbCI6IlNFTEVDVCAqXG5GUk9NXG4gIGR5bmFtb2RiX2V4cG9ydHMuc25hcHNob3RzX3Jldmlld3NcbldIRVJFIChcInNuYXBzaG90X3RpbWVzdGFtcFwiID0gJzIwMjAtMDktMTFUMTM6NDEnKVxuIiwiY2F0YWxvZyI6ImF3c2RhdGFjYXRhbG9nIiwic2NoZW1hIjoiZGVmYXVsdCIsImNvbHVtbnMiOlt7Im5hbWUiOiJyYXRpbmciLCJ0eXBlIjoiYmlnaW50In0seyJuYW1lIjoicmV2aWV3IiwidHlwZSI6InZhcmNoYXIifSx7Im5hbWUiOiJib29rIiwidHlwZSI6InZhcmNoYXIifSx7Im5hbWUiOiJhdXRob3IiLCJ0eXBlIjoidmFyY2hhciJ9LHsibmFtZSI6InVzZXIiLCJ0eXBlIjoidmFyY2hhciJ9LHsibmFtZSI6InNuYXBzaG90X3RpbWVzdGFtcCIsInR5cGUiOiJ2YXJjaGFyIn1dfQ== */",
        "ViewExpandedText": "/* Presto View */",
        "TableType": "VIRTUAL_VIEW",
        "Parameters": {
            "comment": "Presto View",
            "presto_view": "true"
        },
        "CreatedBy": "arn:aws:sts::000000000000:assumed-role/AWSBigDataBlog-GlueDynamoExportTableCom-LambdaRole-YDE0JG71FHAD/AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-K1KBFSLTS6G1",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "000000000000"
    }
}

2つ目のテーブル。

 % aws glue get-table --database-name dynamodb_exports --name snapshots_reviews
{
    "Table": {
        "Name": "snapshots_reviews",
        "DatabaseName": "dynamodb_exports",
        "Owner": "owner",
        "CreateTime": "2020-09-11T22:42:51+09:00",
        "UpdateTime": "2020-09-11T22:42:51+09:00",
        "LastAccessTime": "2020-09-11T22:42:51+09:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "rating",
                    "Type": "bigint"
                },
                {
                    "Name": "review",
                    "Type": "string"
                },
                {
                    "Name": "book",
                    "Type": "string"
                },
                {
                    "Name": "author",
                    "Type": "string"
                },
                {
                    "Name": "user",
                    "Type": "string"
                }
            ],
            "Location": "s3://dynamodb-exports-000000000000-ap-northeast-1/csv/Reviews/",
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "Compressed": false,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "field.delim": ","
                }
            },
            "BucketColumns": [],
            "SortColumns": [],
            "Parameters": {
                "CrawlerSchemaDeserializerVersion": "1.0",
                "CrawlerSchemaSerializerVersion": "1.0",
                "UPDATED_BY_CRAWLER": "ReviewscsvCrawler",
                "areColumnsQuoted": "false",
                "averageRecordSize": "73",
                "classification": "csv",
                "columnsOrdered": "true",
                "compressionType": "none",
                "delimiter": ",",
                "objectCount": "20",
                "recordCount": "4",
                "sizeKey": "297",
                "skip.header.line.count": "1",
                "typeOfData": "file"
            },
            "StoredAsSubDirectories": false
        },
        "PartitionKeys": [
            {
                "Name": "snapshot_timestamp",
                "Type": "string"
            }
        ],
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "CrawlerSchemaDeserializerVersion": "1.0",
            "CrawlerSchemaSerializerVersion": "1.0",
            "UPDATED_BY_CRAWLER": "ReviewscsvCrawler",
            "areColumnsQuoted": "false",
            "averageRecordSize": "73",
            "classification": "csv",
            "columnsOrdered": "true",
            "compressionType": "none",
            "delimiter": ",",
            "objectCount": "20",
            "recordCount": "4",
            "sizeKey": "297",
            "skip.header.line.count": "1",
            "typeOfData": "file"
        },
        "CreatedBy": "arn:aws:sts::000000000000:assumed-role/AWSGlueServiceRoleDefault/AWS-Crawler",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "000000000000"
    }
}

これらのテーブルに対して、 Athena の画面からクエリを実行できます。

出力先の S3 バケットにはこのようにオブジェクトが生成されています。

% aws s3 ls dynamodb-exports-000000000000-ap-northeast-1 --recursive
2020-09-11 22:41:25          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00000
2020-09-11 22:41:25        149 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00001
2020-09-11 22:41:25        148 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00002
2020-09-11 22:41:25          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00003
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00004
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00005
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00006
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00007
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00008
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00009
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00010
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00011
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00012
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00013
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00014
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00015
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00016
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00017
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00018
2020-09-11 22:41:26          0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00019

ファイルサイズが 0 でないものを 2つ確認すると、それぞれ以下のような中身になっています。DynamoDB テーブルに登録したアイテムが、 csv 形式でエクスポートされています。

Rating,Review,Book,Author,User
5,"A thrilling journey through the world of Hogwarts","Harry Potter and the Philosopher's Stone",J.K.Rowling,Tristan
Rating,Review,Book,Author,User
4,"Harry is pretty brave, but Hermione is the clear hero","Harry Potter and the Sorcerer's Stone",J.K.Rowling,Adeline

これで DynamoDB から S3 へのエクスポートを確認できました。Event ルールの定期実行を有効化すれば、自動で継続的なエクスポートが実現できます。

終わりに

AWS Glue と Step Functions を利用した、DynamoDB テーブルの S3 バケットへのエクスポートを確認しました。

AWS Glue のことがよく分かっていなくても、用意されたテンプレートに従って実行するだけで仕組みを実装することができました。必要に応じてカスタマイズして使用すれば、様々な要件に対応できそうですね。

以上、千葉(幸)がお送りしました。

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.