DynamoDB ストリームから Lambda 関数を呼び出して Kinesis Firehose を経由して S3 バケットに出力してみた

DynamoDB ストリームでキャプチャしたアイテムを Lambda 関数により Kinesis Data Firehose 配信ストリームに引き渡し、最終的に S3 バケットに出力するという仕組みを AWS ブログを参考に試してみました

コンバンハ、千葉(幸)です。

DynamoDB には DynamoDB ストリームという機能があり、DynamoDB テーブルのアイテムに変更があった場合にそれをキャプチャすることができます。

DynamoDB ストリームをイベントとして Lambda 関数を呼び出すことを俗に DynamoDB トリガーと呼びます。

また、DynamoDB テーブルではアイテムに有効期限(TTL)を設けて自動的な削除ができます。TTL により削除されたアイテムも DynamoDB ストリームのキャプチャ対象となるため、「期限切れで削除されたアイテムのみ Lambda 関数を用いて S3 にアーカイブ」ということもできます。

今回は、

  • DynamoDB ストリームで新規に追加されたアイテムをキャプチャし、
  • それをトリガーに Lambda 関数を呼び出し、
  • Kinesis Data Firehose 配信ストリームを通じて S3 バケットに出力する

というパターンを試してみます。

以下のブログを参考にします。

全体像は以下です。

目次

やってみた

今回のステップは以下の通りです。

  1. Lambda 関数用コードの S3 へのアップロード
  2. CloudFormation のデプロイ
  3. DynamoDB テーブルへのアイテム追加
  4. S3 バケットの確認
  5. Athena を用いた分析

使用するスクリプト、 CloudFormation テンプレートはこちらにあるものを使用します。

1. Lambda 関数用コードの S3 へのアップロード

リソースは基本的に次のステップで CloudFormation により一括作成するのですが、一点だけ事前準備が必要です。Lambda 関数にセットするコードを S3 バケットから読み取るようにされているため、そのコードをアップロードします。

以下のスクリプトを zip 化して任意の S3 バケットにアップロードします。

ddb-to-firehose.py

#########################################################################################
#  Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy of this
#  software and associated documentation files (the "Software"), to deal in the Software
#  without restriction, including without limitation the rights to use, copy, modify,
#  merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
#  permit persons to whom the Software is furnished to do so.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
#  INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
#  PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
#  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
#  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
#  SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#########################################################################################

import os, json, base64, boto3

firehose = boto3.client('firehose')

print('Loading function')

def recToFirehose(streamRecord):
    ddbRecord = streamRecord['NewImage']
    toFirehose = {}
    for c in ddbRecord:
        toFirehose = next(iter(ddbRecord.values()))
    jddbRecord = json.loads(ddbRecord['info']['S'])
    # Transform the record a bit
    try:
        rating = jddbRecord['rating']
    except:
        rating = 0
    try:
        actors = jddbRecord['actors']
    except:
        actors = [' ',' ']
    actor1 = actors[0]
    try:
        actor2 = actor[1]
    except:
        actor2 = ' '
    try:
        genres = jddbRecord['genres']
    except:
        genres = ['','']
    genre1 = genres[0]
    try:
        genre2 = genres[1]
    except:
        genre2 = ' '

    try:
        directors = jddbRecord['directors']
    except:
        directors = [' ',' ']
    director1 = directors[0]
    try:
        director2 = directors[1]
    except:
        director2 = ' '

    toFirehose["actor1"] = actor1
    toFirehose["actor2"] = actor2
    toFirehose["director1"] = director1
    toFirehose["director2"] = director2
    toFirehose["genre1"] = genre1
    toFirehose["genre2"] = genre2
    toFirehose["rating"] = rating
    jtoFirehose = json.dumps(toFirehose)
    response = firehose.put_record(
    DeliveryStreamName=os.environ['DeliveryStreamName'],
    Record= {
                'Data': jtoFirehose + '\n'
            }
        )
    print(response)

def lambda_handler(event, context):
    for record in event['Records']:
        if (record['eventName']) != 'REMOVE':
            recToFirehose(record['dynamodb'])
    return 'Successfully processed {} records.'.format(len(event['Records']))

後続の CloudFormation テンプレートで名称が決め打ちされているため、オブジェクト名はddb-to-firehose.zipである必要があります。

2. CloudFormationのデプロイ

以下のテンプレートを用いて各種リソースをデプロイします。

折り畳み
AWSTemplateFormatVersion: 2010-09-09
Description: >-
  AWS CloudFormation:
Parameters:
  DynamoDBTableName:
    Description: DynamoDB Table Name
    Type: String
    AllowedPattern: '[a-zA-Z0-9]*'
    MinLength: '1'
    MaxLength: '255'
    ConstraintDescription: must contain only alphanumeric characters
  LambdaCodeBucket:
    Description: S3 bucket containing the Lambda function code
    Type: String
Resources:
  myDynamoDBTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      TableName: !Ref DynamoDBTableName
      StreamSpecification:
        StreamViewType: NEW_IMAGE
      AttributeDefinitions:
        - AttributeName: Year
          AttributeType: N
        - AttributeName: Title
          AttributeType: S
      KeySchema:
        - AttributeName: Year
          KeyType: HASH
        - AttributeName: Title
          KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      TimeToLiveSpecification:
        Enabled: True
        AttributeName: ExpireTime
  myS3Bucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  firehoseDeliveryStream:
    DependsOn:
      - deliveryPolicy
    Type: 'AWS::KinesisFirehose::DeliveryStream'
    Properties:
      DeliveryStreamName: !Ref DynamoDBTableName
      ExtendedS3DestinationConfiguration:
        BucketARN: !Join
          - ''
          - - 'arn:aws:s3:::'
            - !Ref myS3Bucket
        BufferingHints:
          IntervalInSeconds: '60'
          SizeInMBs: '1'
        CompressionFormat: UNCOMPRESSED
        Prefix: firehose/
        RoleARN: !GetAtt deliveryRole.Arn
  deliveryRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'
  deliveryPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      Description: Managed policy for firehose
      Roles:
        - !Ref deliveryRole
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              - !Join
                - ''
                - - 'arn:aws:s3:::'
                  - !Ref myS3Bucket
              - !Join
                - ''
                - - 'arn:aws:s3:::'
                  - !Ref myS3Bucket
                  - '*'
  lambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: 'sts:AssumeRole'
  ddbToFirehose:
      Type: "AWS::Lambda::Function"
      Properties:
        Handler: "ddb-to-firehose.lambda_handler"
        Role:
          Fn::GetAtt:
            - "lambdaExecutionRole"
            - "Arn"
        Code:
          S3Bucket: !Ref LambdaCodeBucket
          S3Key: "ddb-to-firehose.zip"
        Runtime: "python3.6"
        Timeout: "25"
        Environment:
          Variables:
            DeliveryStreamName: !Ref DynamoDBTableName
  logGroup:
    Type: "AWS::Logs::LogGroup"
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ddbToFirehose}"
  lambdaExecutionPolicy:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      Description: Managed policy for lambda function
      Roles:
        - !Ref lambdaExecutionRole
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - 'firehose:PutRecord'
              - 'firehose:PutRecordBatch'
              - 'firehose:UpdateDestination'
            Resource: !GetAtt
               - firehoseDeliveryStream
               - Arn
          - Effect: Allow
            Action:
              - 'logs:CreateLogStream'
              - 'logs:PutLogEvents'
            Resource:
              - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:${logGroup}:*"
          - Effect: Allow
            Action:
              - 'dynamodb:DescribeStream'
              - 'dynamodb:GetRecords'
              - 'dynamodb:GetShardIterator'
              - 'dynamodb:ListStreams'
            Resource: !GetAtt
               - myDynamoDBTable
               - StreamArn
  EventSourceMapping:
    Type: "AWS::Lambda::EventSourceMapping"
    DependsOn:
      - lambdaExecutionPolicy
    Properties:
      EventSourceArn: !GetAtt
      - myDynamoDBTable
      - StreamArn
      FunctionName: !GetAtt
      - ddbToFirehose
      - Arn
      StartingPosition: "TRIM_HORIZON"
Outputs:
  TableName:
    Value: !Ref myDynamoDBTable
    Description: Table name of the newly created DynamoDB table
  BucketName:
    Value: !Ref myS3Bucket
    Description: My s3 bucket

スタック作成時にパラメータを指定します。

  • DynamoDBTableName
    • このスタックで作成される DynamoDB テーブルの名称
    • 今回はMoviesにしました
  • LambdaCodeBucket
    • 先ほどスクリプトをアップロードしたバケット

今回はスタック名はtest-dynamodb-to-s3で作成しました。

このスタックで作成されるリソースは以下の通りです。

論理 ID タイプ リソース名
myDynamoDBTable AWS::DynamoDB::Table Movies(パラメータで指定した値)
EventSourceMapping AWS::Lambda::EventSourceMapping なし
ddbToFirehose AWS::Lambda::Function <スタック名>-ddbToFirehose-<ランダム文字列>
lambdaExecutionPolicy AWS::IAM::ManagedPolicy <スタック名>-lambdaExecutionPolicy-<ランダム文字列>
lambdaExecutionRole AWS::IAM::Role <スタック名>-lambdaExecutionRole-<ランダム文字列>
logGroup AWS::Logs::LogGroup /aws/lambda/<スタック名>-ddbToFirehose-<ランダム文字列>
firehoseDeliveryStream AWS::KinesisFirehose::DeliveryStream Movies(DynamoDBテーブルと同じ)
deliveryPolicy AWS::IAM::ManagedPolicy <スタック名>-deliveryPolicy-<ランダム文字列>
deliveryRole AWS::IAM::Role <スタック名>-deliveryRole-<ランダム文字列>
myS3Bucket AWS::S3::Bucket <スタック名>-mys3bucket-<ランダム文字列>

Lambda 関数

作成されたリソースを確認しておきます。

トリガーとして DynamoDB ストリームが設定された Lambda 関数が作成されています。コードは手順 1. で設定したものです。

AWS CLI で確認しておきます。

 % aws lambda get-function --function-name <スタック名>-ddbToFirehose-1MELND04EJEIH
{
    "Configuration": {
        "FunctionName": "<スタック名>-ddbToFirehose-1MELND04EJEIH",
        "FunctionArn": "arn:aws:lambda:ap-northeast-1:000000000000:function:<スタック名>-ddbToFirehose-1MELND04EJEIH",
        "Runtime": "python3.6",
        "Role": "arn:aws:iam::000000000000:role/<スタック名>-lambdaExecutionRole-8V65X35YC08Q",
        "Handler": "ddb-to-firehose.lambda_handler",
        "CodeSize": 1418,
        "Description": "",
        "Timeout": 25,
        "MemorySize": 128,
        "LastModified": "2020-09-14T09:03:03.747+0000",
        "CodeSha256": "6GUWMPcAmflOYaBdlxuXPcCowVjb0SSosCoScY664Fk=",
        "Version": "$LATEST",
        "Environment": {
            "Variables": {
                "DeliveryStreamName": "Movies"
            }
        },
        "TracingConfig": {
            "Mode": "PassThrough"
        },
        "RevisionId": "811fda34-0fd5-46ce-9c67-a48da86c653d",
        "State": "Active",
        "LastUpdateStatus": "Successful"
    },
    "Code": {
        "RepositoryType": "S3",
        "Location": "https://awslambda-ap-ne-1-tasks.s3.ap-northeast-1.amazonaws.com/snapshots/000000000000/<スタック名>-ddbToFirehose-1MELND04EJEIH-a9e4d6a5-f6db-43e0-9cf7-fa13509dbe2b
    },
    "Tags": {
        "aws:cloudformation:stack-name": "<スタック名>",
        "aws:cloudformation:stack-id": "arn:aws:cloudformation:ap-northeast-1:000000000000:stack/<スタック名>/0768f760-f669-11ea-a32f-0693e5f01c30",
        "aws:cloudformation:logical-id": "ddbToFirehose"
    }
}

トリガーの設定(イベントソースマッピング)も確認しておきます。

% aws lambda get-event-source-mapping --uuid b83b49e8-5e50-4249-bb77-59970d8d149a
{
    "UUID": "b83b49e8-5e50-4249-bb77-59970d8d149a",
    "BatchSize": 100,
    "MaximumBatchingWindowInSeconds": 0,
    "ParallelizationFactor": 1,
    "EventSourceArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713",
    "FunctionArn": "arn:aws:lambda:ap-northeast-1:000000000000:function:<スタック名>-ddbToFirehose-1MELND04EJEIH",
    "LastModified": "2020-09-14T18:05:00+09:00",
    "LastProcessingResult": "No records processed",
    "State": "Enabled",
    "StateTransitionReason": "User action",
    "DestinationConfig": {
        "OnFailure": {}
    },
    "MaximumRecordAgeInSeconds": -1,
    "BisectBatchOnFunctionError": false,
    "MaximumRetryAttempts": -1
}

DynamoDB テーブル

この時点ではアイテムは登録されておらず、テーブルは空です。

DynamoDB ストリームと TTL が有効になっています。

TTL で指定されている属性ExpireTimeは、後続の手順で DynamoDB テーブルにアイテムを追加する際に付加するものです。

こちらも AWS CLI で確認します。

% aws dynamodb describe-table --table-name Movies
{
    "Table": {
        "AttributeDefinitions": [
            {
                "AttributeName": "Title",
                "AttributeType": "S"
            },
            {
                "AttributeName": "Year",
                "AttributeType": "N"
            }
        ],
        "TableName": "Movies",
        "KeySchema": [
            {
                "AttributeName": "Year",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "Title",
                "KeyType": "RANGE"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": "2020-09-14T18:02:39.713000+09:00",
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies",
        "TableId": "d69d5d14-099b-4df6-9b13-9951d88e4139",
        "StreamSpecification": {
            "StreamEnabled": true,
            "StreamViewType": "NEW_IMAGE"
        },
        "LatestStreamLabel": "2020-09-14T09:02:39.713",
        "LatestStreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713"
    }
}

TTL を確認する際は別のコマンドです。

% aws dynamodb describe-time-to-live --table-name Movies
{
    "TimeToLiveDescription": {
        "TimeToLiveStatus": "ENABLED",
        "AttributeName": "ExpireTime"
    }
}

DynamoDB ストリームを確認する際も別コマンドです。ストリームの ARN を確認するために一旦 list します。

% aws dynamodbstreams list-streams
{
    "Streams": [
        {
            "StreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713",
            "TableName": "Movies",
            "StreamLabel": "2020-09-14T09:02:39.713"
        },
        {
            "StreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T08:49:40.941",
            "TableName": "Movies",
            "StreamLabel": "2020-09-14T08:49:40.941"
        }
    ]
}

最新のストリームの ARN を指定して describe します。

% aws dynamodbstreams describe-stream --stream-arn arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713
{
    "StreamDescription": {
        "StreamArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Movies/stream/2020-09-14T09:02:39.713",
        "StreamLabel": "2020-09-14T09:02:39.713",
        "StreamStatus": "ENABLED",
        "StreamViewType": "NEW_IMAGE",
        "CreationRequestDateTime": "2020-09-14T18:02:39.713000+09:00",
        "TableName": "Movies",
        "KeySchema": [
            {
                "AttributeName": "Year",
                "KeyType": "HASH"
            },
            {
                "AttributeName": "Title",
                "KeyType": "RANGE"
            }
        ],
        "Shards": [
            {
                "ShardId": "shardId-00000001600074162263-9d9e0ab8",
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "100000000007133185793"
                }
            }
        ]
    }
}

Kinesis Data Firehose 配信ストリーム

Firehose のストリームです。設定画面が縦に長くて収まらないですが、ソースは設定なしで、宛先としてCloudFormation でデプロイされた S3 バケットが指定されています。

AWS CLI での確認結果です。

% aws firehose describe-delivery-stream --delivery-stream-name Movies
{
    "DeliveryStreamDescription": {
        "DeliveryStreamName": "Movies",
        "DeliveryStreamARN": "arn:aws:firehose:ap-northeast-1:000000000000:deliverystream/Movies",
        "DeliveryStreamStatus": "ACTIVE",
        "DeliveryStreamEncryptionConfiguration": {
            "Status": "DISABLED"
        },
        "DeliveryStreamType": "DirectPut",
        "VersionId": "1",
        "CreateTimestamp": "2020-09-14T18:03:29.367000+09:00",
        "Destinations": [
            {
                "DestinationId": "destinationId-000000000001",
                "S3DestinationDescription": {
                    "RoleARN": "arn:aws:iam::000000000000:role/<スタック名>-deliveryRole-76EH8KWJBO5W",
                    "BucketARN": "arn:aws:s3:::<スタック名>-mys3bucket-1ounjvcs4v5ka",
                    "Prefix": "firehose/",
                    "BufferingHints": {
                        "SizeInMBs": 1,
                        "IntervalInSeconds": 60
                    },
                    "CompressionFormat": "UNCOMPRESSED",
                    "EncryptionConfiguration": {
                        "NoEncryptionConfig": "NoEncryption"
                    },
                    "CloudWatchLoggingOptions": {
                        "Enabled": false
                    }
                },
                "ExtendedS3DestinationDescription": {
                    "RoleARN": "arn:aws:iam::000000000000:role/<スタック名>-deliveryRole-76EH8KWJBO5W",
                    "BucketARN": "arn:aws:s3:::<スタック名>-mys3bucket-1ounjvcs4v5ka",
                    "Prefix": "firehose/",
                    "BufferingHints": {
                        "SizeInMBs": 1,
                        "IntervalInSeconds": 60
                    },
                    "CompressionFormat": "UNCOMPRESSED",
                    "EncryptionConfiguration": {
                        "NoEncryptionConfig": "NoEncryption"
                    },
                    "CloudWatchLoggingOptions": {
                        "Enabled": false
                    },
                    "S3BackupMode": "Disabled"
                }
            }
        ],
        "HasMoreDestinations": false
    }
}

3. DynamoDB テーブルへのアイテム追加

スクリプトを用いて DynamoDB テーブルへのアイテム追加を行います。

任意の環境で実行可能ですが、私は手元の端末から実行しました。

スクリプト

以下のスクリプトを用います。

ハイライト部の指定により、TTL の属性ExpireTImeの挿入をしています。ここではアイテムの作成時刻から 1 時間で有効期限を迎えます。

LoadMovieData.py

#########################################################################################
#  Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy of this
#  software and associated documentation files (the "Software"), to deal in the Software
#  without restriction, including without limitation the rights to use, copy, modify,
#  merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
#  permit persons to whom the Software is furnished to do so.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
#  INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
#  PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
#  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
#  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
#  SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#########################################################################################

import os, sys, time, decimal
from decimal import *
import boto3
import json
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-1')
table = dynamodb.Table('Movies')


def loadfile(infile):
    jsonobj = json.load(open(infile))
    lc = 1
    for movie in jsonobj:
        lc += 1
        CreateTime = int(time.time())
        ExpireTime = CreateTime + (1* 60* 60)
        response = table.put_item(
           Item={
                'Year': decimal.Decimal(movie['year']),
                'Title': movie['title'],
                'info': json.dumps(movie['info']),
                'CreateTime': CreateTime,
                'ExpireTime': ExpireTime
            }
        )
        if (lc % 10) == 0:
            print ("%d rows inserted" % (lc))

if __name__ == '__main__':
    filename = sys.argv[1]
    if os.path.exists(filename):
        # file exists, continue
        loadfile(filename)
    else:
        print ('Please enter a valid filename')

スクリプトの実行環境において、 boto3 を実行可能である必要があることに注意してください。

また、22-23行目でアイテムの追加対象の DynamoDB テーブルを定義しています。リージョンやテーブル名など、適宜必要に応じて修正してください。

データの元ネタ(JSON)

スクリプトにより DynamoDB テーブルに追加するデータは、moviedata.jsonとして用意されています。

以下のような形式になっています。

moviedata.json

[
    {
        "year": 2013,
        "title": "Rush",
        "info": {
            "directors": ["Ron Howard"],
            "release_date": "2013-09-02T00:00:00Z",
            "rating": 8.3,
            "genres": [
                "Action",
                "Biography",
                "Drama",
                "Sport"
            ],
            "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg",
            "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.",
            "rank": 2,
            "running_time_secs": 7380,
            "actors": [
                "Daniel Bruhl",
                "Chris Hemsworth",
                "Olivia Wilde"
            ]
        }
    },
    {
        "year": 2013,
        "title": "Prisoners",
        "info": {
            "directors": ["Denis Villeneuve"],
            "release_date": "2013-08-30T00:00:00Z",
            "rating": 8.2,
            "genres": [
                "Crime",
                "Drama",
                "Thriller"
            ],
            "image_url": "http://ia.media-imdb.com/images/M/MV5BMTg0NTIzMjQ1NV5BMl5BanBnXkFtZTcwNDc3MzM5OQ@@._V1_SX400_.jpg",
            "plot": "When Keller Dover's daughter and her friend go missing, he takes matters into his own hands as the police pursue multiple leads and the pressure mounts. But just how far will this desperate father go to protect his family?",
            "rank": 3,
            "running_time_secs": 9180,
            "actors": [
                "Hugh Jackman",
                "Jake Gyllenhaal",
                "Viola Davis"
            ]
        }
    },
 ---以下略---

以下から zip 形式でダウンロード可能です。展開すると 3.7MB あるので、一部のみ抽出して使用するのもアリかもしれません。

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/moviedata.zip

スクリプトの実行

今回実行する端末では python3、boto3 は以下のバージョンです。

% python3 --version
Python 3.7.3
% pip3 list
Package         Version
--------------- -------
boto3           1.14.60
botocore        1.17.60
docutils        0.15.2
jmespath        0.10.0
pip             19.0.3
python-dateutil 2.8.1
s3transfer      0.3.3
setuptools      40.8.0
six             1.12.0
urllib3         1.25.10
wheel           0.33.1

スクリプトの引数として JSON ファイルを指定して実行します。

% python3 LoadMovieData.py moviedata.json
10 rows inserted
20 rows inserted
30 rows inserted
40 rows inserted
……
4610 rows inserted

DynamoDB テーブルへの追加の進捗状況が逐次表示されます。 JSON ファイルをそのまま使用すると結構なデータ量になりました。

DynamoDB には以下のようにアイテムが追加されています。

ExpireTIme属性には (TTL)の表示があります。ここでは UNIX 時間が指定されており、この時間を過ぎると削除されます。( バックグラウンドで処理され、期限切れから48時間以内に削除。)

(ここでは TTL が CreationTimeから 1時間後で設定されているので、翌朝覗いた際には跡形もなくアイテムが無くなっていました。)

4. S3 バケットの確認

DynamoDB ストリーム -> Lambda 関数 -> Firehose 配信ストリームを経由して、S3 バケットにはこのような形でアウトプットが出力されています。

ファイルの中身を抜粋すると、以下のような形式でテーブルの中身がエクスポートされています。

{"Year": "2013", "CreateTime": "1600080235", "Title": "Rush", "ExpireTime": "1600083835", "info": "{\"directors\": [\"Ron Howard\"], \"release_date\": \"2013-09-02T00:00:00Z\", \"rating\": 8.3, \"genres\": [\"Action\", \"Biography\", \"Drama\", \"Sport\"], \"image_url\": \"http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg\", \"plot\": \"A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.\", \"rank\": 2, \"running_time_secs\": 7380, \"actors\": [\"Daniel Bruhl\", \"Chris Hemsworth\", \"Olivia Wilde\"]}", "actor1": "Daniel Bruhl", "actor2": " ", "director1": "Ron Howard", "director2": " ", "genre1": "Action", "genre2": "Biography", "rating": 8.3}
{"Year": "2013", "CreateTime": "1600080236", "Title": "Prisoners", "ExpireTime": "1600083836", "info": "{\"directors\": [\"Denis Villeneuve\"], \"release_date\": \"2013-08-30T00:00:00Z\", \"rating\": 8.2, \"genres\": [\"Crime\", \"Drama\", \"Thriller\"], \"image_url\": \"http://ia.media-imdb.com/images/M/MV5BMTg0NTIzMjQ1NV5BMl5BanBnXkFtZTcwNDc3MzM5OQ@@._V1_SX400_.jpg\", \"plot\": \"When Keller Dover's daughter and her friend go missing, he takes matters into his own hands as the police pursue multiple leads and the pressure mounts. But just how far will this desperate father go to protect his family?\", \"rank\": 3, \"running_time_secs\": 9180, \"actors\": [\"Hugh Jackman\", \"Jake Gyllenhaal\", \"Viola Davis\"]}", "actor1": "Hugh Jackman", "actor2": " ", "director1": "Denis Villeneuve", "director2": " ", "genre1": "Crime", "genre2": "Drama", "rating": 8.2}

今回の構成では、 TTL による有効期限切れとなったアイテムは Lambda 関数の処理対象となっていない( Lambda 関数は Invocate されるが Firehose に受け渡さない)ため、新規に追加されたアイテムのみが S3 バケットに出力されています。

TTL により削除されたアイテムがストリームにキャプチャされた場合、Lambda に引き渡されるレコードイベントに以下のような項目が含まれます。この値を活用して、TTL による削除が行われたアイテムのみをエクスポート対象とする、ということも可能です。

"Records":[
    {
        ...

        "userIdentity":{
            "type":"Service",
            "principalId":"dynamodb.amazonaws.com"
        }

        ...

    }
]

5. Athena を用いた分析

S3 バケットに出力されたレコードに対して、Athena を用いてクエリを実行することができます。

任意のデータベースを選択し、以下を実行することでテーブルを作成します。LOCATIONは上記で確認した出力先バケットの任意のパスを指定してください。

CREATE EXTERNAL TABLE `movies`(
  `year` int,
  `createtime` int,
  `title` string,
  `expiretime` int,
  `info` string,
  `actor1` string,
  `actor2` string,
  `director1` string,
  `director2` string,
  `genre1` string,
  `genre2` string,
  `rating` double)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
LOCATION
  's3://<スタック名>-mys3bucket-1ounjvcs4v5ka/firehose/2020/09/14/10/'

今回はdefaultデータベース上でmoviesというテーブルを作成しました。

テーブルに対して任意のクエリを実行できます。例えば、genre1の平均評価を確認するクエリは以下です。

SELECT genre1, avg(rating) as avg_rating FROM "default"."movies" group by genre1 order by avg_rating desc;

一通りの動作確認ができました。

終わりに

DynamoDB ストリームでキャプチャしたアイテムを、Lambda 関数と Kinesis Data Firehose 配信ストリームを通じて S3 バケットに出力する構成を試してみました。

Lambda 関数のコード、各種リソースの CloudFormation テンプレートだけでなく DynamoDB テーブルに追加するアイテムのサンプル(しかもそれなりに大規模)が用意されていたのも助かります。

DynamoDB ストリームの使用感を知りたい方の参考になれば幸いです。

以上、千葉(幸)がお送りしました。