この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
コンバンハ、千葉(幸)です。
DynamoDB テーブルの中身を S3 バケットにエクスポートしたい、という場合があるかと思います。S3 にエクスポートしたものに対して、例えば Athena を利用して解析をかけたい、といったケースです。
AWS Glue や AWS Step Functions を利用して、定期的にエクスポートを行う仕組みについて以下のブログで紹介されているので、試してみました。 CloudFormation テンプレートや スクリプトが用意されているので、一通り流すだけでできます。
全体像としては以下のイメージです。
執筆時点より一年以上前のブログを参考にしていることもあり、サービスアップデートによりバッドプラクティスになってしまっている箇所があります。社内でフィードバックを受けましたので、数カ所で補足をします。
- Data Pipeline, EMR, Glue の比較
- Step Functions の Glue Workflow への置き換え
- Glue ETL スクリプトの 各種バージョン
- Glue ETL スクリプトのDynamoDB 読み取り並列度
目次
- DynamoDB から S3 へのバックアップのパターン
- やってみた
- 1. DynamoDB テーブルの作成
- 2. 共通スタックの作成
- 3. テーブルエクスポートスタックの作成
- デプロイされたリソースの確認
- 4. DynamoDB から S3 へのエクスポートの実行
- 5. エクスポート後の内容の確認
- 終わりに
DynamoDB から S3 へのバックアップのパターン
やってみる前に、全体像のおさらいをしておきます。
DynamoDB の中身を S3 にバックアップ・エクスポートする手法について、以下にまとまっています。
DynamoDB 標準機能によるバックアップ
ユーザー側で作り込みをしなくとも、DynamoDB では標準のバックアップの仕組みが用意されています。
- オンデマンドバックアップ
- ポイントインタイムリカバリ(継続的なバックアップ)
これらの機能により取得されたバックアップは S3 に保存されますが、ユーザーが該当 S3 にアクセスすることはできません。
ユーザー側での操作による DynamoDB バックアップ(エクスポート)
ユーザーがアクセスできる S3 バケットへ DynamoDB テーブルをエクスポートする手段として、以下が紹介されています。
- Data Pipeline
- メリット:最も簡単。AWS リソースの使用をできる限り少なく抑えながら1回限りのバックアップを作成する際に適する
- デメリット:他の方法と比較してカスマイズ性が乏しい。直近でのサービスアップデートが行われていない
- Amazon EMR
- メリット:詳細な設定が可能
- デメリット:Amazon EMR に対する習熟が必要
- AWS Glue
- メリット:Athena など他のサービスでも利用できる自動的かつ継続的なバックアップを行う際のベストプラクティス
- デメリット:AWS Glue に対する知見が必要。 Data Pipeline より料金がかかる
今回やってみるのは、一番最後の AWS Glue を使用するパターンです。
Glue はサーバーレスであるため、インフラのメンテナンスが必要ない点、高速に起動する点(Glue 2.0の場合)にメリットがあります。Data Pipeline は直近でのサービスアップデートはほとんど行われていないため、これから採用を検討する際には Glue を選択いただくのがよいかと思います。
参考までに、 Data Pipeline の使用イメージについては、以下エントリをご参照ください。
やってみた
今回は以下のステップに分けて実行していきます。基本的には提供されている CloudFormation テンプレートを使用します。
- DynamoDB テーブルの作成
- 共通スタックの作成
- テーブルエクスポートスタックの作成
- DynamoDB から S3 へのエクスポートの実行
- エクスポート後の内容の確認
1. DynamoDB テーブルの作成
まずはエクスポート元となる DynamoDB を準備します。すでに適当な DynamoDB がある場合は、このステップは飛ばして問題ありません。
今回は、以下のエントリで紹介されている手順に則り作成します。
ブログで紹介されている、以下のテンプレートを用いて CloudFormation スタックをデプロイします。
(ブログのボタンを押下して CloudFormation の画面に遷移した場合、バージニア北部が選択されている状態のため、必要に応じて別のリージョンに切り替えます。以降の手順でも同様です。)
Description: An example DynamoDB table for storing Reviews on books
Resources:
ReviewsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: Reviews
KeySchema:
- AttributeName: User
KeyType: HASH
- AttributeName: Book
KeyType: RANGE
AttributeDefinitions:
- AttributeName: User
AttributeType: S
- AttributeName: Book
AttributeType: S
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
ここで作成されるリソースは以下です。
論理 ID | タイプ | リソース名 |
---|---|---|
ReviewsTable | AWS::DynamoDB::Table | Reviews |
デプロイされた結果を確認すると。このようになっています。
AWS CLI でも確認しておきます。
% aws dynamodb describe-table --table-name Reviews
{
"Table": {
"AttributeDefinitions": [
{
"AttributeName": "Book",
"AttributeType": "S"
},
{
"AttributeName": "User",
"AttributeType": "S"
}
],
"TableName": "Reviews",
"KeySchema": [
{
"AttributeName": "User",
"KeyType": "HASH"
},
{
"AttributeName": "Book",
"KeyType": "RANGE"
}
],
"TableStatus": "ACTIVE",
"CreationDateTime": "2020-09-11T20:41:41.764000+09:00",
"ProvisionedThroughput": {
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Reviews",
"TableId": "4db9731f-1934-4964-abcc-da9de06207c5"
}
}
テーブルにアイテムを追加する
現状テーブルは空のままなので、アイテムを追加します。
[ 項目 ]-> [項目の作成 ]を選択し……
[ Text ]を選択し、内容を入力して[保存]を押下します。
なお、ここで入力するのは以下の値です。
{
"User": "Tristan",
"Book": "Harry Potter and the Philosopher's Stone",
"Rating": 5,
"Review": "A thrilling journey through the world of Hogwarts",
"Author": "J.K.Rowling"
}
これをもう一度行い、2 項目を登録した状態にします。
2 回目に登録した内容は以下です。
{
"User": "Adeline",
"Book": "Harry Potter and the Sorcerer's Stone",
"Rating": 4,
"Review": "Harry is pretty brave, but Hermione is the clear hero",
"Author": "J.K.Rowling"
}
これで DynamoDB の準備ができました。
2. 共通スタックの作成
冒頭のブログの一つ目のテンプレートを用いて実装します。ここで作成されるリソースは以下です。
論理 ID | タイプ | リソース名 |
---|---|---|
DynamoDBExportStateMachine | AWS::StepFunctions::StateMachine | DynamoDBExportAndAthenaLoad |
StateExecutionRole | AWS::IAM::Role | AWSBigDataBlog-GlueDynamoExport-StateExecutionRole-<ランダム文字列> |
StartGlueCrawler | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExportTa-StartGlueCrawler-<ランダム文字列> |
GetGlueCrawlerStatus | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExpo-GetGlueCrawlerStatus-<ランダム文字列> |
CreateCurrentView | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-<ランダム文字列> |
GetCurrentViewStatus | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExpo-GetCurrentViewStatus-<ランダム文字列> |
LambdaRole | AWS::IAM::Role | AWSBigDataBlog-GlueDynamoExportTableCom-LambdaRole-<ランダム文字列> |
DynamoDBExportsBucket | AWS::S3::Bucket | dynamodb-exports-<アカウント番号>-<リージョン> |
EventTriggerRole | AWS::IAM::Role | AWSBigDataBlog-GlueDynamoExportTa-EventTriggerRole-<ランダム文字列> |
GlueCrawlerAndJobRole | AWS::IAM::Role | AWSGlueServiceRoleDefault |
念のためテンプレートも載せておきます。
折り畳み
Description: Template for setting up common infrastructure for exporting DynamoDB tables to S3 with Step Functions
Resources:
DynamoDBExportsBucket:
Type: "AWS::S3::Bucket"
Properties:
BucketName:
Fn::Sub: "dynamodb-exports-${AWS::AccountId}-${AWS::Region}"
GlueCrawlerAndJobRole:
Type: AWS::IAM::Role
Properties:
# Referenced the following documentation while creating this IAM Role
# http://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
RoleName: AWSGlueServiceRoleDefault
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: glue.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
- arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess
- arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess
LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- sts:AssumeRole
Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AWSLambdaExecute
- arn:aws:iam::aws:policy/AmazonAthenaFullAccess
Policies:
- PolicyName: GlueJobTrigger
PolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- glue:StartCrawler
- glue:GetCrawler
- glue:GetCrawlerMetrics
- athena:StartQueryExecution
- s3:ListBucket
Effect: Allow
Resource: "*"
EventTriggerRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- sts:AssumeRole
Effect: Allow
Principal:
Service:
- events.amazonaws.com
Policies:
- PolicyName: GlueJobTrigger
PolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- states:StartExecution
Effect: Allow
Resource: "*"
StateExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- sts:AssumeRole
Effect: Allow
Principal:
Service:
- states.amazonaws.com
Policies:
- PolicyName: GlueJobTrigger
PolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- "lambda:InvokeFunction"
- "glue:StartJobRun"
- "glue:GetJobRun"
- "glue:GetJobRuns"
- "glue:BatchStopJobRun"
Effect: Allow
Resource: "*"
DynamoDBExportStateMachine:
Type: "AWS::StepFunctions::StateMachine"
Properties:
StateMachineName: DynamoDBExportAndAthenaLoad
RoleArn:
Fn::GetAtt:
- StateExecutionRole
- Arn
DefinitionString:
Fn::Sub:
- |-
{
"StartAt": "StartJobRun",
"States": {
"StartJobRun": {
"Type": "Task",
"ResultPath": "$.glueresult",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName.$": "$.glue_job_name",
"Arguments": {
"--table_name.$": "$.table_name",
"--read_percentage.$": "$.read_percentage",
"--output_prefix.$": "$.output_prefix",
"--output_format.$": "$.output_format"
}
},
"Next": "Start Crawler",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Export Failed"
}
]
},
"Export Failed": {
"Type": "Fail",
"Cause": "One or more steps could not complete successfully",
"Error": "ExportFailed"
},
"Start Crawler": {
"Type": "Task",
"Resource": "${StartGlueCrawler}",
"Next": "Wait for Crawler",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
]
},
"Wait for Crawler": {
"Type": "Wait",
"Seconds": 60,
"Next": "Get Crawler Status"
},
"Get Crawler Status": {
"Type": "Task",
"Resource": "${GetGlueCrawlerStatus}",
"Next": "Crawler Finished?"
},
"Crawler Finished?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.glue_crawler_status",
"StringEquals": "SUCCEEDED",
"Next": "Create Current View"
},
{
"Variable": "$.glue_crawler_status",
"StringEquals": "CANCELLED",
"Next": "Export Failed"
},
{
"Variable": "$.glue_crawler_status",
"StringEquals": "FAILED",
"Next": "Export Failed"
}
],
"Default": "Wait for Crawler"
},
"Create Current View": {
"Type": "Task",
"Resource": "${CreateCurrentView}",
"Next": "Wait for Current View",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
]
},
"Wait for Current View": {
"Type": "Wait",
"Seconds": 5,
"Next": "Get Current View Status"
},
"Get Current View Status": {
"Type": "Task",
"Resource": "${GetCurrentViewStatus}",
"Next": "Create Current View Finished?",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
]
},
"Create Current View Finished?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.athena_query_status",
"StringEquals": "SUCCEEDED",
"Next": "Done"
},
{
"Variable": "$.athena_query_status",
"StringEquals": "FAILED",
"Next": "Export Failed"
},
{
"Variable": "$.athena_query_status",
"StringEquals": "CANCELLED",
"Next": "Export Failed"
}
],
"Default": "Wait for Current View"
},
"Done": {
"Type": "Pass",
"End": true
}
}
}
- StartGlueCrawler:
Fn::GetAtt:
- StartGlueCrawler
- Arn
GetGlueCrawlerStatus:
Fn::GetAtt:
- GetGlueCrawlerStatus
- Arn
CreateCurrentView:
Fn::GetAtt:
- CreateCurrentView
- Arn
GetCurrentViewStatus:
Fn::GetAtt:
- GetCurrentViewStatus
- Arn
StartGlueCrawler:
Type: AWS::Lambda::Function
Properties:
Handler: "index.crawler_trigger"
Runtime: python3.6
Timeout: "60"
Role:
Fn::GetAtt:
- LambdaRole
- Arn
Code:
ZipFile:
|-
import logging
import datetime
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
KEY_CRAWLER_NAME = 'crawler_name'
GLUE_CLIENT = boto3.client('glue')
def crawler_trigger(event, context):
crawler_name = event[KEY_CRAWLER_NAME]
GLUE_CLIENT.start_crawler(
Name=crawler_name
)
return event
GetGlueCrawlerStatus:
Type: AWS::Lambda::Function
Properties:
Handler: "index.lambda_handler"
Runtime: python3.6
Role:
Fn::GetAtt:
- LambdaRole
- Arn
Code:
ZipFile:
|-
import json
import logging
import datetime
from dateutil.parser import parse
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
GLUE_CLIENT = boto3.client('glue')
KEY_GLUE_CRAWLER_NAME = "crawler_name"
KEY_GLUE_CRAWLER_STATUS = "glue_crawler_status"
KEY_STARTED_ON = "StartedOn"
KEY_GLUE_RESULT = "glueresult"
def lambda_handler(event, context):
crawler_name = event[KEY_GLUE_CRAWLER_NAME]
resp = GLUE_CLIENT.get_crawler(Name=crawler_name)
print(resp)
# In the case of an early exit make sure the key is present but the value
# is a no op
event[KEY_GLUE_CRAWLER_STATUS] = ""
last_crawl = resp['Crawler'].get('LastCrawl')
if last_crawl is None:
return event
last_crawl_start = last_crawl.get('StartTime')
# Timestamp is stored as milliseconds since the epoch
glue_job_start = datetime.datetime.fromtimestamp(event[KEY_GLUE_RESULT][KEY_STARTED_ON] / 1000)
# If the last crawl is before the snapshot_timestamp then we're still waiting
# for the current run we've initiated to finish
if last_crawl_start is None or last_crawl_start.timestamp() < glue_job_start.timestamp():
return event
event[KEY_GLUE_CRAWLER_STATUS] = last_crawl['Status']
return event
CreateCurrentView:
Type: AWS::Lambda::Function
Properties:
Handler: "index.create_current_view_trigger"
Runtime: python3.6
Timeout: 30
Role:
Fn::GetAtt:
- LambdaRole
- Arn
Code:
ZipFile:
|-
import logging
import os
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
KEY_TABLE_NAME = "table_name"
SNAPHSHOT_TIMESTAMP = "snapshot_timestamp"
KEY_OUTPUT_PREFIX = "output_prefix"
ATHENA_CLIENT = boto3.client('athena')
S3_CLIENT = boto3.client('s3')
def create_current_view_trigger(event, context):
table_name = event[KEY_TABLE_NAME].lower().replace("-", "_")
output_prefix = event[KEY_OUTPUT_PREFIX]
snapshot_timestamp = get_snapshot_timestamp(output_prefix)
query = f"CREATE OR REPLACE VIEW \"dynamodb_exports\".\"{table_name}\" AS SELECT * FROM \"dynamodb_exports\".\"snapshots_{table_name}\" WHERE {SNAPHSHOT_TIMESTAMP} = '{snapshot_timestamp}';"
LOGGER.info(query)
# Athena always has permissions to write to the bucket of the following structure
account_id = get_account_id(context)
region = os.environ['AWS_REGION']
output = f"s3://aws-athena-query-results-{account_id}-{region}"
# It's not uncommon for the start_query_execution response to return a successful response
# and then for the query to fail asynchronously. Check the Athena Query History to double check.
resp = ATHENA_CLIENT.start_query_execution(
QueryString=query,
ResultConfiguration={
'OutputLocation': output
}
)
LOGGER.info(resp)
event['athena_query_execution_id'] = resp['QueryExecutionId']
return event
# AWS Account ID is not available in the environment, but you can get it breaking a part the Lambda
# arn
def get_account_id(context):
return context.invoked_function_arn.split(":")[4]
def get_snapshot_timestamp(output_prefix):
split = output_prefix.split("/")
bucket = split[2]
fmt = split[3]
table = split[4]
# The trailing slash is really important as it allows us to query S3 for the CommonPrefixes
# which can be thought of as the next "folder" after a given prefix.
prefix = f"{fmt}/{table}/"
# Not sure if there can be so many prefixes that there's a continuation token. For now
# assuming that there is not
resp = S3_CLIENT.list_objects_v2(
Bucket=bucket,
Delimiter='/',
Prefix=prefix
)
# Each prefix will look like 'parquet/Reviews/snapshot_timestamp=2019-01-11T06:15/'
prefixes = [x['Prefix'].split('/')[2].split('=')[1] for x in resp['CommonPrefixes']]
# There's no documentation on what order the CommonPrefixes come on so assume none, but we can
# assume they're lexographically sortable give how the snapshot timestamp is stored
prefixes.sort(reverse=True)
# If the state machine has gotten this far there will always be at least one prefix
return prefixes[0]
GetCurrentViewStatus:
Type: AWS::Lambda::Function
Properties:
Handler: "index.lambda_handler"
Runtime: python3.6
Role:
Fn::GetAtt:
- LambdaRole
- Arn
Code:
ZipFile:
|-
import logging
import os
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
ATHENA_CLIENT = boto3.client('athena')
def lambda_handler(event, context):
query_id = event['athena_query_execution_id']
resp = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_id)
event['athena_query_status'] = resp['QueryExecution']['Status']['State']
return event
Outputs:
DynamoDBExportsBucket:
Description: S3 bucket for DynamoDB exports
Export:
Name: DynamoDBExportsBucket
Value:
Ref: DynamoDBExportsBucket
GlueCrawlerAndJobRole:
Description: IAM Role for Glue Crawlers and Jobs
Export:
Name: GlueCrawlerAndJobRole
Value:
Ref: GlueCrawlerAndJobRole
EventTriggerRole:
Description: IAM Role for CloudWatch Events
Export:
Name: EventTriggerRole
Value:
Fn::GetAtt:
- EventTriggerRole
- Arn
ExportStateMachineArn:
Description: ARN for the Export State Machine
Export:
Name: ExportStateMachineArn
Value:
Ref: DynamoDBExportStateMachine
(追記ここから)
現在は、Glue ジョブやクローラーの依存関係や定期実行は Glue Workflow ですべて管理できるようになっています。
Glue Workflow に置き換えることで、本エントリの構成における Step Functions, Lambda関数、CloudWatch Events ルールはいずれも不要になり、シンプルな構成かつ料金も下がります。
Glue Workflow については以下をあわせてご参照ください。
(追記ここまで)
3. テーブルエクスポートスタックの作成
冒頭のブログの二つ目のテンプレートを用いて実装します。ここで作成されるリソースは以下です。
論理 ID | タイプ | リソース名 |
---|---|---|
ExportConverterGlueJob | AWS::Glue::Job | ReviewsExportTo<フォーマット名> |
GlueCrawler | AWS::Glue::Crawler | ReviewscsvCrawler |
Trigger | AWS::Events::Rule | AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-<ランダム文字列> |
なお、このテンプレートでは、デプロイ時に指定するパラメータが定義されています。
- MaxConsumedReadThroughput:読み取り容量の消費割合の上限
- OutputFormat:アウトプットのフォーマット
- TableName:エクスポート対象の DynamoDB テーブル名
今回は csv フォーマットでエクスポートしてみます。
こちらもテンプレートを載せておきます。
折り畳み
Description: Export a DynamoDB table to S3 with a Glue Job
Parameters:
TableName:
Type: String
Description: DynamoDB Table Name to export
MaxConsumedReadThroughput:
Type: Number
Description: The maximum amount of Read Capacity Units the export is allowed to consumed expressed as a percentage
MinValue: 0.001
MaxValue: 1.0
Default: 0.25
OutputFormat:
Type: String
Description: Output format of the export. One of avro, csv, json, orc, parquet, or xml.
AllowedValues:
- avro
- csv
- json
- orc
- parquet
- xml
Resources:
Trigger:
Type: "AWS::Events::Rule"
Properties:
Description:
Fn::Sub:
- "Start Export of ${Name} every night at midnight"
- Name:
Ref: TableName
Targets:
- Id: StepFunctions
Arn:
Fn::ImportValue: ExportStateMachineArn
RoleArn:
Fn::ImportValue: EventTriggerRole
Input:
Fn::Sub:
- |-
{
"glue_job_name": "${GlueJobName}",
"output_prefix": "${OutputPrefix}",
"table_name": "${TableName}",
"read_percentage": "${ReadPercentage}",
"crawler_name": "${CrawlerName}",
"output_format": "${OutputFormat}"
}
- GlueJobName:
Fn::Sub:
- "${Name}ExportTo${OutputFormat}"
- Name:
Ref: TableName
OutputFormat:
Ref: OutputFormat
OutputPrefix:
Fn::Sub:
- "s3://${Bucket}/${OutputFormat}/${TableName}"
- Bucket:
Fn::ImportValue: DynamoDBExportsBucket
TableName:
Ref: TableName
OutputFormat:
Ref: OutputFormat
OutputFormat:
Ref: OutputFormat
ReadPercentage:
Ref: MaxConsumedReadThroughput
CrawlerName:
Fn::Sub:
- "${Name}${OutputFormat}Crawler"
- Name:
Ref: TableName
OutputFormat:
Ref: OutputFormat
ScheduleExpression: "cron(0 10 * * ? *)"
State: DISABLED
# Export table to S3 in the parquet format
ExportConverterGlueJob:
Type: "AWS::Glue::Job"
Properties:
Name:
Fn::Sub:
- "${Name}ExportTo${OutputFormat}"
- Name:
Ref: TableName
OutputFormat:
Ref: OutputFormat
Role:
Fn::ImportValue: GlueCrawlerAndJobRole
MaxRetries: 3
Description:
Fn::Sub:
- Exports a DynamoDB table to ${OutputFormat}
- OutputFormat:
Ref: OutputFormat
Command:
# DO NOT CHANGE NAME. CloudFormation docs are wrong. Use Glue API docs:
# http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html#aws-glue-api-jobs-job-JobCommand
Name: "glueetl"
ScriptLocation: "s3://aws-bigdata-blog/artifacts/using-glue-to-access-dynamodb-tables/export-dynamodb-table.py"
AllocatedCapacity: 10
ExecutionProperty:
MaxConcurrentRuns: 3
DefaultArguments:
"--TempDir":
Fn::Sub:
- "s3://${Bucket}/glue-temp-dir"
- Bucket:
Fn::ImportValue: DynamoDBExportsBucket
GlueCrawler:
Type: AWS::Glue::Crawler
Properties:
Role:
Fn::ImportValue: GlueCrawlerAndJobRole
Name:
Fn::Sub:
- "${Name}${OutputFormat}Crawler"
- Name:
Ref: TableName
OutputFormat:
Ref: OutputFormat
Description:
Fn::Sub:
- "Add new partitions and handle schema updates to the ${Name} table"
- Name:
Ref: TableName
SchemaChangePolicy:
UpdateBehavior: UPDATE_IN_DATABASE
DeleteBehavior: DELETE_FROM_DATABASE
DatabaseName: "dynamodb_exports"
TablePrefix: "snapshots_"
Targets:
S3Targets:
- Path:
Fn::Sub:
- "s3://${Bucket}/${OutputFormat}/${TableName}"
- Bucket:
Fn::ImportValue: DynamoDBExportsBucket
TableName:
Ref: TableName
OutputFormat:
Ref: OutputFormat
デプロイされたリソースの確認
次のステップに進む前に、2. と 3. のステップで作成された各種リソースを確認してみます。
Step Functions ステートマシン
このように作成されています。
AWS CLI での確認。
% aws stepfunctions describe-state-machine --state-machine-arn arn:aws:states:ap-northeast-1:000000000000:stateMachine:DynamoDBExportAndAthenaLoad
{
"stateMachineArn": "arn:aws:states:ap-northeast-1:000000000000:stateMachine:DynamoDBExportAndAthenaLoad",
"name": "DynamoDBExportAndAthenaLoad",
"status": "ACTIVE",
"definition": "{\n 略 \n}",
"roleArn": "arn:aws:iam::000000000000:role/AWSBigDataBlog-GlueDynamoExport-StateExecutionRole-CHS5BEKC9VBC",
"type": "STANDARD",
"creationDate": "2020-09-11T21:02:48.654000+09:00",
"loggingConfiguration": {
"level": "OFF",
"includeExecutionData": false
}
}
定義の内訳はこのようになっています。
折り畳み
{
"StartAt": "StartJobRun",
"States": {
"StartJobRun": {
"Type": "Task",
"ResultPath": "$.glueresult",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName.$": "$.glue_job_name",
"Arguments": {
"--table_name.$": "$.table_name",
"--read_percentage.$": "$.read_percentage",
"--output_prefix.$": "$.output_prefix",
"--output_format.$": "$.output_format"
}
},
"Next": "Start Crawler",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Export Failed"
}
]
},
"Export Failed": {
"Type": "Fail",
"Cause": "One or more steps could not complete successfully",
"Error": "ExportFailed"
},
"Start Crawler": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExportTa-StartGlueCrawler-LHEP9BGFS2NZ",
"Next": "Wait for Crawler",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
]
},
"Wait for Crawler": {
"Type": "Wait",
"Seconds": 60,
"Next": "Get Crawler Status"
},
"Get Crawler Status": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExpo-GetGlueCrawlerStatus-1D6OO6MQMQ4YJ",
"Next": "Crawler Finished?"
},
"Crawler Finished?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.glue_crawler_status",
"StringEquals": "SUCCEEDED",
"Next": "Create Current View"
},
{
"Variable": "$.glue_crawler_status",
"StringEquals": "CANCELLED",
"Next": "Export Failed"
},
{
"Variable": "$.glue_crawler_status",
"StringEquals": "FAILED",
"Next": "Export Failed"
}
],
"Default": "Wait for Crawler"
},
"Create Current View": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-K1KBFSLTS6G1",
"Next": "Wait for Current View",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
]
},
"Wait for Current View": {
"Type": "Wait",
"Seconds": 5,
"Next": "Get Current View Status"
},
"Get Current View Status": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExpo-GetCurrentViewStatus-SOWZAONHY5MJ",
"Next": "Create Current View Finished?",
"Retry": [
{
"ErrorEquals": ["Lambda.ServiceException"],
"IntervalSeconds": 3,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
]
},
"Create Current View Finished?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.athena_query_status",
"StringEquals": "SUCCEEDED",
"Next": "Done"
},
{
"Variable": "$.athena_query_status",
"StringEquals": "FAILED",
"Next": "Export Failed"
},
{
"Variable": "$.athena_query_status",
"StringEquals": "CANCELLED",
"Next": "Export Failed"
}
],
"Default": "Wait for Current View"
},
"Done": {
"Type": "Pass",
"End": true
}
}
}
ステートマシンでは、各ステップで Lambda 関数が呼び出されています。
それぞれのコードを確認しておきます。
Lambda 関数①(Start Crawler)
import logging
import datetime
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
KEY_CRAWLER_NAME = 'crawler_name'
GLUE_CLIENT = boto3.client('glue')
def crawler_trigger(event, context):
crawler_name = event[KEY_CRAWLER_NAME]
GLUE_CLIENT.start_crawler(
Name=crawler_name
)
return event
Lambda 関数②(Get Crawler Status)
import json
import logging
import datetime
from dateutil.parser import parse
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
GLUE_CLIENT = boto3.client('glue')
KEY_GLUE_CRAWLER_NAME = "crawler_name"
KEY_GLUE_CRAWLER_STATUS = "glue_crawler_status"
KEY_STARTED_ON = "StartedOn"
KEY_GLUE_RESULT = "glueresult"
def lambda_handler(event, context):
crawler_name = event[KEY_GLUE_CRAWLER_NAME]
resp = GLUE_CLIENT.get_crawler(Name=crawler_name)
print(resp)
# In the case of an early exit make sure the key is present but the value
# is a no op
event[KEY_GLUE_CRAWLER_STATUS] = ""
last_crawl = resp['Crawler'].get('LastCrawl')
if last_crawl is None:
return event
last_crawl_start = last_crawl.get('StartTime')
# Timestamp is stored as milliseconds since the epoch
glue_job_start = datetime.datetime.fromtimestamp(event[KEY_GLUE_RESULT][KEY_STARTED_ON] / 1000)
# If the last crawl is before the snapshot_timestamp then we're still waiting
# for the current run we've initiated to finish
if last_crawl_start is None or last_crawl_start.timestamp() < glue_job_start.timestamp():
return event
event[KEY_GLUE_CRAWLER_STATUS] = last_crawl['Status']
return event
Lambda 関数③(Create Current View)
import logging
import os
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
KEY_TABLE_NAME = "table_name"
SNAPHSHOT_TIMESTAMP = "snapshot_timestamp"
KEY_OUTPUT_PREFIX = "output_prefix"
ATHENA_CLIENT = boto3.client('athena')
S3_CLIENT = boto3.client('s3')
def create_current_view_trigger(event, context):
table_name = event[KEY_TABLE_NAME].lower().replace("-", "_")
output_prefix = event[KEY_OUTPUT_PREFIX]
snapshot_timestamp = get_snapshot_timestamp(output_prefix)
query = f"CREATE OR REPLACE VIEW \"dynamodb_exports\".\"{table_name}\" AS SELECT * FROM \"dynamodb_exports\".\"snapshots_{table_name}\" WHERE {SNAPHSHOT_TIMESTAMP} = '{snapshot_timestamp}';"
LOGGER.info(query)
# Athena always has permissions to write to the bucket of the following structure
account_id = get_account_id(context)
region = os.environ['AWS_REGION']
output = f"s3://aws-athena-query-results-{account_id}-{region}"
# It's not uncommon for the start_query_execution response to return a successful response
# and then for the query to fail asynchronously. Check the Athena Query History to double check.
resp = ATHENA_CLIENT.start_query_execution(
QueryString=query,
ResultConfiguration={
'OutputLocation': output
}
)
LOGGER.info(resp)
event['athena_query_execution_id'] = resp['QueryExecutionId']
return event
# AWS Account ID is not available in the environment, but you can get it breaking a part the Lambda
# arn
def get_account_id(context):
return context.invoked_function_arn.split(":")[4]
def get_snapshot_timestamp(output_prefix):
split = output_prefix.split("/")
bucket = split[2]
fmt = split[3]
table = split[4]
# The trailing slash is really important as it allows us to query S3 for the CommonPrefixes
# which can be thought of as the next "folder" after a given prefix.
prefix = f"{fmt}/{table}/"
# Not sure if there can be so many prefixes that there's a continuation token. For now
# assuming that there is not
resp = S3_CLIENT.list_objects_v2(
Bucket=bucket,
Delimiter='/',
Prefix=prefix
)
# Each prefix will look like 'parquet/Reviews/snapshot_timestamp=2019-01-11T06:15/'
prefixes = [x['Prefix'].split('/')[2].split('=')[1] for x in resp['CommonPrefixes']]
# There's no documentation on what order the CommonPrefixes come on so assume none, but we can
# assume they're lexographically sortable give how the snapshot timestamp is stored
prefixes.sort(reverse=True)
# If the state machine has gotten this far there will always be at least one prefix
return prefixes[0]
Lambda 関数④(Get Current View Status)
import logging
import os
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
ATHENA_CLIENT = boto3.client('athena')
def lambda_handler(event, context):
query_id = event['athena_query_execution_id']
resp = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_id)
event['athena_query_status'] = resp['QueryExecution']['Status']['State']
return event
Glue クローラ
% aws glue get-crawler --name ReviewscsvCrawler
{
"Crawler": {
"Name": "ReviewscsvCrawler",
"Role": "AWSGlueServiceRoleDefault",
"Targets": {
"S3Targets": [
{
"Path": "s3://dynamodb-exports-000000000000-ap-northeast-1/csv/Reviews",
"Exclusions": []
}
],
"JdbcTargets": [],
"DynamoDBTargets": [],
"CatalogTargets": []
},
"DatabaseName": "dynamodb_exports",
"Description": "Add new partitions and handle schema updates to the Reviews table",
"Classifiers": [],
"SchemaChangePolicy": {
"UpdateBehavior": "UPDATE_IN_DATABASE",
"DeleteBehavior": "DELETE_FROM_DATABASE"
},
"State": "READY",
"TablePrefix": "snapshots_",
"CrawlElapsedTime": 0,
"CreationTime": "2020-09-11T21:15:27+09:00",
"LastUpdated": "2020-09-11T21:15:27+09:00",
"Version": 1
}
}
Glue ジョブ
外部の S3 にあるスクリプトを呼び出しています。
% aws glue get-job --job-name ReviewsExportTocsv
{
"Job": {
"Name": "ReviewsExportTocsv",
"Description": "Exports a DynamoDB table to csv",
"Role": "AWSGlueServiceRoleDefault",
"CreatedOn": "2020-09-11T21:15:26.598000+09:00",
"LastModifiedOn": "2020-09-11T21:15:26.598000+09:00",
"ExecutionProperty": {
"MaxConcurrentRuns": 3
},
"Command": {
"Name": "glueetl",
"ScriptLocation": "s3://aws-bigdata-blog/artifacts/using-glue-to-access-dynamodb-tables/export-dynamodb-table.py",
"PythonVersion": "2"
},
"DefaultArguments": {
"--TempDir": "s3://dynamodb-exports-000000000000-ap-northeast-1/glue-temp-dir"
},
"MaxRetries": 3,
"AllocatedCapacity": 10,
"Timeout": 2880,
"MaxCapacity": 10.0
}
}
(追記ここから)
ここでは、以下のバージョン・設定となっています。
- PythonVersion:2
- GlueVersion:指定なし(0.9)
- WorkerType:指定なし(Standard)
- NumberOfWorkers:指定なし(代わりに MaxCapacity=10)
2020年11月時点としては、以下の構成に置き換えることをおすすめします。特に、GlueVersion 2.0 に変更することを強く推奨します。
- PythonVersion:3
- GlueVersion:2.0
- WorkerType:G.1X もしくは G.2X
- NumberOfWorkers:10
(追記ここまで)
このスクリプトの内訳は、冒頭のブログに記載のある以下が該当します。
import sys
import datetime
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
ARG_TABLE_NAME = "table_name"
ARG_READ_PERCENT = "read_percentage"
ARG_OUTPUT = "output_prefix"
ARG_FORMAT = "output_format"
PARTITION = "snapshot_timestamp"
args = getResolvedOptions(sys.argv,
[
'JOB_NAME',
ARG_TABLE_NAME,
ARG_READ_PERCENT,
ARG_OUTPUT,
ARG_FORMAT
]
)
table_name = args[ARG_TABLE_NAME]
read = args[ARG_READ_PERCENT]
output_prefix = args[ARG_OUTPUT]
fmt = args[ARG_FORMAT]
print("Table name:", table_name)
print("Read percentage:", read)
print("Output prefix:", output_prefix)
print("Format:", fmt)
date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
output = "%s/%s=%s" % (output_prefix, PARTITION, date_str)
sc = SparkContext()
glueContext = GlueContext(sc)
table = glueContext.create_dynamic_frame.from_options(
"dynamodb",
connection_options={
"dynamodb.input.tableName": table_name,
"dynamodb.throughput.read.percent": read
}
)
glueContext.write_dynamic_frame.from_options(
frame=table,
connection_type="s3",
connection_options={
"path": output
},
format=fmt,
transformation_ctx="datasink"
)
(追記ここから)
ここではdynamodb.splits
というパラメータが有効化されていないため、読み取りが並列化されていません。今回の構成では Glue クラスターのリソースとして DPU = 10 が割り当てられていますが、そのほとんどが活用されないということになります。パラメータを明示的に指定することをお勧めします。
(追記ここまで)
Events ルール
今回は Amazon EventBridge の画面から確認しましたが、CloudWatch の画面からも確認できます。
CloudFormation からデプロイした際には定期実行が無効に設定されています。この仕組みを用いて定期実行させたい場合には、有効化する必要があります。
また、後続の手順で手動実行を行います。その際には、ステートマシンに引き渡すインプットをここからコピーしたものを用います。
AWS CLI での確認結果です。
% aws events describe-rule --name AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY
{
"Name": "AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY",
"Arn": "arn:aws:events:ap-northeast-1:000000000000:rule/AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY",
"ScheduleExpression": "cron(0 10 * * ? *)",
"State": "DISABLED",
"Description": "Start Export of Reviews every night at midnight",
"EventBusName": "default"
}
4. DynamoDB から S3 へのエクスポートの実行
リソースの作成が完了したため、実際に実行してみます。
ステートマシンの画面より、[ 実行の開始 ] を押下します。
先ほどコピーしたインプットを貼り付け、実行します。
実行の状況は以下のようにグラフィカルに確認できます。正常に終了するのを待ちます。
5. エクスポート後の内容の確認
ステートマシンの実行が正常に終了すると、 Glue テーブルが 2つ作成されています。
一つ目のテーブル。
AWS CLI でも見てみます。
% aws glue get-table --database-name dynamodb_exports --name reviews
{
"Table": {
"Name": "reviews",
"DatabaseName": "dynamodb_exports",
"CreateTime": "2020-09-11T22:44:08+09:00",
"UpdateTime": "2020-09-11T22:44:08+09:00",
"Retention": 0,
"StorageDescriptor": {
"Columns": [
{
"Name": "rating",
"Type": "bigint"
},
{
"Name": "review",
"Type": "string"
},
{
"Name": "book",
"Type": "string"
},
{
"Name": "author",
"Type": "string"
},
{
"Name": "user",
"Type": "string"
},
{
"Name": "snapshot_timestamp",
"Type": "string"
}
],
"Location": "",
"Compressed": false,
"NumberOfBuckets": 0,
"SerdeInfo": {},
"SortColumns": [],
"StoredAsSubDirectories": false
},
"PartitionKeys": [],
"ViewOriginalText": "/* Presto View: eyJvcmlnaW5hbFNxbCI6IlNFTEVDVCAqXG5GUk9NXG4gIGR5bmFtb2RiX2V4cG9ydHMuc25hcHNob3RzX3Jldmlld3NcbldIRVJFIChcInNuYXBzaG90X3RpbWVzdGFtcFwiID0gJzIwMjAtMDktMTFUMTM6NDEnKVxuIiwiY2F0YWxvZyI6ImF3c2RhdGFjYXRhbG9nIiwic2NoZW1hIjoiZGVmYXVsdCIsImNvbHVtbnMiOlt7Im5hbWUiOiJyYXRpbmciLCJ0eXBlIjoiYmlnaW50In0seyJuYW1lIjoicmV2aWV3IiwidHlwZSI6InZhcmNoYXIifSx7Im5hbWUiOiJib29rIiwidHlwZSI6InZhcmNoYXIifSx7Im5hbWUiOiJhdXRob3IiLCJ0eXBlIjoidmFyY2hhciJ9LHsibmFtZSI6InVzZXIiLCJ0eXBlIjoidmFyY2hhciJ9LHsibmFtZSI6InNuYXBzaG90X3RpbWVzdGFtcCIsInR5cGUiOiJ2YXJjaGFyIn1dfQ== */",
"ViewExpandedText": "/* Presto View */",
"TableType": "VIRTUAL_VIEW",
"Parameters": {
"comment": "Presto View",
"presto_view": "true"
},
"CreatedBy": "arn:aws:sts::000000000000:assumed-role/AWSBigDataBlog-GlueDynamoExportTableCom-LambdaRole-YDE0JG71FHAD/AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-K1KBFSLTS6G1",
"IsRegisteredWithLakeFormation": false,
"CatalogId": "000000000000"
}
}
2つ目のテーブル。
% aws glue get-table --database-name dynamodb_exports --name snapshots_reviews
{
"Table": {
"Name": "snapshots_reviews",
"DatabaseName": "dynamodb_exports",
"Owner": "owner",
"CreateTime": "2020-09-11T22:42:51+09:00",
"UpdateTime": "2020-09-11T22:42:51+09:00",
"LastAccessTime": "2020-09-11T22:42:51+09:00",
"Retention": 0,
"StorageDescriptor": {
"Columns": [
{
"Name": "rating",
"Type": "bigint"
},
{
"Name": "review",
"Type": "string"
},
{
"Name": "book",
"Type": "string"
},
{
"Name": "author",
"Type": "string"
},
{
"Name": "user",
"Type": "string"
}
],
"Location": "s3://dynamodb-exports-000000000000-ap-northeast-1/csv/Reviews/",
"InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"Compressed": false,
"NumberOfBuckets": -1,
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"Parameters": {
"field.delim": ","
}
},
"BucketColumns": [],
"SortColumns": [],
"Parameters": {
"CrawlerSchemaDeserializerVersion": "1.0",
"CrawlerSchemaSerializerVersion": "1.0",
"UPDATED_BY_CRAWLER": "ReviewscsvCrawler",
"areColumnsQuoted": "false",
"averageRecordSize": "73",
"classification": "csv",
"columnsOrdered": "true",
"compressionType": "none",
"delimiter": ",",
"objectCount": "20",
"recordCount": "4",
"sizeKey": "297",
"skip.header.line.count": "1",
"typeOfData": "file"
},
"StoredAsSubDirectories": false
},
"PartitionKeys": [
{
"Name": "snapshot_timestamp",
"Type": "string"
}
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"CrawlerSchemaDeserializerVersion": "1.0",
"CrawlerSchemaSerializerVersion": "1.0",
"UPDATED_BY_CRAWLER": "ReviewscsvCrawler",
"areColumnsQuoted": "false",
"averageRecordSize": "73",
"classification": "csv",
"columnsOrdered": "true",
"compressionType": "none",
"delimiter": ",",
"objectCount": "20",
"recordCount": "4",
"sizeKey": "297",
"skip.header.line.count": "1",
"typeOfData": "file"
},
"CreatedBy": "arn:aws:sts::000000000000:assumed-role/AWSGlueServiceRoleDefault/AWS-Crawler",
"IsRegisteredWithLakeFormation": false,
"CatalogId": "000000000000"
}
}
これらのテーブルに対して、 Athena の画面からクエリを実行できます。
出力先の S3 バケットにはこのようにオブジェクトが生成されています。
% aws s3 ls dynamodb-exports-000000000000-ap-northeast-1 --recursive
2020-09-11 22:41:25 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00000
2020-09-11 22:41:25 149 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00001
2020-09-11 22:41:25 148 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00002
2020-09-11 22:41:25 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00003
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00004
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00005
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00006
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00007
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00008
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00009
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00010
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00011
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00012
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00013
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00014
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00015
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00016
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00017
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00018
2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00019
ファイルサイズが 0 でないものを 2つ確認すると、それぞれ以下のような中身になっています。DynamoDB テーブルに登録したアイテムが、 csv 形式でエクスポートされています。
Rating,Review,Book,Author,User
5,"A thrilling journey through the world of Hogwarts","Harry Potter and the Philosopher's Stone",J.K.Rowling,Tristan
Rating,Review,Book,Author,User
4,"Harry is pretty brave, but Hermione is the clear hero","Harry Potter and the Sorcerer's Stone",J.K.Rowling,Adeline
これで DynamoDB から S3 へのエクスポートを確認できました。Event ルールの定期実行を有効化すれば、自動で継続的なエクスポートが実現できます。
終わりに
AWS Glue と Step Functions を利用した、DynamoDB テーブルの S3 バケットへのエクスポートを確認しました。
AWS Glue のことがよく分かっていなくても、用意されたテンプレートに従って実行するだけで仕組みを実装することができました。必要に応じてカスタマイズして使用すれば、様々な要件に対応できそうですね。
以上、千葉(幸)がお送りしました。