Lambda+SQSを使ってDynamoDBの特定のデータを別アカウントのDynamoDBへコピー(移行)させてみる
どうも!大阪オフィスの西村祐二です。
開発環境で使っていたデータを本番環境など別環境でも使いたい場面はよくあると思います。
私もあるAWSアカウント上にあるDynamoDBの特定のデータを別のアカウントのDynamoDBへコピーしたい場面に出くわしましたので、Lambda+SQSを使ってDynamoDBの負荷をあげずにコピーする仕組みを作ってみました。
※今回紹介する方法以外にも色んな方法があります。こんな方法もあるんだな程度で考えてもらえれば幸いです。
構成図
この構成の良さそうな点
個人的に思うこの構成の良さそうな点をあげてみます。
- アカウントを跨ぐときの設定がSQSで許可するだけで簡単
- ある程度速度調整ができる(今回は負荷をあげないようにゆっくりコピーするようにしているが速くもできる)
- コスト削減できるかも?
ただ、以下を考慮しないといけないです。
- 複数回DynamoDBに書き込まれても大丈夫なように考慮する
- Lambdaが起動しまくることを考慮する
では、さっそくやっていきましょう。
主な作業
- コピー先のDynamoDBテーブル作成
- SQSの設定
- SQSをトリガーとしたLambdaの作成
- コピー元のDynamoDBからデータを取得し、コピー先アカウントのSQSへputするスクリプト作成
コピー元アカウントのDynamoDBテーブル(アカウントA)
テストデータとして、ユーザ情報をもつテーブルがあるとします。
その中の特定のデータだけ別のアカウントへコピーしたいというシナリオです。
プライマリキーは「userid」としています。
コピー先のDynamoDBテーブル(アカウントB)
コピー元と同じテーブルを作成しておきます。
プライマリキーは「userid」として作成しています。
SQSの設定(アカウントB)
別のAWSアカウント(アカウントA)からのアクセスを受け付けるSQSキューを作成します。
▼「test」というキューを作成します。他の値はデフォルトのままです。
▼「アクセス許可の追加」をクリックし、別アカウントからキューにアクセスできるようにします。
▼12桁のAWSアカウントを入力し、許可するアクションを設定します。今回はすべてアクションを許可しています。
▼プリンシパルの値がrootで設定されているので、適切なIAMユーザに変更しておくのがいいかと思います。
Lambdaの作成(アカウントB)
SQSをトリガーとしたコピー先のDynamoDBテーブルにputするLambdaを作成します。
まず、LambdaのポリシーにDynamoDBとSQSを実行する権限を付与しておいてください。
ちなみに、LambdaからSQSを実行するためのマネージドポリシーがあります。「AWSLambdaSQSQueueExecutionRole」
Lambdaのタイムアウト時間はトリガーに設定したSQSのキューの「デフォルトの可視性タイムアウト」の値以上に設定する必要があります。今回は30秒に設定しているので、Lambdaのタイムアウト時間を30秒に設定します。
- DynamoDBへの書き込み速度を調整します。
- 同時実行数を「1」にしておく
- そうしないと複数のLambdaが並列に実行されて、DynamoDBの負荷があがる
- また、同時実行の制限に引っかかり、他のLambdaに影響与える可能性がある
- トリガーとなるSQSのバッチサイズを「1」にしておく
- Lambdaが複数のデータを取得しないように制限
- DynamoDBの負荷をあげないように1アイテムずつputさせるため
DynamoDBへputするプログラムをpythonで作成します。
import boto3 import os import ast # dynamo DYNAMO = boto3.resource('dynamodb') DYNAMO_CLIENT = DYNAMO.meta.client DYNAMO_TABLE_NAME = os.getenv('TABLE_NAME', 'test') TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME) def put_dynamo(data): """コピー先のDynamoDBテーブルへputする.""" try: response = TABLE.put_item( Item={ 'userid': data['userid'], 'firstname': data['firstname'], 'lastname': data['lastname'], 'createdAt': data['createdAt'], 'updatedAt': data['updatedAt'] } ) return response except Exception as error: raise error def conv_dict(str): """Str -> Dict.""" dict = ast.literal_eval(str) return dict def lambda_handler(event, context): """main.""" try: for i in range(len(event['Records'])): print(event['Records'][i-1]['body']) getdata = event['Records'][i-1]['body'] put_dynamo(conv_dict(getdata)) except Exception as error: raise error
コピー元のDynamoDBからデータを取得し、SQSに送付するスクリプト作成
今回は定期的にコピーを行う予定がなかったので、ローカルでの実行を想定しています。
ディレクトリ構成は下記のようになっています。
別アカウントにコピーしたいデータのキーを行ごとにテキストファイル(get_key_list.txt)に書き出しておきます。
├── data_put_sqs.py └── list └── get_key_list.txt
プログラムはpythonで作成します。
import boto3 import os import json import decimal from time import sleep from boto3.dynamodb.conditions import Key, Attr from botocore.exceptions import ClientError # dynamo DYNAMO = boto3.resource('dynamodb') DYNAMO_CLIENT = DYNAMO.meta.client DYNAMO_TABLE_NAME = os.getenv('TABLE_NAME', 'test') TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME) # SQS SQS = boto3.resource('sqs') QUEUE = SQS.Queue('url') URL = 'https://sqs.ap-northeast-1.amazonaws.com/<aws_account>/<sqs_name>' class DecimalEncoder(json.JSONEncoder): """dynamodb get_item return number type for encoder.""" def default(self, obj): """Return encode data.""" if isinstance(obj, decimal.Decimal): if obj % 1 > 0: return float(obj) else: return int(obj) return super(DecimalEncoder, self).default(obj) def get_item(key): """DynamoDBからitemを取得する.""" try: response = TABLE.get_item( Key={ 'userid': str(key) } ) encode = json.dumps( response['Item'], cls=DecimalEncoder, ensure_ascii=False) return encode except ClientError as e: print(e.response['Error']['Message']) raise e def put_sqs(item): """sqsのキューにメッセージをputする.""" resp = QUEUE.send_message( MessageBody=item, QueueUrl=URL, DelaySeconds=0 ) def key_list(filename): """コピー対象のuseridをテキストから取得.""" path = os.getcwd() + f'/list/{filename}' with open(path) as f: # 一行ごとをリストに入れてreturn return f.read().split() def main(): """main.""" try: filelist = ['get_key_list.txt'] for f in filelist: for i in key_list(f): result = get_item(i) put_sqs(result) # 1秒スリープ処理 sleep(1) except Exception as e: raise e if __name__ == "__main__": main()
- 13行目
- コピー元のDynamoDBテーブル名を指定します。
-
19行目
- コピー先のSQSキュー名を指定します。
コピー元の負荷もなるべくあげたくなかったので、1アイテム取得し、SQSにputした後、1秒のスリープ処理を入れています。
動作確認
下記で実行します。
$ python data_put_sqs.py
- アカウントAのコピー元のDynamoDBの負荷状況をみてみます。
今回3000アイテムほどコピーしましたが、コピー元のDyanamoDBの読み込みキャパシティは低い値で一定した負荷でコピーすることができました。
(へこんでいる部分はテスト実行したところの間になります。)
- アカウントBのコピー先のDynamoDBの負荷状況をみてみます。
こちらも同様にコピー先のDyanamoDBの書き込みキャパシティは一定の低い負荷でコピーすることができました。
(へこんでいる部分はテスト実行したところの間になります。)
さいごに
いかがだったでしょうか。
Lambda+SQSを使ってDynamoDBの特定のデータを負荷をあげずに別アカウントのDynamoDBへコピー(移行)する仕組みを作ってみました。
誰かの参考になれば幸いです。