PythonでDynamoDBから取得したデータをcsvにしてS3にアップロードする
データアナリティクス事業本部の下地です。pythonでDynamoDBのデータを取得、csv形式で保存しS3にアップロードしてみたので内容をまとめたいと思います。
環境
環境は以下のようになっております。
- mac: 10.15.7
- python: 3.7.0
- boto3: 1.12.33
全体像
手順としては以下のようになっています。
- DynamoDBテーブルを作成
- データを入力
- scanを実行しデータを取得
- 取得データをcsvとして保存
- S3へアップロード
実装
実装は、jupyter notebook
を使用して進めます。
まず必要な関数のimportと変数の定義を行います。
import csv from boto3.session import Session session = Session(profile_name= プロファイル名) table_name = DynamoDBテーブル名 bucket_name = バケット名 local_path = "test1.csv" # ローカルのcsvの保存先 s3_path = "test-folder/test1.csv" # S3のアップロード先
1. DynamoDBテーブルを作成
DynamoDBクライアントのcreate_table
メソッドを使用してテーブルを作成します。
dynamo = session.client("dynamodb") response = dynamo.create_table( AttributeDefinitions=[ {'AttributeName': 'Username','AttributeType': 'S'}, {'AttributeName': 'Timestamp','AttributeType': 'S'},], TableName=table_name, KeySchema=[ {'AttributeName': 'Username','KeyType': 'HASH'}, {'AttributeName': 'Timestamp','KeyType': 'RANGE'},], ProvisionedThroughput={'ReadCapacityUnits': 2,'WriteCapacityUnits': 2}, StreamSpecification={'StreamEnabled': True,'StreamViewType': 'NEW_AND_OLD_IMAGES'} )
2. データを入力
次に、put_item
メソッドを使用して作成したテーブルに3件のデータを投入します。
put_data1 = dynamo.put_item(TableName=table_name, Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'taro'}}) put_data2 = dynamo.put_item(TableName=table_name, Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'jiro'}}) put_data3 = dynamo.put_item(TableName=table_name, Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'hanako'}})
3. scanを実行しデータを取得
データ投入後、scan
メソッドを使用してテーブルデータを取得します。
get_data = dynamo.scan(TableName=table_name) print(get_data['Items'])
先ほど入力した3件のデータが入っていることがわかります。
[{'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'hanako'}}, {'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'jiro'}}, {'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'taro'}}]
4. 取得データをcsvとして保存
では取得したデータを使用してcsvファイルを作成します。
fieldnames = ['Username', 'Timestamp'] # header情報 # 取得データよりcsvファイルを作成 with open(local_path, 'w') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for data in get_data['Items']: timestamp = data['Timestamp']['S'] username = data['Username']['S'] writer.writerow({"Username": username, "Timestamp":timestamp})
データの確認を行います。
with open (local_path) as f: reader = csv.reader(f) for row in reader: print(row)
ヘッダーと3件の情報が入っています。
['Timestamp', 'Username'] ['2020-09-30', 'hanako'] ['2020-09-30', 'jiro'] ['2020-09-30', 'taro']
5. S3へアップロード
最後に、作成したcsvフィアルをS3へアップロードします。
s3 = session.resource('s3') #S3オブジェクトを取得 bucket = s3.Bucket(bucket_name) bucket.upload_file(local_path, s3_path) # アップロード
cliコマンドで確認します。
$ aws s3 ls s3://バケット名/test-folder/ --profile プロファイル名 2020-09-30 16:55:04 0 2020-09-30 17:02:58 73 test.csv
S3にアップロードされていることが確認できました!
まとめ
DynamoDBテーブルの作成からS3へアップロードまでをお届けしました。DynamoDBから取得したデータをcsvにするだけを試すつもりでしたがboto3を使用したテーブル作成など少し脱線しましたが良い勉強になりました。この記事がどなたかの助けになれば幸いです。