PythonでDynamoDBから取得したデータをcsvにしてS3にアップロードする

boto3を使用してDynamoDBテーブルの作成とデータ投入を行います。そしてscanで取得したデータからcsvファイルを作成しS3にアップロードします。
2020.10.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の下地です。pythonでDynamoDBのデータを取得、csv形式で保存しS3にアップロードしてみたので内容をまとめたいと思います。

環境

環境は以下のようになっております。

  • mac: 10.15.7
  • python: 3.7.0
  • boto3: 1.12.33

全体像

手順としては以下のようになっています。

  1. DynamoDBテーブルを作成
  2. データを入力
  3. scanを実行しデータを取得
  4. 取得データをcsvとして保存
  5. S3へアップロード

実装

実装は、jupyter notebookを使用して進めます。

まず必要な関数のimportと変数の定義を行います。

test.ipynb

import csv
from boto3.session import Session
session = Session(profile_name= プロファイル名)

table_name = DynamoDBテーブル名
bucket_name = バケット名
local_path = "test1.csv" # ローカルのcsvの保存先
s3_path = "test-folder/test1.csv" # S3のアップロード先

1. DynamoDBテーブルを作成

DynamoDBクライアントのcreate_tableメソッドを使用してテーブルを作成します。

test.ipynb

dynamo = session.client("dynamodb")

response = dynamo.create_table(
    AttributeDefinitions=[
        {'AttributeName': 'Username','AttributeType': 'S'},
        {'AttributeName': 'Timestamp','AttributeType': 'S'},],
    TableName=table_name,
    KeySchema=[
        {'AttributeName': 'Username','KeyType': 'HASH'},
        {'AttributeName': 'Timestamp','KeyType': 'RANGE'},],
    ProvisionedThroughput={'ReadCapacityUnits': 2,'WriteCapacityUnits': 2},
     StreamSpecification={'StreamEnabled': True,'StreamViewType': 'NEW_AND_OLD_IMAGES'}
    )

2. データを入力

次に、put_itemメソッドを使用して作成したテーブルに3件のデータを投入します。

test.ipynb

put_data1 = dynamo.put_item(TableName=table_name,
        Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'taro'}})
put_data2 = dynamo.put_item(TableName=table_name,
        Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'jiro'}})
put_data3 = dynamo.put_item(TableName=table_name,
        Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'hanako'}})

3. scanを実行しデータを取得

データ投入後、scanメソッドを使用してテーブルデータを取得します。

test.ipynb

get_data = dynamo.scan(TableName=table_name)
print(get_data['Items'])

先ほど入力した3件のデータが入っていることがわかります。

[{'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'hanako'}},
 {'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'jiro'}},
 {'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'taro'}}]

4. 取得データをcsvとして保存

では取得したデータを使用してcsvファイルを作成します。

test.ipynb

fieldnames = ['Username', 'Timestamp'] # header情報

# 取得データよりcsvファイルを作成
with open(local_path, 'w') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    for data in get_data['Items']:
        timestamp = data['Timestamp']['S']
        username = data['Username']['S']
        writer.writerow({"Username": username, "Timestamp":timestamp})

データの確認を行います。

test.ipynb

with open (local_path) as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

ヘッダーと3件の情報が入っています。

['Timestamp', 'Username']
['2020-09-30', 'hanako']
['2020-09-30', 'jiro']
['2020-09-30', 'taro']

5. S3へアップロード

最後に、作成したcsvフィアルをS3へアップロードします。

test.ipynb

s3 = session.resource('s3') #S3オブジェクトを取得

bucket = s3.Bucket(bucket_name)
bucket.upload_file(local_path, s3_path) # アップロード

cliコマンドで確認します。

$ aws s3 ls s3://バケット名/test-folder/ --profile プロファイル名
2020-09-30 16:55:04          0 
2020-09-30 17:02:58         73 test.csv

S3にアップロードされていることが確認できました!

まとめ

DynamoDBテーブルの作成からS3へアップロードまでをお届けしました。DynamoDBから取得したデータをcsvにするだけを試すつもりでしたがboto3を使用したテーブル作成など少し脱線しましたが良い勉強になりました。この記事がどなたかの助けになれば幸いです。

参考リンク