DynamoDB Streams 를 활용하여 Elasticsearch 와 데이터 싱크 맞추기

AWS DynamoDB Stream 을 활용한 DynamoDB 및 ElasticSearch 의 데이터 동기화에 대한 글입니다. AWS Lambda 는 TypeScript 로 작성하였고, serverless framework 로 배포합니다.
2019.08.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

안녕하세요! 클래스메소드 컨설팅부의 김태우입니다 :D

이번 글에서는 DynamoDB 의 Stream 기능을 활용하여 Amazon ElasticSearch 도메인(클러스터)에 (준)실시간으로 데이터를 어떻게 연동할 수 있는지 알아보겠습니다. 데이터를 "준"실시간으로 연동한다고 했는데요, 이것은 Elasticsearch 에서 데이터를 인덱싱하는데 필요한 시간이 대략 1초 가량 걸리기때문에 실시간이 아닌 준실시간이 됩니다. 그럼 시작해보겠습니다!ㅎㅎ

목차

DynamoDB Stream 이란?

DynamoDB Stream이란, 이름 그대로 DynamoDB 테이블의 데이터에 변경사항(데이터 생성/수정/삭제)이 발생하면 이를 이벤트 스트리밍 형식으로 처리할 수 있는 기능을 의미합니다. 스트리밍 이벤트가 발생하면 등록된 AWS Lambda 함수를 호출하게 되며, Lambda 함수에서 이벤트를 핸들링하게 됩니다. 이 때, 사용자의 코드의 잘못이 아닌 이유로 Lambda 함수가 실패하는 경우, 즉, DynamoDB Stream 에서 이벤트 소스 Lambda 함수 호출에 실패하는 경우에는 AWS Lambda는 데이터가 만료될 때까지(최대 7일) 실패한 레코드 배치를 처리하려고 시도합니다. 따라서, 매우 높은 수준의 안정성을 제공하고 있으므로 믿고 쓸 수 있는것이죠!ㅎㅎ

ElasticSearch 란?

Elasticsearch 는 Apache Lucene 기반의 분산처리 검색엔진으로, 주로 (구)ELK 스택이라고 알려진 Elastic stack 의 핵심 컴포넌트로 잘 알려져 있습니다. 바로 몇일전까지는 Amazon Elasticsearch Service 에서는 6.7버전까지 이용가능했으나 (현시점 최신버전은 7.3) 이제부터는 6.8 및 7.1 버전도 이용가능하도록 업데이트 되었습니다! 6.x버전에서 7.x버전으로 업데이트되면서 주요한 변경사항에 대해서는 Elastic의 공식문서를 참조해주세요. Elasticsearch 에 대한 자료는 이미 많은 분들이 정리해주셨고 쉽게 찾아볼 수 있기때문에 본 블로그에서는 Elasticsearch 의 기본컨셉 등에 대한 내용은 설명하지 않겠습니다 :)

어떨 때 DynamoDB Stream + Elasticsearch 를 사용하게 되는걸까?

많은 Use case 가 있겠지만 기본적으로는 DynamoDB 는 검색을 위한 데이터베이스가 아니라는 점이 아쉬울 경우에 사용하게 됩니다. 다시 말해, DynamoDB 를 메인 DB 로 사용하는게 좋을 것 같지만, 검색기능이 부족하다고 판단되는 경우에 DynamoDB Stream 을 활용하여 Elasticsearch 에 데이터를 동기화시키는 전략을 사용하면 매우 훌륭한 조합으로 사용할 수 있게 됩니다. 다만, 앞서 언급한 것 처럼 Elasticsearch (정확히는 Lucene) 의 인덱싱 속도가 1초가량 걸리기때문에 도입하실 때 이 부분은 주의해야합니다. DynamoDB 의 GSI, LSI 등을 통해 손쉽게 쿼리를 구성할 수 있는 경우에는 굳이 Elasticsearch 를 도입하지 않아도 괜찮겠지만, DynamoDB 를 사용하면서 처음에는 의도치 않았던 복잡한 쿼리를 처리해야하는 등의 이슈가 발생할 수가 있겠죠. 본 블로그에서는 이런 경우에, GSI 등으로 어떻게든 해보려고 노력하지 않아도 DynamoDB Stream 을 통해 Elasticsearch 에 데이터 동기화를 시키는 방법도 있다는 점을 전달하고자 합니다! :)

직접 만들어봅시다

DynamoDB Stream 을 Elasticsearch 에 데이터를 동기화시키는 방법은 아래와 같이 매우 간단합니다.

  1. DynamoDB Table 이 없는 경우 생성한다.

  2. 해당 DynamoDB 의 Table 에서 Stream 을 생성해준다.

  3. Amazon Elasticsearch Service 의 도메인(클러스터)을 생성한다.

  4. Lambda 함수를 만들어서 DynamoDB 의 Stream 이벤트를 전달받도록 설정한다.

1. DynamoDB 테이블 생성하기

일단, 본 블로그에서는 새로운 DynamoDB 테이블을 만드는 것으로 시작하겠습니다. 저는 테이블명을 Employee 로 정하고, 파티션키를 id (String) 으로 지정하겠습니다. 다른 것들은 전부 Default 세팅으로 진행해도 상관없습니다.

2. DynamoDB Stream 생성하기

테이블이 만들어지면 Manage Stream 버튼을 눌러서 DynamoDB Stream 를 생성합니다.

View Type 을 선택하게 됩니다. Keys Only 는 말그대로 변경된 데이터의 key 만을 전달하는 방식입니다. New Image 는 새로 생성된 데이터 (생성, 변경의 경우)가 전달되는 데이터입니다. Old Image 는 기존 데이터(변경, 삭제의 경우)가 전달되는 데이터입니다. 본 블로그에서는 New and old Images 를 선택하겠습니다.

Stream 이 생성되면 아래와 같이 arn 을 확인할 수 있습니다.

3. Amazon Elasticsearch Service Domain 생성

AWS 의 Amazon Elasticsearch Service 에서는 Elasticsearch 클러스터를 도메인이라는 이름으로 부릅니다. Amazon Elasticsearch Service 콘솔에 접속하고 Create a New Domain 버튼을 클릭합니다.

개발 및 테스트 타입으로 설정하고 Elasticsearch 버전을 선택합니다. 여기서는 6.7버전으로 진행하도록 하겠습니다.

도메인명은 명명규칙에 유의하며 적당히 작성합니다. 본 블로그에서는 비용을 절약하기 위해 인스턴스타입은 t2.small.elasticsearch 로 설정하겠습니다.

다른것들은 다 기본값으로 두고, 테스트를 간편하게 진행하기 위해 Public Access 가 가능하도록하고 아무런 보안설정을 하지 않도록 하겠습니다.

Elasticsearch 도메인이 생성되기까지 대략 10~20분정도가 소요됩니다. 도메인이 생성되면 아래와 같이 endpoint 를 확인할 수 있습니다.

4. Lambda 함수 작성

본 블로그에서는 TypeScript 로 Lambda 함수를 작성하겠습니다.

dynamoStreamHandler.ts

import { DynamoDBStreamEvent } from "aws-lambda";
import { insertDataToES, modifyDataToES, removeDataToES } from "./streamUtils";

const esDomainUrl = 'https://search-my-search-domain-qdttvcr4xck44yqqhledc6g5qm.ap-northeast-1.es.amazonaws.com';
const indexName = 'employee';
const typeName = 'doc';

export function handler(event: DynamoDBStreamEvent) {
  const records = event.Records;

  const keyName = 'id';
  const requestUrl = esDomainUrl + '/' + indexName + '/' + typeName + '/';

  records.forEach(r => {
    const type = r.eventName;
    const dynamodb = r.dynamodb!;

    switch (type) {
      case "INSERT":
        insertDataToES(keyName, requestUrl, dynamodb);
        break;
      case "MODIFY":
        modifyDataToES(keyName, requestUrl, dynamodb);
        break;
      case "REMOVE":
        removeDataToES(keyName, requestUrl, dynamodb);
        break;
      } 
  });
}

streamUtils.ts

import { StreamRecord } from "aws-lambda";
import AWS from "aws-sdk";
import axios from "axios";

export function insertDataToES(keyName: string, requestUrl: string, dynamodb: StreamRecord) {
    const insertedData = AWS.DynamoDB.Converter.unmarshall( dynamodb.NewImage! );
    requestUrl += insertedData[keyName] + '/';

    axios.post(requestUrl, insertedData).then(result => {
        console.log(result);
    }).catch(err => {
        console.error(err);
    })
}

export function modifyDataToES(keyName: string, requestUrl: string, dynamodb: StreamRecord) {
    const modifiedData = AWS.DynamoDB.Converter.unmarshall( dynamodb.NewImage! );
    const wrappedData = { "doc": modifiedData };
    requestUrl += modifiedData[keyName] + '/_update';

    axios.post(requestUrl, wrappedData).then(result => {
        console.log(result);
    }).catch(err => {
        console.error(err);
    })
}

export function removeDataToES(keyName: string, requestUrl: string, dynamodb: StreamRecord) {
    const removedData = AWS.DynamoDB.Converter.unmarshall( dynamodb.OldImage! );
    requestUrl += removedData[keyName];

    axios.delete(requestUrl).then(result => {
        console.log(result);
    }).catch(err => {
        console.error(err);
    })
}

위와 같이 작성된 Lambda 함수를 serverless 프레임워크를 통해 배포하겠습니다.

serverless.yml

service:
  name: ddb-stream-to-es-test

frameworkVersion: '>=1.0.0 <2.0.0'

provider:
  name: aws
  runtime: nodejs8.10
  stage: dev
  region: ap-northeast-1

functions:
  DynamoDBCallback:
    handler: dist/dynamoStreamHandler.handler
    events:
      - stream:
          arn: 이곳에 아까 만든 DynamoDB Stream 의 arn 을 옮겨적습니다.
          type: dynamodb
          batchSize: 100
    memorySize: 128
    timeout: 60

제가 테스트를 진행했던 프로젝트를 Github 에 올려두었으니 다른 파일들은 해당 repository 를 참조해주세요 :)

아래 명령어를 입력하여 배포해보겠습니다.

npm run deploy

배포가 성공적으로 진행된 모습입니다.

5. 테스트하고 결과 확인하기

이제 DynamoDB 에 데이터를 생성하여 실제로 Elasticsearch 에 데이터 동기화를 해보도록 하겠습니다. 새로운 Employee 데이터를 생성합니다.

데이터가 생성되면 곧바로 DynamoDB Stream 을 통해 작성한 Lambda 함수가 호출됩니다. Lambda 함수가 호출되면 CloudWatch Logs 의 Log Group 에 로그가 남게되니 확인해보겠습니다.

Elasticsearch 에 인덱스를 생성함과 동시에 데이터가 Insert 된 것 같습니다! 곧바로 Elasticsearch 의 Endpoint 를 통해 데이터가 들어갔는지 확인해보겠습니다.

INSERT 는 성공했습니다. MODIFY 및 REMOVE 도 제대로 동작하는지 살펴보겠습니다.

role 필드를 추가했습니다.

CloudWatch Logs 및 Elasticsearch 에서도 정상적으로 확인할 수 있었습니다! 마지막으로 REMOVE 도 작동하는지 보겠습니다.

깔끔하게 지워졌습니다. Elasticsearch 의 인덱스는 남아있네요.

마무리

DynamoDB + Elasticsearch 조합은 꽤나 자주 쓰이는 패턴임에도 불구하고 블로그로 정리된 글을 찾을 수 없어서 제가 직접 작성해봤습니다. DynamoDB Stream 을 생성하는것은 버튼 한번만 누르면 될 정도로 쉽고, 결국 간단한 Lambda 함수 하나만 작성하면 바로 Elasticsearch 와 데이터 동기화가 된다는 점은 정말 멋진것 같습니다. 또 DynamoDB 의 여러가지 사용패턴을 좀 더 공부해봐야겠다는 생각이 들었습니다 :)