Lambda(python3.6)を使って大量のデータをDynamoDBに追加するときはbatch_writerが便利

どうも!大阪オフィスの西村祐二です。

Lambdaを使ってファイルに記載された大量のデータをDynamoDBに追加する機会がありました。
実行時間の制限もあるため、効率よく追加できないかなと思っていろいろ探していたら
batch_writer()を使えば
プロセスのスピードアップとサービスに対する書き込み要求数の削減が可能とのこと
だったので試してみました。

http://boto3.readthedocs.io/en/latest/guide/dynamodb.html#batch-writing

細かなところではありますが備忘録も兼ねて、ブログとして残しておこうと思います。

便利だと思ったところ

一度にputできる数の制限は気にしなくていい

一度にputできる数は最大25と制限があります。

http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Limits.html

以前、DynamoDBを勉強したときにこの制限のことは知っていたので、
今回「プログラムどうかこうかな~」と思っていましたが
batch_writer()を使えば、特に気にせず裏でよしなにやってくれていました。

Request重複時に上書きしてくれる

BatchWriteItemの実行時、 同じアイテムへのRequestが重複すると

botocore.exceptions.ClientError: An error occurred (ValidationException)

のようなエラーがでるのですが、
こちらもbatch_writer(overwrite_by_pkeys=['partition_key', 'sort_key'])
のように、記載しておけば重複時に上書きしてくれます。

また、BatchWriteItemについて下記ブログがとても参考になります。

BatchWriteItemを利用してAmazon DynamoDBのテーブルを空にする

試してみる

設定

■Lambda
・ランタイム:python3.6
・IAM:DynamoDBへのアクセス、BatchWriteItemの許可

■DynamoDB
・テーブル名:test-batch
今回わかりやすく、ブログのアクセス数管理を想定したテーブルにしてみました。
プライマリーキーを「user_name」、ソートキーを「blog」、アクセス数として「count」としています。

シナリオ:50アイテム追加してみる

制限25個ですが
user1がblog1~blog50の50本書いたとして、そのアクセス数を追加してみます。

batch_writer()を使った処理の部分は17行目から25行目です。

"""This is sample program."""
import logging
import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
DYNAMO = boto3.resource('dynamodb')

def lambda_handler(event, context):
    """
    DynamoDB put item
    """
    try:
        table_name = "test-batch"

        table = DYNAMO.Table(table_name)
        with table.batch_writer() as batch:
            for i in range(50):
                batch.put_item(
                    Item={
                        'user_name': 'user1',
                        'blog': 'blog' + str(i),
                        'count': i,
                    }
                )
        LOGGER.info("Completed registration")
        return "end"
    except Exception as error:
        LOGGER.error(error)
        raise error

問題なく処理は完了しテーブルにデータが追加されています。

シナリオ:同じアイテムに重複requestしてみる

BatchWriteItem内でrequestが重複するように下記の処理を行ってみます。
・put_item:user1, blog1, 111
・put_item:user1, blog1, 222
・delete_item:user1, blog2
・put_item:user1, blog2, 333

■失敗させてみる
比較するために、わざと失敗させてみたいと思います。

"""This is sample program."""
import logging
import boto3

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
DYNAMO = boto3.resource('dynamodb')


def lambda_handler(event, context):
    """
    DynamoDB put item
    """
    try:
        table_name = "test-batch"

        # put items (use Batch Writing)
        table = DYNAMO.Table(table_name)
        with table.batch_writer() as batch:
            batch.put_item(
                Item={
                    'user_name': 'user1',
                    'blog': 'blog1',
                    'count': '111',
                }
            )
            batch.put_item(
                Item={
                    'user_name': 'user1',
                    'blog': 'blog1',
                    'count': '222',
                }
            )
            batch.delete_item(
                Key={
                    'user_name': 'user1',
                    'blog': 'blog2'
                }
            )
            batch.put_item(
                Item={
                    'user_name': 'user1',
                    'blog': 'blog2',
                    'count': '333',
                }
            )
        LOGGER.info("Completed registration")
        return "end"
    except Exception as error:
        LOGGER.error(error)
        raise error

想定どおり、下記のようなにエラーとなりました。

■成功させてみる
19行目を
batch_writer(overwrite_by_pkeys=['user_name', 'blog'])
のように修正して、再実行してみます。

"""This is sample program."""
import logging
import boto3


LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
DYNAMO = boto3.resource('dynamodb')

def lambda_handler(event, context):
    """
    DynamoDB put item
    """
    try:
        table_name = "test-batch"

        # put items (use Batch Writing)
        table = DYNAMO.Table(table_name)
        with table.batch_writer(overwrite_by_pkeys=['user_name', 'blog']) as batch:
            batch.put_item(
                Item={
                    'user_name': 'user1',
                    'blog': 'blog1',
                    'count': '111',
                }
            )
            batch.put_item(
                Item={
                    'user_name': 'user1',
                    'blog': 'blog1',
                    'count': '222',
                }
            )
            batch.delete_item(
                Key={
                    'user_name': 'user1',
                    'blog': 'blog2'
                }
            )
            batch.put_item(
                Item={
                    'user_name': 'user1',
                    'blog': 'blog2',
                    'count': '333',
                }
            )
        LOGGER.info("Completed registration")
        return "end"
    except Exception as error:
        LOGGER.error(error)
        raise error

想定通りに、BatchWriteItem内で重複したところは上書きされる形で追加されています。

さいごに

いかがだったでしょうか。

batch_writer()を使って、大量のデータをテーブルに追加する処理や
重複request時の上書きする処理を試してみました。

誰かの参考になれば幸いです。