AWS Lambda와 DynamoDB Streams를 활용한 트위터 포스팅봇 만들기

본 포스트는 AWS의 Lambda와 DynamoDB Streams, Twitter API를 이용하여 Developers.IO의 한국어 포스트를 트위터에 업로드 하는 작업에 관한 내용을 다룹니다. 또한 설계한 아키텍처에 따라 하나씩 구성해보면서 설명합니다. 서버리스에 입문하시는 분들이 간단하게 따라 해보시면 좋을 것 같습니다.
2020.06.10

시작하며

안녕하세요! 클레스메서드의 신입엔지니어 임홍기입니다.
종종 글을 쓰고 있는 이곳 DevelopersIO는 일본 블로그이지만, 회사에 계신 다양한 나라의 분들이
여러 언어로 작성해주고 계셔서 일본어 외의 언어로 작성된 블로그도 늘고 있습니다.
또한 이번 분기에는 저를 비롯한 6명의 한국인이 입사하게 됨으로써, 한글로 작성된 포스트도 점점 늘어나고 있습니다!
따라서, '한국어블로그' 태그가 붙어있는 게시글을 따로 모아서 볼 수 있도록 트위터에 포스팅하는 시스템을 구성해보았습니다.

해당 포스트에서 사용된 한국어블로그 모음입니다! 흥미 있으시면 부디 읽어보세요!

아키텍처

Architecture

전체적인 흐름

  1. RSS feed에서 "한국어블로그" 태그가 붙은 게시글들을 받아옵니다.
  2. 첫 번째 Lambda에서 요청한 게시글 중 새로운 게시글만 DynamoDB에 넣어줍니다.
  3. DynamoDB에 저장된 데이터들은 DynamoDB Streams를 통해 2번째 Lambda로 넘어갑니다.
  4. 두 번째 Lambda에서 DynamoDB에 추가된 게시글을 Twitter API를 사용하여 트위터에 포스팅합니다.
  5. CloudWatch의 Rules를 이용해, 작업 스케줄러를 작성하여 일정 시간마다 실행시켜줍니다.

DynamoDB 테이블 생성 및 Stream 활성

DynamoDB0

먼저 이 부분을 구성해보도록 하겠습니다.

DynamoDB1 DynamoDB2

서비스에서 DynamoDB를 선택한 뒤, Tables 탭에서 Create table을 클릭합니다.
Table name과 Primary key를 정해줍니다. Table 설정은 default로 해줍니다.
Key 값은 중복되지 않는 컬럼의 스키마로 정해주면 됩니다. 저는 겹칠일이 없는 title로 지정해보겠습니다.

DynamoDB3

여기까지 하면 테이블 생성은 끝이 납니다. 여기서 DynamoDB Streams를 활성화 시켜줘야

4

데이터의 변화(생성, 수정, 삭제)가 발생했을 때, 이벤트를 전달할 수 있습니다.
view type에 따라 streams 안에 담겨 오는 데이터가 달라집니다.

5

DynamoDB stream을 활성화했다면 DB 설정은 끝입니다.

Manage Stream의 View type
Keys only : 수정된 데이터의 Key 값만 전송
New Image : 변경 후의 데이터값만 전송
Old Image : 변경되기 전의 데이터값만 전송
New and Old Images : DB가 변경되기 전, 후의 값 모두 전송

자세한 내용은 DynamoDB 스트림작업에서 확인할 수 있습니다.

첫 번째 Lambda 작성

lambda0

이번엔 RSS 데이터를 불러오는 부분을 구성해보도록 하겠습니다.

rssfeed

(한국어블로그의 RSS 데이터)

lambda1

서비스에서 Lambda를 선택하고 Create function을 선택해 새로운 lambda 함수를 작성합니다.
Lambda 서비스에서 사용한 권한은 아래에 기재되어 있습니다.

코드 작성

// request rss ... Dynamodb SDK사용 ...
// 받아온 XML 형태의 데이터를 JSON으로 변환시켜준 뒤, DynamoDB에 저장합니다.
const params = {
    TableName: "PostTest",
    Item:JSON.parse(body).items[i],
    ConditionExpression: 'attribute_not_exists(title)' //Key 값이 중복되지 않는 것만 추가
   };

ddb.put(params, (err, data) => {
     if (err) console.log("error : " + err);
     else console.log(data);
   });

두 번째 Lambda 작성

2lambda0

이번엔 트위터와 연동되는 Lambda 함수를 구성해보도록 하겠습니다.

트위터의 API를 사용하기 위한 twitter app의 등록은 끝났다고 가정하고 진행하겠습니다.

2lambda1

두 번째 함수의 경우, DynamoDB에 새로운 객체가 추가되었을 때 실행될 수 있도록 트리거를 설정해주어야 합니다.

2lambda2

추가되면 DynamoDB와 Lambda가 연결됩니다. 앞으론 DB에 이벤트가 일어날 때마다, 이 함수가 실행될 것입니다.

코드 작성

//twitter module 사용 ... twitter token 설정 ...
for (let i = 0; i < event.Records.length; i++) {
    if (event.Records[i].eventName === "INSERT") {
      let message = event.Records[i].dynamodb.NewImage["title"];
      let link    = event.Records[i].dynamodb.NewImage["link"];
      client.post('statuses/update', {status: "#DevelopersIO" + " " + message + "\n" + link}, 
        function(error, tweet, response) {
            if (!error) console.log("tweet success: " + tweet + ", response : " + response);
            else console.log("tweet fail: " + error);
        }
    });      
    }

2lambda3

CloudWatch에서 이벤트의 레코드객체의 로그를 살펴보면 INSERT 이벤트가 발생했다는 것을 알 수 있습니다.
이를 통해 생성, 삭제, 수정되었을 때의 분기를 나눠서 작업할 수도 있습니다.

CloudWatch로 정기적으로 호출하기

cloudwatch0

마지막으로 CloudWatch에서 첫 번째 Lambda를 매일 12시, 18시에 호출해보겠습니다.

cw1

CloudWatch의 Event 탭에 있는 Rules에서 rule을 생성합니다.

cw2-1

Schedule의 Cron expression을 선택하고 실행하고자 하는 시간을 표현식에 맞게 작성합니다.
매일 12시, 18시에 호출하고자 하면 표현식은 (0 12,18 * * ? *)이 됩니다.
하지만 호출하고 있는 환경 자체는 표준시(UTC)를 따르기 때문에, -9시간을 해주어야 합니다. (0 3,9 * * ? *)

cw3

표현식을 작성한 뒤, Tagets에서 호출하고자 하는 2번째 Lambda를 지정합니다.

Cron표현식이 헷갈릴 때에는 crontab guru와 같은 사이트를 이용하는 것을 추천드립니다.

cw4 cw5

Rule의 이름과 설명을 적었다면 끝입니다. 작성된 Rule을 눌러서 제대로 작성되었는지 확인합니다.

cw6

CloudWatch가 Lambda에 트리거 된 걸 확인할 수 있습니다.

테스트

이것으로 아키텍처의 구조대로 전부 구현하였습니다. 잘 동작하고 있는지 테스트를 해보겠습니다.

test0

현재 제 트위터에는 아무 글도 없습니다.

test01

DynamoDB에는 현재 모든 기사의 데이터가 입력되어 있기 때문에 제가 작성한 한국어기사를 지우고, 첫 번째 Lambda를 실행시켜 보겠습니다.

test02

Log를 보면 2건의 트위터 요청이 성공했다는 것을 알 수 있습니다. 트위터에도 올라갔는지 확인해볼까요?

test03

트위터에도 성공적으로 글이 올라갔습니다!

사용한 리소스의 Role 설정

  1. 첫 번째 Lambda

AWSLambdaBasicExecutionRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:[region]:[AccountID]:*"
        },{
            "Effect": "Allow",
            "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "arn:aws:logs:[region]:[AccountID]:log-group:/aws/lambda/putdynamodb:*"
        }
    ]
}

DynamoDBRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "dynamodb:PutItem",
            "Resource": "arn:aws:dynamodb:[region]:[AccountID]:table/PostTest"
        }
    ]
}
  1. 두번 째 Lambda

AWSLambdaBasicExecutionRole 리소스 위치 외에 동일

DynamoDBStreamRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
                "dynamodb:GetRecords"
            ],
            "Resource": "arn:aws:logs:[region]:[AccountID]:log-group:/aws/lambda/postTweet:*"
        }
    ]
}

단순히 객체를 넣는 권한과는 달리 Stream과 Shard(샤드) 권한을 주어야 합니다. 자세한 내용은 Lambda 실행 역할 만들기를 참조해주세요.

스트림 처리를 위한 권한들이 수행하는 작업

GetShardIterator : 특정 stream의 shard(데이터 구조)를 가지고 와서 iterator를 반환합니다.
Shard Iterator란? : 특정 shard 안에서 data를 찾아갈 수 있도록 해 주는 Index Key
DescribeStream : 특정 stream의 현재 상태를 반환합니다.
ListStreams : 계정이 가지고 있는 stream의 리스트를 반환합니다.
GetRecords : Shard Iterator를 통해서 실제 데이터 반환합니다.

마지막으로

이상으로 AWS Lambda와 DynamoDB Stream으로 게시글을 포스팅하는 서비스를 구성해보았습니다.
서버리스를 처음 접해보았는데 서버를 구성할 필요 없이 코드가 실행되고, 다른 서비스와 연결하고 분리하는 것이 간단하다는 점이 정말 좋았습니다.
추후 한국어 포스트만 모아서 볼 수 있는 Developers.IO Korea 블로그와, Twitter가 공개될 예정이니 많이 기대해주세요!!!

참고

DynamoDB에 대한 자세한 문서는 DynamoDB 개발자 안내서를 읽어주세요!

DynamoDB의 SDK 사용법은 DynamoDB SDK를 읽어주세요!

DynamoDB에서 발생하는 오류는 DynamoDB 오류 처리를 읽어보시면 도움이 됩니다.