CSVデータをS3にアップロード時AWS Lambdaをトリガーし、自動的にDynamoDBへデータを書き込んでみた
はじめに
Amazon S3バケットにアップロードしたCSVデータをAmazon DynamoDBテーブルに自動的に書き込むAWS Lambdaの作成方法を紹介します。
CSVデータをAmazon S3バケットにアップロード時、AWS LambdaをトリガーしAmazon DynamoDBテーブルにデータを書き込む方法について紹介します。
CSVデータをAmazon S3バケットにアップロード時、AWS LambdaをトリガーしDynamoDBにデータを書き込んでみた
構成は以下の通りです。
本構成では、AWSマネジメントコンソールからS3バケットにCSVファイルをアップロードすると、S3のPUTイベントをトリガーとしてLambda関数が起動します。Lambda関数は、CSVファイルの内容を読み取り、DynamoDBテーブルに書き込む処理を行います。
この構成は様々なシナリオで活用できます。例えば、お客様からのお問い合わせに対して、SMS送信で自動的に回答するシステムを構築する場合が挙げられます。
お問い合わせ内容ごとに適切な回答文をあらかじめDynamoDBで管理しておき、お問い合わせを受信したらその内容に応じて、対応する回答文をDynamoDBから取得してSMSで送信することで、迅速かつ的確な対応が可能になります。
回答文の追加や修正も、CSVファイルを更新してS3にアップロードするだけで簡単に反映できるのがメリットです。これにより、運用コストを削減しつつ、柔軟なデータ管理が実現できます。
前提条件
- アップロード用S3バケットを作成済み
- バケット名:cm-hirai-s3-csv-uploads-to-dynamodb
- アップロードするCSVデータは以下の通り
- 先頭行は各列のタイトルになりますが、アルファベットで指定ください。指定したタイトル名がDynamoDBテーブルに書き込まれる属性名となります。
inquiry_type,response_text,status 製品,製品ページをご覧ください。,ok 注文,注文確認ページからご確認をお願いいたします。,ok 配送,メールに記載の追跡番号からご確認いただけます。,ng
DynamoDBテーブル作成
以下の設定でテーブルを作成します。
- パーティションキー:inquiry_type(文字列)
- キャパシティ設定:オンデマンド
- 他はデフォルト
Lambda関数の作成
以下の設定です
- ランタイム:Python 3.12
- 適用するIAMポリシー
- AmazonDynamoDBFullAccess
- AmazonS3ReadOnlyAccess
- タイムアウト:10秒
import boto3 import csv import json def get_s3_file_content(bucket_name, object_key): s3 = boto3.client('s3') csv_file = s3.get_object(Bucket=bucket_name, Key=object_key) return csv_file['Body'].read().decode('utf-8') def parse_csv_data(csv_content): csv_data = csv.DictReader(csv_content.splitlines()) return list(csv_data) def delete_all_items_from_dynamodb(table): scan = table.scan() with table.batch_writer() as batch: for each in scan['Items']: batch.delete_item(Key={'inquiry_type': each['inquiry_type']}) # プライマリキーの属性名に合わせて修正 def write_csv_data_to_dynamodb(table, csv_data): with table.batch_writer() as batch: for item in csv_data: batch.put_item(Item=item) def lambda_handler(event, context): print('event:' + json.dumps(event, ensure_ascii=False)) bucket_name = event['Records'][0]['s3']['bucket']['name'] object_key = event['Records'][0]['s3']['object']['key'] csv_content = get_s3_file_content(bucket_name, object_key) csv_data = parse_csv_data(csv_content) dynamodb = boto3.resource('dynamodb') table_name = 'cm-hirai-s3_csv_uploads_to_dynamodb' table = dynamodb.Table(table_name) delete_all_items_from_dynamodb(table) write_csv_data_to_dynamodb(table, csv_data)
テーブル名とプライマリキー名は、実際の環境に合わせて適切な値に変更してください。
実装ポイント
CSVの読み込み
Lambda関数内では、boto3
を使ってS3からCSVファイルを取得し、csv
モジュールを使ってパースします。DictReader
を使うことで、CSVの1行目をキーとして扱い、各行をdictとして取得できます。
DynamoDBへの書き込み
batch_writer
を使用することで、複数の書き込みや削除操作をバッチ処理として一括(最大25件)で実行できます。これにより、大量のデータを効率的に処理できます。
トリガーの設定
Lambda関数にS3イベントをトリガーとして設定します。
トリガーの設定は以下の通りです。
- バケット:s3/cm-hirai-s3-csv-uploads-to-dynamodb
- イベントタイプ:PUT
- プレフィックス:cm-hirai-s3-csv-uploads-to-dynamodb.csv
テスト
再掲ですが、アップロードするCSVデータは以下です。
inquiry_type,response_text,status 製品,製品ページをご覧ください。,ok 注文,注文確認ページからご確認をお願いいたします。,ok 配送,メールに記載の追跡番号からご確認いただけます。,ng
S3バケットに「s3-csv-uploads-to-dynamodb.csv」をアップロードすると、DynamoDBへ書き込みが確認できました。
アップロードする度に、テーブル内の項目は全削除しますので、項目数が減る場合も反映されます。
ファイルを以下に変更して、アップロードします。
inquiry_type,response_text,status 製品,製品ページをご覧ください。,ok 支払い,注文確認メールをご確認ください。,ok
反映されていることが確認できました。
最後に
本記事では、Amazon S3にアップロードされたCSVファイルのデータを、AWS Lambdaを使ってAmazon DynamoDBテーブルに自動的に書き込む方法について解説しました。
この構成を活用することで、CSVデータの取り込みを効率化し、DynamoDBの利点を活かしたデータ管理が可能になります。
参考
https://docs.python.org/ja/3/library/csv.html
https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/programming-with-python.html