この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
データアナリティクス事業本部の下地です。pythonでDynamoDBのデータを取得、csv形式で保存しS3にアップロードしてみたので内容をまとめたいと思います。
環境
環境は以下のようになっております。
- mac: 10.15.7
- python: 3.7.0
- boto3: 1.12.33
全体像
手順としては以下のようになっています。
- DynamoDBテーブルを作成
- データを入力
- scanを実行しデータを取得
- 取得データをcsvとして保存
- S3へアップロード
実装
実装は、jupyter notebook
を使用して進めます。
まず必要な関数のimportと変数の定義を行います。
test.ipynb
import csv
from boto3.session import Session
session = Session(profile_name= プロファイル名)
table_name = DynamoDBテーブル名
bucket_name = バケット名
local_path = "test1.csv" # ローカルのcsvの保存先
s3_path = "test-folder/test1.csv" # S3のアップロード先
1. DynamoDBテーブルを作成
DynamoDBクライアントのcreate_table
メソッドを使用してテーブルを作成します。
test.ipynb
dynamo = session.client("dynamodb")
response = dynamo.create_table(
AttributeDefinitions=[
{'AttributeName': 'Username','AttributeType': 'S'},
{'AttributeName': 'Timestamp','AttributeType': 'S'},],
TableName=table_name,
KeySchema=[
{'AttributeName': 'Username','KeyType': 'HASH'},
{'AttributeName': 'Timestamp','KeyType': 'RANGE'},],
ProvisionedThroughput={'ReadCapacityUnits': 2,'WriteCapacityUnits': 2},
StreamSpecification={'StreamEnabled': True,'StreamViewType': 'NEW_AND_OLD_IMAGES'}
)
2. データを入力
次に、put_item
メソッドを使用して作成したテーブルに3件のデータを投入します。
test.ipynb
put_data1 = dynamo.put_item(TableName=table_name,
Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'taro'}})
put_data2 = dynamo.put_item(TableName=table_name,
Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'jiro'}})
put_data3 = dynamo.put_item(TableName=table_name,
Item={'Timestamp': {'S': '2020-09-30'},'Username': {'S': 'hanako'}})
3. scanを実行しデータを取得
データ投入後、scan
メソッドを使用してテーブルデータを取得します。
test.ipynb
get_data = dynamo.scan(TableName=table_name)
print(get_data['Items'])
先ほど入力した3件のデータが入っていることがわかります。
[{'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'hanako'}},
{'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'jiro'}},
{'Timestamp': {'S': '2020-09-30'}, 'Username': {'S': 'taro'}}]
4. 取得データをcsvとして保存
では取得したデータを使用してcsvファイルを作成します。
test.ipynb
fieldnames = ['Username', 'Timestamp'] # header情報
# 取得データよりcsvファイルを作成
with open(local_path, 'w') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for data in get_data['Items']:
timestamp = data['Timestamp']['S']
username = data['Username']['S']
writer.writerow({"Username": username, "Timestamp":timestamp})
データの確認を行います。
test.ipynb
with open (local_path) as f:
reader = csv.reader(f)
for row in reader:
print(row)
ヘッダーと3件の情報が入っています。
['Timestamp', 'Username']
['2020-09-30', 'hanako']
['2020-09-30', 'jiro']
['2020-09-30', 'taro']
5. S3へアップロード
最後に、作成したcsvフィアルをS3へアップロードします。
test.ipynb
s3 = session.resource('s3') #S3オブジェクトを取得
bucket = s3.Bucket(bucket_name)
bucket.upload_file(local_path, s3_path) # アップロード
cliコマンドで確認します。
$ aws s3 ls s3://バケット名/test-folder/ --profile プロファイル名
2020-09-30 16:55:04 0
2020-09-30 17:02:58 73 test.csv
S3にアップロードされていることが確認できました!
まとめ
DynamoDBテーブルの作成からS3へアップロードまでをお届けしました。DynamoDBから取得したデータをcsvにするだけを試すつもりでしたがboto3を使用したテーブル作成など少し脱線しましたが良い勉強になりました。この記事がどなたかの助けになれば幸いです。