DynamoDB Streams를 사용하여 DynamoDB 테이블의 변경 사항을 실시간으로 수집해보자!!

2021.08.10

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

안녕하세요, 클래스메서드의 정현재입니다.

얼마 전 DynamoDB의 기본 개념에 대해 블로그를 작성한 적이 있는데요, 이번에는 좀 더 나아가 DynamoDB Streams에 대한 이야기를 해보려고 합니다.

이번 블로그에서는 DynamoDB Streams에 대한 개념을 설명한 후, 간단한 실습을 다뤄보도록 하겠습니다.

DynamoDB Streams란?

DynamoDB Streams란 DynamoDB 테이블의 변경사항에 대한 정보들을 추적할 수 있는 기능입니다.

DynamoDB 테이블에 DynamoDB Streams 기능을 활성화하면 테이블에서 일어나는 모든 Insert, Modify, Remove 이벤트를 stream record로서 받을 수 있고, 해당 정보는 Queue에 쌓여 24시간 동안 유지된 뒤 삭제 됩니다. 테이블의 항목들이 변경될 때마다 실시간 데이터 처리 등을 하고 싶은 경우 매우 유용하게 사용될 수 있겠죠. 이번 실습에서도 다룰 내용이지만 굉장히 간단히 Lambda의 trigger와 연결하여 어떠한 사후 처리를 실행할 수 있습니다.

그럼 실제 DynamoDB Streams를 사용해보며 이해를 높여보도록 하겠습니다.

이번 실습에서는 DynamoDB Streams를 활성화 하고 Lambda의 trigger와 결합하여 DynamoDB 테이블의 변경사항을 S3 버킷에 로깅해보도록 하겠습니다.

DynamoDB Streams 실습

DynamoDB Streams 활성화

우선, DynamoDB 테이블을 하나 작성합니다.

작성한 테이블을 선택한 후, 개요 탭에서 DynamoDB 스트림 관리 버튼을 클릭합니다.

DynamoDB 테이블에 변경 사항이 생겼을 때, 어떠한 내용을 스트림 레코드에 포함시킬지 설정하는 화면이 나옵니다. 이번 실습에서는 [새 이미지와 이전 이미지]를 선택하도록 하겠습니다.

활성화 버튼을 누르면 위와 같이 DynamoDB Streams 기능이 활성화 된 것을 확인할 수 있습니다. DynamoDB Streams 기능은 언제든지 활성화 하고, 비활성화 할 수 있습니다.

그럼 이렇게 활성화 한 DynamoDB Streams를 Lambda의 trigger에 연결해보도록 하겠습니다.

Lambda trigger 연결

DynamoDB Streams로부터 받은 레코드를 S3 버킷에 저장하는 Lambda 함수를 만들어보겠습니다.

우선 S3와 DynamoDB에 접근할 수 있는 IAM 역할을 부여하여 Lambda 함수를 하나 생성합니다. (이번 실습에서는 Lambda 함수의 런타임으로서 Python 3.8을 설정하였습니다.)

생성한 Lambda 함수에서 트리거 추가를 클릭하여 스트림 기능을 활성화한 DynamoDB 테이블을 선택한 후 추가 버튼을 누릅니다. 이제 DynamoDB 테이블에서 변경사항이 생길 때마다 이 Lambda 함수가 실행되게 됩니다. 그럼 Lambda 함수의 코드를 작성해보겠습니다.

import json
import boto3
from datetime import datetime

def lambda_handler(event, context):

    s3 = boto3.resource('s3')
    bucket_name = "stream-test-log" ## S3 버킷 이름

    for record in event["Records"]:

        key = record["dynamodb"]["Keys"]["UserId"]["S"]

        file_name = key + "_" + datetime.now().strftime('%Y%m%d_%H%M%S') + ".json" ## S3 파일 이름
        obj_name = s3.Object(bucket_name, file_name)

        obj_name.put(Body=json.dumps(record))

DynamoDB 테이블의 변경사항에 대한 스트림 레코드는 event["Records"]에 JSON형태로 담겨있습니다. S3에 저장할 파일의 이름을 설정하기 위해 변경된 항목의 파티션 키를 가져와 적절히 설정해준 후, S3에 해당 스트림 레코드를 저장하는 코드를 작성하였습니다.

작성한 Lambda함수를 Deploy하고, 실제로 스트림 레코드에 어떠한 정보가 담겨 있는지 확인해보도록 하겠습니다.

스트림 레코드 내용 확인

아까 작성한 DynamoDB 테이블에서 다음과 같은 작업을 수행해봅시다.

  1. DynamoDB 테이블에 항목 추가
    UserId(String) msg(String)
    dynamo Insert Item
  2. 생성한 항목 수정

    UserId(String) msg(String)
    dynamo Modify Item
  3. 생성한 항목 삭제

위의 작업을 수행할 때마다 Lambda 함수가 수행되어 S3 버킷에 스트림 레코드 정보가 저장될 것입니다. INSERT, MODIFY, REMOVE 이벤트 별로 스트림 레코드 정보를 들여다보겠습니다.

INSERT

{
    "eventID": "9b75fc7d627937c965aa7143f7158650", 
    "eventName": "INSERT", 
    "eventVersion": "1.1", 
    "eventSource": "aws:dynamodb", 
    "awsRegion": "[region]", 
    "dynamodb": {
        "ApproximateCreationDateTime": 1628577171.0, 
        "Keys": {
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "NewImage": {
            "msg": {
                "S": "Insert Item"
            }, 
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "SequenceNumber": "23241900000000016191110770", 
        "SizeBytes": 38, 
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    }, 
    "eventSourceARN": "arn:aws:dynamodb:[region]:[AWS Account ID]:table/dynamodb-streams-handson/stream/2021-08-06T11:11:34.568"
}

MODIFY

{
    "eventID": "ac3336e4ac17005525be2c754ae75a6d", 
    "eventName": "MODIFY", 
    "eventVersion": "1.1", 
    "eventSource": "aws:dynamodb", 
    "awsRegion": "[region]", 
    "dynamodb": {
        "ApproximateCreationDateTime": 1628577180.0, 
        "Keys": {
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "NewImage": {
            "msg": {
                "S": "Modify Item"
            }, 
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "OldImage": {
            "msg": {
                "S": "Insert Item"
            }, 
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "SequenceNumber": "23242000000000016191119891", 
        "SizeBytes": 64, 
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    }, 
    "eventSourceARN": "arn:aws:dynamodb:[region]:[AWS Account ID]:table/dynamodb-streams-handson/stream/2021-08-06T11:11:34.568"
}

REMOVE

{
    "eventID": "0a96116c67d0ff912e8fb15b02d20220", 
    "eventName": "REMOVE", 
    "eventVersion": "1.1", 
    "eventSource": "aws:dynamodb", 
    "awsRegion": "[region]", 
    "dynamodb": {
        "ApproximateCreationDateTime": 1628577186.0, 
        "Keys": {
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "OldImage": {
            "msg": {
                "S": "Modify Item"
            }, 
            "UserId": {
                "S": "dynamo"
            }
        }, 
        "SequenceNumber": "23242100000000016191125879", 
        "SizeBytes": 38, 
        "StreamViewType": "NEW_AND_OLD_IMAGES"
    }, 
    "eventSourceARN": "arn:aws:dynamodb:[region]:[AWS Account ID]:table/dynamodb-streams-handson/stream/2021-08-06T11:11:34.568"
}

이와 같이 변경 내용에 대한 정보가 JSON형태로 담겨 있는 것을 확인할 수 있습니다.

마치며..

지금까지 DynamoDB Streams 기능에 대해 살펴보았습니다.

이번 실습에서는 스트림 레코드를 S3 버킷에 로깅하는 아주 간단한 내용을 다뤄보았는데요, 더 나아가 DynamoDB 테이블에 변경 이벤트가 일어날 때마다 변경 내용을 외부 API에 전송하여 처리를 하는 등 아주 유용하게 사용될 수 있는 기능이라고 생각합니다. 더군다나 사용하는 방법도 굉장히 간단하고, 모든 리전에서 DynamoDB Streams 읽기 요청 유닛에 대해 250만건까지 무료로 제공하기 때문에 부담 없이 사용할 수 있는 기능이라고 생각합니다.

블로그는 이상입니다. 다음에도 더 좋은 기사로 찾아뵙도록 하겠습니다!!