この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
どうも!大阪オフィスの西村祐二です。
開発環境で使っていたデータを本番環境など別環境でも使いたい場面はよくあると思います。
私もあるAWSアカウント上にあるDynamoDBの特定のデータを別のアカウントのDynamoDBへコピーしたい場面に出くわしましたので、Lambda+SQSを使ってDynamoDBの負荷をあげずにコピーする仕組みを作ってみました。
※今回紹介する方法以外にも色んな方法があります。こんな方法もあるんだな程度で考えてもらえれば幸いです。
構成図
この構成の良さそうな点
個人的に思うこの構成の良さそうな点をあげてみます。
- アカウントを跨ぐときの設定がSQSで許可するだけで簡単
- ある程度速度調整ができる(今回は負荷をあげないようにゆっくりコピーするようにしているが速くもできる)
- コスト削減できるかも?
ただ、以下を考慮しないといけないです。
- 複数回DynamoDBに書き込まれても大丈夫なように考慮する
- Lambdaが起動しまくることを考慮する
では、さっそくやっていきましょう。
主な作業
- コピー先のDynamoDBテーブル作成
- SQSの設定
- SQSをトリガーとしたLambdaの作成
- コピー元のDynamoDBからデータを取得し、コピー先アカウントのSQSへputするスクリプト作成
コピー元アカウントのDynamoDBテーブル(アカウントA)
テストデータとして、ユーザ情報をもつテーブルがあるとします。
その中の特定のデータだけ別のアカウントへコピーしたいというシナリオです。
プライマリキーは「userid」としています。
コピー先のDynamoDBテーブル(アカウントB)
コピー元と同じテーブルを作成しておきます。
プライマリキーは「userid」として作成しています。
SQSの設定(アカウントB)
別のAWSアカウント(アカウントA)からのアクセスを受け付けるSQSキューを作成します。
▼「test」というキューを作成します。他の値はデフォルトのままです。
▼「アクセス許可の追加」をクリックし、別アカウントからキューにアクセスできるようにします。
▼12桁のAWSアカウントを入力し、許可するアクションを設定します。今回はすべてアクションを許可しています。
▼プリンシパルの値がrootで設定されているので、適切なIAMユーザに変更しておくのがいいかと思います。
Lambdaの作成(アカウントB)
SQSをトリガーとしたコピー先のDynamoDBテーブルにputするLambdaを作成します。
まず、LambdaのポリシーにDynamoDBとSQSを実行する権限を付与しておいてください。
ちなみに、LambdaからSQSを実行するためのマネージドポリシーがあります。「AWSLambdaSQSQueueExecutionRole」
Lambdaのタイムアウト時間はトリガーに設定したSQSのキューの「デフォルトの可視性タイムアウト」の値以上に設定する必要があります。今回は30秒に設定しているので、Lambdaのタイムアウト時間を30秒に設定します。
- DynamoDBへの書き込み速度を調整します。
- 同時実行数を「1」にしておく
- そうしないと複数のLambdaが並列に実行されて、DynamoDBの負荷があがる
- また、同時実行の制限に引っかかり、他のLambdaに影響与える可能性がある
- トリガーとなるSQSのバッチサイズを「1」にしておく
- Lambdaが複数のデータを取得しないように制限
- DynamoDBの負荷をあげないように1アイテムずつputさせるため
DynamoDBへputするプログラムをpythonで作成します。
import boto3
import os
import ast
# dynamo
DYNAMO = boto3.resource('dynamodb')
DYNAMO_CLIENT = DYNAMO.meta.client
DYNAMO_TABLE_NAME = os.getenv('TABLE_NAME', 'test')
TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME)
def put_dynamo(data):
"""コピー先のDynamoDBテーブルへputする."""
try:
response = TABLE.put_item(
Item={
'userid': data['userid'],
'firstname': data['firstname'],
'lastname': data['lastname'],
'createdAt': data['createdAt'],
'updatedAt': data['updatedAt']
}
)
return response
except Exception as error:
raise error
def conv_dict(str):
"""Str -> Dict."""
dict = ast.literal_eval(str)
return dict
def lambda_handler(event, context):
"""main."""
try:
for i in range(len(event['Records'])):
print(event['Records'][i-1]['body'])
getdata = event['Records'][i-1]['body']
put_dynamo(conv_dict(getdata))
except Exception as error:
raise error
コピー元のDynamoDBからデータを取得し、SQSに送付するスクリプト作成
今回は定期的にコピーを行う予定がなかったので、ローカルでの実行を想定しています。
ディレクトリ構成は下記のようになっています。
別アカウントにコピーしたいデータのキーを行ごとにテキストファイル(get_key_list.txt)に書き出しておきます。
├── data_put_sqs.py
└── list
└── get_key_list.txt
プログラムはpythonで作成します。
data_put_sqs.py
import boto3
import os
import json
import decimal
from time import sleep
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError
# dynamo
DYNAMO = boto3.resource('dynamodb')
DYNAMO_CLIENT = DYNAMO.meta.client
DYNAMO_TABLE_NAME = os.getenv('TABLE_NAME', 'test')
TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME)
# SQS
SQS = boto3.resource('sqs')
QUEUE = SQS.Queue('url')
URL = 'https://sqs.ap-northeast-1.amazonaws.com/<aws_account>/<sqs_name>'
class DecimalEncoder(json.JSONEncoder):
"""dynamodb get_item return number type for encoder."""
def default(self, obj):
"""Return encode data."""
if isinstance(obj, decimal.Decimal):
if obj % 1 > 0:
return float(obj)
else:
return int(obj)
return super(DecimalEncoder, self).default(obj)
def get_item(key):
"""DynamoDBからitemを取得する."""
try:
response = TABLE.get_item(
Key={
'userid': str(key)
}
)
encode = json.dumps(
response['Item'], cls=DecimalEncoder, ensure_ascii=False)
return encode
except ClientError as e:
print(e.response['Error']['Message'])
raise e
def put_sqs(item):
"""sqsのキューにメッセージをputする."""
resp = QUEUE.send_message(
MessageBody=item,
QueueUrl=URL,
DelaySeconds=0
)
def key_list(filename):
"""コピー対象のuseridをテキストから取得."""
path = os.getcwd() + f'/list/{filename}'
with open(path) as f:
# 一行ごとをリストに入れてreturn
return f.read().split()
def main():
"""main."""
try:
filelist = ['get_key_list.txt']
for f in filelist:
for i in key_list(f):
result = get_item(i)
put_sqs(result)
# 1秒スリープ処理
sleep(1)
except Exception as e:
raise e
if __name__ == "__main__":
main()
- 13行目
- コピー元のDynamoDBテーブル名を指定します。
-
19行目
- コピー先のSQSキュー名を指定します。
コピー元の負荷もなるべくあげたくなかったので、1アイテム取得し、SQSにputした後、1秒のスリープ処理を入れています。
動作確認
下記で実行します。
$ python data_put_sqs.py
- アカウントAのコピー元のDynamoDBの負荷状況をみてみます。
今回3000アイテムほどコピーしましたが、コピー元のDyanamoDBの読み込みキャパシティは低い値で一定した負荷でコピーすることができました。
(へこんでいる部分はテスト実行したところの間になります。)
- アカウントBのコピー先のDynamoDBの負荷状況をみてみます。
こちらも同様にコピー先のDyanamoDBの書き込みキャパシティは一定の低い負荷でコピーすることができました。
(へこんでいる部分はテスト実行したところの間になります。)
さいごに
いかがだったでしょうか。
Lambda+SQSを使ってDynamoDBの特定のデータを負荷をあげずに別アカウントのDynamoDBへコピー(移行)する仕組みを作ってみました。
誰かの参考になれば幸いです。