Lambda+SQSを使ってDynamoDBの特定のデータを別アカウントのDynamoDBへコピー(移行)させてみる

Lambda+SQSを使ってDynamoDBの特定のデータを別アカウントのDynamoDBへコピー(移行)させてみる

Clock Icon2018.09.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どうも!大阪オフィスの西村祐二です。

開発環境で使っていたデータを本番環境など別環境でも使いたい場面はよくあると思います。

私もあるAWSアカウント上にあるDynamoDBの特定のデータを別のアカウントのDynamoDBへコピーしたい場面に出くわしましたので、Lambda+SQSを使ってDynamoDBの負荷をあげずにコピーする仕組みを作ってみました。

※今回紹介する方法以外にも色んな方法があります。こんな方法もあるんだな程度で考えてもらえれば幸いです。

構成図

この構成の良さそうな点

個人的に思うこの構成の良さそうな点をあげてみます。

  • アカウントを跨ぐときの設定がSQSで許可するだけで簡単
  • ある程度速度調整ができる(今回は負荷をあげないようにゆっくりコピーするようにしているが速くもできる)
  • コスト削減できるかも?

ただ、以下を考慮しないといけないです。

  • 複数回DynamoDBに書き込まれても大丈夫なように考慮する
  • Lambdaが起動しまくることを考慮する

では、さっそくやっていきましょう。

主な作業

  • コピー先のDynamoDBテーブル作成
  • SQSの設定
  • SQSをトリガーとしたLambdaの作成
  • コピー元のDynamoDBからデータを取得し、コピー先アカウントのSQSへputするスクリプト作成

コピー元アカウントのDynamoDBテーブル(アカウントA)

テストデータとして、ユーザ情報をもつテーブルがあるとします。

その中の特定のデータだけ別のアカウントへコピーしたいというシナリオです。

プライマリキーは「userid」としています。

コピー先のDynamoDBテーブル(アカウントB)

コピー元と同じテーブルを作成しておきます。

プライマリキーは「userid」として作成しています。

SQSの設定(アカウントB)

別のAWSアカウント(アカウントA)からのアクセスを受け付けるSQSキューを作成します。

▼「test」というキューを作成します。他の値はデフォルトのままです。

▼「アクセス許可の追加」をクリックし、別アカウントからキューにアクセスできるようにします。

▼12桁のAWSアカウントを入力し、許可するアクションを設定します。今回はすべてアクションを許可しています。

▼プリンシパルの値がrootで設定されているので、適切なIAMユーザに変更しておくのがいいかと思います。

Lambdaの作成(アカウントB)

SQSをトリガーとしたコピー先のDynamoDBテーブルにputするLambdaを作成します。

まず、LambdaのポリシーにDynamoDBとSQSを実行する権限を付与しておいてください。

ちなみに、LambdaからSQSを実行するためのマネージドポリシーがあります。「AWSLambdaSQSQueueExecutionRole」

Lambdaのタイムアウト時間はトリガーに設定したSQSのキューの「デフォルトの可視性タイムアウト」の値以上に設定する必要があります。今回は30秒に設定しているので、Lambdaのタイムアウト時間を30秒に設定します。

  • DynamoDBへの書き込み速度を調整します。
  • 同時実行数を「1」にしておく
  • そうしないと複数のLambdaが並列に実行されて、DynamoDBの負荷があがる
  • また、同時実行の制限に引っかかり、他のLambdaに影響与える可能性がある
  • トリガーとなるSQSのバッチサイズを「1」にしておく
  • Lambdaが複数のデータを取得しないように制限
  • DynamoDBの負荷をあげないように1アイテムずつputさせるため

DynamoDBへputするプログラムをpythonで作成します。

import boto3
import os
import ast

# dynamo
DYNAMO = boto3.resource('dynamodb')
DYNAMO_CLIENT = DYNAMO.meta.client
DYNAMO_TABLE_NAME = os.getenv('TABLE_NAME', 'test')
TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME)


def put_dynamo(data):
    """コピー先のDynamoDBテーブルへputする."""
    try:
        response = TABLE.put_item(
            Item={
                'userid': data['userid'],
                'firstname': data['firstname'],
                'lastname': data['lastname'],
                'createdAt': data['createdAt'],
                'updatedAt': data['updatedAt']
            }
        )
        return response
    except Exception as error:
        raise error


def conv_dict(str):
    """Str -> Dict."""
    dict = ast.literal_eval(str)
    return dict


def lambda_handler(event, context):
    """main."""
    try:
        for i in range(len(event['Records'])):
            print(event['Records'][i-1]['body'])
            getdata = event['Records'][i-1]['body']
            put_dynamo(conv_dict(getdata))
    except Exception as error:
        raise error

コピー元のDynamoDBからデータを取得し、SQSに送付するスクリプト作成

今回は定期的にコピーを行う予定がなかったので、ローカルでの実行を想定しています。

ディレクトリ構成は下記のようになっています。

別アカウントにコピーしたいデータのキーを行ごとにテキストファイル(get_key_list.txt)に書き出しておきます。

├── data_put_sqs.py
└── list
    └── get_key_list.txt

プログラムはpythonで作成します。

import boto3
import os
import json
import decimal
from time import sleep

from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError

# dynamo
DYNAMO = boto3.resource('dynamodb')
DYNAMO_CLIENT = DYNAMO.meta.client
DYNAMO_TABLE_NAME = os.getenv('TABLE_NAME', 'test')
TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME)

# SQS
SQS = boto3.resource('sqs')
QUEUE = SQS.Queue('url')
URL = 'https://sqs.ap-northeast-1.amazonaws.com/<aws_account>/<sqs_name>'


class DecimalEncoder(json.JSONEncoder):
    """dynamodb get_item return number type for encoder."""

    def default(self, obj):
        """Return encode data."""
        if isinstance(obj, decimal.Decimal):
            if obj % 1 > 0:
                return float(obj)
            else:
                return int(obj)
        return super(DecimalEncoder, self).default(obj)


def get_item(key):
    """DynamoDBからitemを取得する."""
    try:
        response = TABLE.get_item(
            Key={
                'userid': str(key)
            }
        )
        encode = json.dumps(
            response['Item'], cls=DecimalEncoder, ensure_ascii=False)
        return encode
    except ClientError as e:
        print(e.response['Error']['Message'])
        raise e


def put_sqs(item):
    """sqsのキューにメッセージをputする."""
    resp = QUEUE.send_message(
        MessageBody=item,
        QueueUrl=URL,
        DelaySeconds=0
    )


def key_list(filename):
    """コピー対象のuseridをテキストから取得."""
    path = os.getcwd() + f'/list/{filename}'
    with open(path) as f:
        # 一行ごとをリストに入れてreturn
        return f.read().split()


def main():
    """main."""
    try:
        filelist = ['get_key_list.txt']
        for f in filelist:
            for i in key_list(f):
                result = get_item(i)
                put_sqs(result)
                # 1秒スリープ処理
                sleep(1)
    except Exception as e:
        raise e

if __name__ == "__main__":
    main()

  • 13行目
  • コピー元のDynamoDBテーブル名を指定します。

  • 19行目

  • コピー先のSQSキュー名を指定します。

コピー元の負荷もなるべくあげたくなかったので、1アイテム取得し、SQSにputした後、1秒のスリープ処理を入れています。

動作確認

下記で実行します。

$ python data_put_sqs.py
  • アカウントAのコピー元のDynamoDBの負荷状況をみてみます。

今回3000アイテムほどコピーしましたが、コピー元のDyanamoDBの読み込みキャパシティは低い値で一定した負荷でコピーすることができました。

(へこんでいる部分はテスト実行したところの間になります。)

  • アカウントBのコピー先のDynamoDBの負荷状況をみてみます。

こちらも同様にコピー先のDyanamoDBの書き込みキャパシティは一定の低い負荷でコピーすることができました。

(へこんでいる部分はテスト実行したところの間になります。)

さいごに

いかがだったでしょうか。

Lambda+SQSを使ってDynamoDBの特定のデータを負荷をあげずに別アカウントのDynamoDBへコピー(移行)する仕組みを作ってみました。

誰かの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.