CSVデータをS3にアップロード時AWS Lambdaをトリガーし、自動的にDynamoDBへデータを書き込んでみた

2024.06.10

はじめに

Amazon S3バケットにアップロードしたCSVデータをAmazon DynamoDBテーブルに自動的に書き込むAWS Lambdaの作成方法を紹介します。

CSVデータをAmazon S3バケットにアップロード時、AWS LambdaをトリガーしAmazon DynamoDBテーブルにデータを書き込む方法について紹介します。

CSVデータをAmazon S3バケットにアップロード時、AWS LambdaをトリガーしDynamoDBにデータを書き込んでみた

構成は以下の通りです。

本構成では、AWSマネジメントコンソールからS3バケットにCSVファイルをアップロードすると、S3のPUTイベントをトリガーとしてLambda関数が起動します。Lambda関数は、CSVファイルの内容を読み取り、DynamoDBテーブルに書き込む処理を行います。

この構成は様々なシナリオで活用できます。例えば、お客様からのお問い合わせに対して、SMS送信で自動的に回答するシステムを構築する場合が挙げられます。

お問い合わせ内容ごとに適切な回答文をあらかじめDynamoDBで管理しておき、お問い合わせを受信したらその内容に応じて、対応する回答文をDynamoDBから取得してSMSで送信することで、迅速かつ的確な対応が可能になります。

回答文の追加や修正も、CSVファイルを更新してS3にアップロードするだけで簡単に反映できるのがメリットです。これにより、運用コストを削減しつつ、柔軟なデータ管理が実現できます。

前提条件

  • アップロード用S3バケットを作成済み
    • バケット名:cm-hirai-s3-csv-uploads-to-dynamodb
  • アップロードするCSVデータは以下の通り
    • 先頭行は各列のタイトルになりますが、アルファベットで指定ください。指定したタイトル名がDynamoDBテーブルに書き込まれる属性名となります。
inquiry_type,response_text,status
製品,製品ページをご覧ください。,ok
注文,注文確認ページからご確認をお願いいたします。,ok
配送,メールに記載の追跡番号からご確認いただけます。,ng

DynamoDBテーブル作成

以下の設定でテーブルを作成します。

  • パーティションキー:inquiry_type(文字列)
  • キャパシティ設定:オンデマンド
  • 他はデフォルト

Lambda関数の作成

以下の設定です

  • ランタイム:Python 3.12
  • 適用するIAMポリシー
    • AmazonDynamoDBFullAccess
    • AmazonS3ReadOnlyAccess
  • タイムアウト:10秒
import boto3
import csv
import json

def get_s3_file_content(bucket_name, object_key):
    s3 = boto3.client('s3')
    csv_file = s3.get_object(Bucket=bucket_name, Key=object_key)
    return csv_file['Body'].read().decode('utf-8')

def parse_csv_data(csv_content):
    csv_data = csv.DictReader(csv_content.splitlines())
    return list(csv_data)

def delete_all_items_from_dynamodb(table):
    scan = table.scan()
    with table.batch_writer() as batch:
        for each in scan['Items']:
            batch.delete_item(Key={'inquiry_type': each['inquiry_type']}) # プライマリキーの属性名に合わせて修正

def write_csv_data_to_dynamodb(table, csv_data):
    with table.batch_writer() as batch:
        for item in csv_data:
            batch.put_item(Item=item)

def lambda_handler(event, context):
    print('event:' + json.dumps(event, ensure_ascii=False))

    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    csv_content = get_s3_file_content(bucket_name, object_key)
    csv_data = parse_csv_data(csv_content)

    dynamodb = boto3.resource('dynamodb')
    table_name = 'cm-hirai-s3_csv_uploads_to_dynamodb'
    table = dynamodb.Table(table_name)

    delete_all_items_from_dynamodb(table)
    write_csv_data_to_dynamodb(table, csv_data)

テーブル名とプライマリキー名は、実際の環境に合わせて適切な値に変更してください。

実装ポイント

CSVの読み込み

Lambda関数内では、boto3を使ってS3からCSVファイルを取得し、csvモジュールを使ってパースします。DictReaderを使うことで、CSVの1行目をキーとして扱い、各行をdictとして取得できます。

DynamoDBへの書き込み

batch_writerを使用することで、複数の書き込みや削除操作をバッチ処理として一括(最大25件)で実行できます。これにより、大量のデータを効率的に処理できます。

トリガーの設定

Lambda関数にS3イベントをトリガーとして設定します。

トリガーの設定は以下の通りです。

  • バケット:s3/cm-hirai-s3-csv-uploads-to-dynamodb
  • イベントタイプ:PUT
  • プレフィックス:cm-hirai-s3-csv-uploads-to-dynamodb.csv

テスト

再掲ですが、アップロードするCSVデータは以下です。

inquiry_type,response_text,status
製品,製品ページをご覧ください。,ok
注文,注文確認ページからご確認をお願いいたします。,ok
配送,メールに記載の追跡番号からご確認いただけます。,ng

S3バケットに「s3-csv-uploads-to-dynamodb.csv」をアップロードすると、DynamoDBへ書き込みが確認できました。

アップロードする度に、テーブル内の項目は全削除しますので、項目数が減る場合も反映されます。

ファイルを以下に変更して、アップロードします。

inquiry_type,response_text,status
製品,製品ページをご覧ください。,ok
支払い,注文確認メールをご確認ください。,ok

反映されていることが確認できました。

最後に

本記事では、Amazon S3にアップロードされたCSVファイルのデータを、AWS Lambdaを使ってAmazon DynamoDBテーブルに自動的に書き込む方法について解説しました。

この構成を活用することで、CSVデータの取り込みを効率化し、DynamoDBの利点を活かしたデータ管理が可能になります。

参考

https://docs.python.org/ja/3/library/csv.html

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/programming-with-python.html