DynamoDB上のアイテムをコードで管理するPythonスクリプトを作ってみた

2020.05.23

こんにちは、CX事業本部の若槻です。

DynamoDBにシステムの定義に必要なアイテム(APIの認可情報など)を登録する場合は、そのアイテムをコード化してGitHubなどでバージョン管理したい場合があります。

しかし、AWS CloudFormationではDynamoDBのテーブル自体をリソースとして管理することはできますが、テーブル上のアイテムについては管理することができません。そこで別途アイテムをコードで管理する方法としては以下が考えられます。

今回は、後者の方法をとり、DynamoDB上のアイテムをコードで管理するPythonスクリプトを作ってみました。

スクリプト

ディレクトリ構成は以下のようになります。initialize.pydeploy.pyの2つのスクリプトを使い、src_dataディレクトリ配下のJSONファイルでテーブルごとのアイテムを管理します。

.
├── deploy.py
├── initialize.py
└── src_data
    └── <table_name>.json

スクリプトはGitHubにもアップロードしてあります。

ここではそれぞれのスクリプトについて説明をしていきます。

initialize.py:ソースデータを初期化するスクリプト

スクリプトinitialize.pyの利用は必須ではありません。アイテムを管理したいDynamoDBテーブル上に既にアイテムが登録されている場合に、initialize.pyを実行すればそのテーブル上のアイテムをスキャンして<table_name>.jsonファイルを作成(初期化)することができます。

initialize.py

import os
import json
import sys
from typing import List
from decimal import Decimal

import boto3
from boto3.resources.base import ServiceResource
from boto3.dynamodb.types import Binary


def main(table_name: str) -> None:
    table_items = scan_table(table_name, dynamodb_resource)
    put_to_src_data(table_name, table_items)


def scan_table(
    table_name: str,
    dynamodb_resource: ServiceResource
) -> List[dict]:
    table = dynamodb_resource.Table(table_name)
    resp = table.scan()
    table_items = resp['Items']
    while 'LastEvaluatedKey' in resp:
        resp = table.scan(
            ExclusiveStartKey=resp['LastEvaluatedKey']
        )
        table_items.extend(resp['Items'])
    return table_items


def default(obj) -> object:
    if isinstance(obj, Decimal):
        if int(obj) == obj:
            return int(obj)
        else:
            return float(obj)
    elif isinstance(obj, Binary):
        return obj.value
    elif isinstance(obj, bytes):
        return obj.decode()
    elif isinstance(obj, set):
        return list(obj)
    try:
        return str(obj)
    except Exception:
        return None


def put_to_src_data(
    table_name: str,
    table_items: List[dict]
) -> None:
    f = open(f'src_data/{table_name}.json', 'w')
    json.dump(table_items, f, default=default, ensure_ascii=False, indent=4)


if __name__ == '__main__':
    dynamodb_resource = boto3.resource('dynamodb')
    _, table_name = sys.argv

    main(table_name)

スクリプトは第1引数にテーブル名を指定して実行します。

$ python initialize.py <テーブル名>

動作確認

テーブル名sample-table、パーティションキーtype、ソートキーnameのDynamoDBテーブルに以下のようにアイテムが既に登録されているとします。 image.png

sample-tableテーブルに対してスクリプトを実行します。

$ python initialize.py sample-table

src_dataディレクトリ配下にsample-table.jsonファイルが作成されました。テーブル上のアイテムが配列としてファイルに書き込まれています。

$ cat ./src_data/sample-table.json
[
    {
        "9th_rank": 3,
        "name": "ichinose",
        "type": "cute"
    },
    {
        "9th_rank": 1,
        "name": "hojo",
        "type": "cool"
    },
    {
        "9th_rank": 2,
        "name": "sagisawa",
        "type": "cool"
    }
]

この<table_name>.jsonファイルが、テーブル上のアイテムをコードで管理するためのソースデータとなります。

deploy.py:ソースデータをもとにデプロイを行うスクリプト

ソースデータである<table_name>.jsonファイルを編集してdeploy.pyスクリプトを実行することにより、テーブルにアイテムをデプロイ(追加・更新・削除)することができます。

deploy.py

import json
import sys
from typing import List

import boto3
from boto3.resources.base import ServiceResource
from botocore.client import BaseClient

KEY_SPLITTER = ':'


def main(
    table_name: str,
    dynamodb_resource: ServiceResource,
    dynamodb_client: BaseClient
) -> None:
    partition_key, sort_key = get_table_keys(table_name, dynamodb_client)
    src_items = get_src_items(table_name)
    put_to_table(table_name, src_items, dynamodb_resource)
    table_items = scan_table(table_name, dynamodb_resource)
    diff_keys = get_diff_keys(src_items, table_items, partition_key, sort_key)
    delete_diff_items(
        table_name,
        partition_key,
        sort_key,
        diff_keys,
        dynamodb_resource
    )


def get_table_keys(table_name: str, dynamodb_client: BaseClient) -> (str, str):
    resp = dynamodb_client.describe_table(TableName=table_name)
    sort_key = ''
    for key_schema in resp['Table']['KeySchema']:
        if key_schema['KeyType'] == 'HASH':
            partition_key = key_schema['AttributeName']
        elif key_schema['KeyType'] == 'RANGE':
            sort_key = key_schema['AttributeName']
    return partition_key, sort_key


def get_src_items(
    table_name: str
) -> List[dict]:
    return json.load(
        open(f'./src_data/{table_name}.json')
    )


def put_to_table(
    table_name: str,
    src_items: List[dict],
    dynamodb_resource: ServiceResource
) -> None:
    table = dynamodb_resource.Table(table_name)
    with table.batch_writer() as batch:
        for item in src_items:
            batch.put_item(
                Item=item
            )


def scan_table(
    table_name: str,
    dynamodb_resource: ServiceResource
) -> List[dict]:
    table = dynamodb_resource.Table(table_name)
    resp = table.scan(Limit=10)
    table_items = resp['Items']
    while 'LastEvaluatedKey' in resp:
        resp = table.scan(
            ExclusiveStartKey=resp['LastEvaluatedKey'],
            Limit=10
        )
        table_items += resp['Items']
    return table_items


def get_diff_keys(
    src_items: List[dict],
    table_items: List[dict],
    partition_key: str,
    sort_key: str
) -> List[str]:
    src_keys = []
    for src_item in src_items:
        src_keys.append(
            src_item[partition_key] + KEY_SPLITTER + (src_item[sort_key] if sort_key != '' else '')
        )
    table_keys = []
    for table_item in table_items:
        table_keys.append(
            table_item[partition_key] + KEY_SPLITTER + (table_item[sort_key] if sort_key != '' else '')
        )
    return list(set(table_keys) - set(src_keys))


def delete_diff_items(
    table_name: str,
    partition_key: str,
    sort_key: str,
    diff_keys: List[str],
    dynamodb_resource: ServiceResource
) -> None:
    table = dynamodb_resource.Table(table_name)
    for diff_key in diff_keys:
        key = {partition_key: diff_key.split(KEY_SPLITTER)[0]}
        if sort_key != '':
            sort_key_d = {sort_key: diff_key.split(KEY_SPLITTER)[1]}
            key.update(**sort_key_d)
        table.delete_item(
            Key=key
        )


if __name__ == '__main__':
    dynamodb_resource = boto3.resource('dynamodb')
    dynamodb_client = boto3.client('dynamodb')
    _, table_name = sys.argv

    main(table_name, dynamodb_resource, dynamodb_client)

スクリプトは第1引数にテーブル名を指定して実行します。

$ python deploy.py <テーブル名>

動作確認

  • アイテムの追加

先ほど作成したsrc_data/sample-table.jsonを以下のように更新します。アイテムを1つ追加しています。

[
    {
        "9th_rank": 3,
        "name": "ichinose",
        "type": "cute"
    },
    {
        "9th_rank": 1,
        "name": "hojo",
        "type": "cool"
    },
    {
        "9th_rank": 2,
        "name": "sagisawa",
        "type": "cool"
    },
    {
        "9th_rank": 4,
        "name": "kamiya",
        "type": "cool"
    }
]

sample-tableテーブルに対してスクリプトを実行します。

$ python deploy.py sample-table

コンソール画面を見ると、ソースデータに追加したアイテムがテーブルに追加できています。 image.png

  • アイテムの編集

src_data/sample-table.jsonを以下のように更新します。各アイテムに8th_rank属性を追加しています。

[
    {
        "9th_rank": 3,
        "name": "ichinose",
        "type": "cute",
        "8th_rank": 6
    },
    {
        "9th_rank": 1,
        "name": "hojo",
        "type": "cool",
        "8th_rank": 2
    },
    {
        "9th_rank": 2,
        "name": "sagisawa",
        "type": "cool",
        "8th_rank": 7
    },
    {
        "9th_rank": 4,
        "name": "kamiya",
        "type": "cool",
        "8th_rank": 16
    }
]

sample-tableテーブルに対してスクリプトを実行します。

$ python deploy.py sample-table

コンソール画面を見ると、ソースデータでのアイテムの更新がテーブル上でも反映されています。 image.png

  • アイテムの削除

src_data/sample-table.jsonを以下のように更新します。アイテムを1つ削除しています。

[
    {
        "9th_rank": 1,
        "name": "hojo",
        "type": "cool",
        "8th_rank": 2
    },
    {
        "9th_rank": 2,
        "name": "sagisawa",
        "type": "cool",
        "8th_rank": 7
    },
    {
        "9th_rank": 4,
        "name": "kamiya",
        "type": "cool",
        "8th_rank": 16
    }
]

sample-tableテーブルに対してスクリプトを実行します。

$ python deploy.py sample-table

コンソール画面を見ると、ソースデータで~~失踪~~削除したアイテムがテーブル上でも削除できています。 image.png

※ここまで、ソートキーがあるテーブルで動作確認をしましたが、ソートキーなしのテーブルでも動作することは確認済みです。

おわりに

以上、DynamoDB上のアイテムをコードで管理するPythonスクリプトのご紹介でした。あとはsrc_data配下のJSONファイルをGitHubで管理するなり、スクリプトをCI/CDサービス上で動かしてデプロイ自動化するなりすれば捗るかと思います。

参考