はじめに
Amazon S3バケットにアップロードしたCSVデータをAmazon DynamoDBテーブルに自動的に書き込むAWS Lambdaの作成方法を紹介します。
CSVデータをAmazon S3バケットにアップロード時、AWS LambdaをトリガーしAmazon DynamoDBテーブルにデータを書き込む方法について紹介します。
CSVデータをAmazon S3バケットにアップロード時、AWS LambdaをトリガーしDynamoDBにデータを書き込んでみた
構成は以下の通りです。
本構成では、AWSマネジメントコンソールからS3バケットにCSVファイルをアップロードすると、S3のPUTイベントをトリガーとしてLambda関数が起動します。Lambda関数は、CSVファイルの内容を読み取り、DynamoDBテーブルに書き込む処理を行います。
この構成は様々なシナリオで活用できます。例えば、お客様からのお問い合わせに対して、SMS送信で自動的に回答するシステムを構築する場合が挙げられます。
お問い合わせ内容ごとに適切な回答文をあらかじめDynamoDBで管理しておき、お問い合わせを受信したらその内容に応じて、対応する回答文をDynamoDBから取得してSMSで送信することで、迅速かつ的確な対応が可能になります。
回答文の追加や修正も、CSVファイルを更新してS3にアップロードするだけで簡単に反映できるのがメリットです。これにより、運用コストを削減しつつ、柔軟なデータ管理が実現できます。
前提条件
- アップロード用S3バケットを作成済み
- バケット名:cm-hirai-s3-csv-uploads-to-dynamodb
- アップロードするCSVデータは以下の通り
- 先頭行は各列のタイトルになりますが、アルファベットで指定ください。指定したタイトル名がDynamoDBテーブルに書き込まれる属性名となります。
inquiry_type,response_text,status
製品,製品ページをご覧ください。,ok
注文,注文確認ページからご確認をお願いいたします。,ok
配送,メールに記載の追跡番号からご確認いただけます。,ng
DynamoDBテーブル作成
以下の設定でテーブルを作成します。
- パーティションキー:inquiry_type(文字列)
- キャパシティ設定:オンデマンド
- 他はデフォルト
Lambda関数の作成
以下の設定です
- ランタイム:Python 3.12
- 適用するIAMポリシー
- AmazonDynamoDBFullAccess
- AmazonS3ReadOnlyAccess
- タイムアウト:10秒
import boto3
import csv
import json
def get_s3_file_content(bucket_name, object_key):
s3 = boto3.client('s3')
csv_file = s3.get_object(Bucket=bucket_name, Key=object_key)
return csv_file['Body'].read().decode('utf-8')
def parse_csv_data(csv_content):
csv_data = csv.DictReader(csv_content.splitlines())
return list(csv_data)
def delete_all_items_from_dynamodb(table):
scan = table.scan()
with table.batch_writer() as batch:
for each in scan['Items']:
batch.delete_item(Key={'inquiry_type': each['inquiry_type']}) # プライマリキーの属性名に合わせて修正
def write_csv_data_to_dynamodb(table, csv_data):
with table.batch_writer() as batch:
for item in csv_data:
batch.put_item(Item=item)
def lambda_handler(event, context):
print('event:' + json.dumps(event, ensure_ascii=False))
bucket_name = event['Records'][0]['s3']['bucket']['name']
object_key = event['Records'][0]['s3']['object']['key']
csv_content = get_s3_file_content(bucket_name, object_key)
csv_data = parse_csv_data(csv_content)
dynamodb = boto3.resource('dynamodb')
table_name = 'cm-hirai-s3_csv_uploads_to_dynamodb'
table = dynamodb.Table(table_name)
delete_all_items_from_dynamodb(table)
write_csv_data_to_dynamodb(table, csv_data)
テーブル名とプライマリキー名は、実際の環境に合わせて適切な値に変更してください。
実装ポイント
CSVの読み込み
Lambda関数内では、boto3
を使ってS3からCSVファイルを取得し、csv
モジュールを使ってパースします。DictReader
を使うことで、CSVの1行目をキーとして扱い、各行をdictとして取得できます。
DynamoDBへの書き込み
batch_writer
を使用することで、複数の書き込みや削除操作をバッチ処理として一括(最大25件)で実行できます。これにより、大量のデータを効率的に処理できます。
トリガーの設定
Lambda関数にS3イベントをトリガーとして設定します。
トリガーの設定は以下の通りです。
- バケット:s3/cm-hirai-s3-csv-uploads-to-dynamodb
- イベントタイプ:PUT
- プレフィックス:cm-hirai-s3-csv-uploads-to-dynamodb.csv
テスト
再掲ですが、アップロードするCSVデータは以下です。
inquiry_type,response_text,status
製品,製品ページをご覧ください。,ok
注文,注文確認ページからご確認をお願いいたします。,ok
配送,メールに記載の追跡番号からご確認いただけます。,ng
S3バケットに「s3-csv-uploads-to-dynamodb.csv」をアップロードすると、DynamoDBへ書き込みが確認できました。
アップロードする度に、テーブル内の項目は全削除しますので、項目数が減る場合も反映されます。
ファイルを以下に変更して、アップロードします。
inquiry_type,response_text,status
製品,製品ページをご覧ください。,ok
支払い,注文確認メールをご確認ください。,ok
反映されていることが確認できました。
最後に
本記事では、Amazon S3にアップロードされたCSVファイルのデータを、AWS Lambdaを使ってAmazon DynamoDBテーブルに自動的に書き込む方法について解説しました。
この構成を活用することで、CSVデータの取り込みを効率化し、DynamoDBの利点を活かしたデータ管理が可能になります。
参考
https://docs.python.org/ja/3/library/csv.html
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/programming-with-python.html