AWS LambdaとDynamoDB Streamsを活用してTwitter Botを作ってみた

AWSのLambdaとDynamoDB Streams、Twitter APIを利用してDevelopers.IOの韓国語ポストをツイッターにアップロードする作業に関する内容です。また、設計したアーキテクチャに従って1つずつ構成しながら説明します。サーバーレスに入門する方は、簡単に真似してみるといいと思います。
2020.06.11

初めに

こんにちは!classmethodの新卒エンジニアホンギです。
記事を書いているここDevelopersIOは日本のブログですが、会社にいらっしゃる様々な国の方々が
いろんな言語でブログを書いてくださって日本語以外の言語で作成されたブログも増えています。
また、今期には私を含め、6人の韓国人が入社することになり、ハングルで作成されたポストも増えています!
よって「韓国語ブログ」のタグが貼ってある記事を集めて閲覧できるようにTwitterにポスティングするシステムを構築してみました。

ポストで使用された韓国語ブログ集です!興味がありましたら是非読んでみてください!

Architecture

Architecture

流れ

  1. RSS feedで「韓国語ブログ」タグが貼ってある記事を受け取ります。
  2. 最初のLambdaで要請した記事のうち、新しい記事だけをDynamoDBに入れます。
  3. DynamoDBに保存されたデータは、DynamoDB Streamsを通して2番目のLambdaに移動されます。
  4. 2番目のLambdaでDynamoDBに追加された記事をTwitter APIを使用してTwitterにポスティングします。
  5. CloudWatchのEvent Rulesを利用して作業スケジューラーを作成し、一定時間ごとに実行させます。

動作環境

  • Lambda Runtime : Node.js 12.x

DynamoDBテーブル作成及びStream有効化

DynamoDB0

まず、この部分を構成してみましょう。

DynamoDB1 DynamoDB2

サービスでDynamoDBを選択し、TablesタブでCreate tableをクリックします。
Table nameとPrimary Keyを定義します。Table設定はdefaultにします。
Keyは重複しないカラムのスキーマに設定してください。私は重なることがないtitleに指定してみます。

DynamoDB3

ここまですると、テーブルの作成は終わりです。ここでDynamoDB Streamsを有効化させると

4

データの変化(生成、修正、削除)が発生した場合、イベントをお届けできます。
view typeによってstreamsの中に入るデータが変わります。

5

DynamoDB streamが有効化されたら、DBの設定は終わりです。

Manage StreamのView type
Keys only : 修正されたデータのKeyのみ送信
New Image : 変更後のデータのみ転送
Old Image : 変更される前のデータのみ転送
New and Old Images : DBが変更される前と後全て転送

詳細はDynamoDB Streamsで確認することができます。

1番目のLambda作成

lambda0

今回はRSSデータの読み込み部分を構成してみましょう。

rssfeed

(韓国語ブログのRSSデータ)

lambda1

サービスでLambdaを選択し、Create functionを選択して新しいlambda関数を作成します。
Lambdaサービスで使用した権限は、下記に記載されています。

コード作成

// request rss ... Dynamodb SDK使用 ...
// 受信したXML形式のデータをJSONに変換させ、DynamoDBに保存します。
const params = {
    TableName: "PostTest",
    Item:JSON.parse(body).items[i],
    ConditionExpression: 'attribute_not_exists(title)' //Keyが重複しないもののみ追加する。
   };

ddb.put(params, (err, data) => {
     if (err) console.log("error : " + err);
     else console.log(data);
   });

2番目のLambda作成

2lambda0

今度はTwitterと連動するLambda関数を構成してみましょう。

TwitterのAPIを使うためのtwitter appの登録は終わったと仮定して進めます。

2lambda1

2番目の関数の場合、DynamoDBに新しいオブジェクトが追加された時に実行されるようにトリガを設定する必要があります。

2lambda2

追加されるとDynamoDBとLambdaが連結されます。これからは、DBにイベントが起きるたびに、この関数が実行されるでしょう。

コード作成

//twitter module 使用 ... twitter token 設定 ...
for (let i = 0; i < event.Records.length; i++) {
    if (event.Records[i].eventName === "INSERT") {
      let message = event.Records[i].dynamodb.NewImage["title"];
      let link    = event.Records[i].dynamodb.NewImage["link"];
      client.post('statuses/update', {status: "#DevelopersIO" + " " + message + "\n" + link}, 
        function(error, tweet, response) {
            if (!error) console.log("tweet success: " + tweet + ", response : " + response);
            else console.log("tweet fail: " + error);
        }
    });      
    }

2lambda3

CloudWatchでのイベントのレコードオブジェクトのログを見ると、INSERTイベントが発生していることが分かります。
これによって、生成、削除、修正された時の分岐を分けて作業することもできます。

CloudWatchで定期的に呼び出す

cloudwatch0

最後に、CloudWatchから最初のLambdaを毎日12時、18時に呼び出してみます。

cw1

CloudWatchのEventタブにあるRulesでruleを作成します。

cw2-1

ScheduleのCron expressionを選択し、実行する時間を表現式に合わせて作成します。
毎日12時、18時に呼び出そうとすると表現式は(0 12,18 * * ? *)になります。
しかし、呼び出している環境の時間基準はUTCに従っていますので、9時間を引き算することが必要です。(0 3,9 * * ? *)
表現式はCronを使用したスケジュール式をお読みください。

cw3

表現式を作成し、Tagetsで呼び出そうとする2番目のLambdaを指定します。

Cron表現式が紛らわしかった場合はcrontab guruのようなサイトを利用することをおすすめします。

cw4 cw5

Ruleの名前と説明を書いたら終わりです。
作成されたRuleを押して、正しく作成されたことを確認しましょう。

cw6

CloudWatchがLambdaにトリガされていることを確認できます。

テスト

これでアーキテクチャの構造通りに全てを実現しました。上手く動作しているのかテストをしてみます。

test0

現在、私のツイッターには何の記事もありません。

test01

DynamoDBには現在すべての記事のデータが入力されているので、私が作成した韓国語記事を消し、最初のLambdaを実行してみます。

test02

Logを見ると、2件のツイッターリクエストが成功したことが分かります。ツイッターにも掲載されたか確認してみよう。

test03

ツイッターにもうまく記事が掲載されたことが確認できます。

使用したリソースのRole設定

  1. 1番目のLambda

AWSLambdaBasicExecutionRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:[region]:[AccountID]:*"
        },{
            "Effect": "Allow",
            "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "arn:aws:logs:[region]:[AccountID]:log-group:/aws/lambda/putdynamodb:*"
        }
    ]
}

DynamoDBRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "dynamodb:PutItem",
            "Resource": "arn:aws:dynamodb:[region]:[AccountID]:table/PostTest"
        }
    ]
}
  1. 2番目のLambda

AWSLambdaBasicExecutionRole リソース位置以外同じ

DynamoDBStreamRole

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetShardIterator",
                "dynamodb:DescribeStream",
                "dynamodb:ListStreams",
                "dynamodb:GetRecords"
            ],
            "Resource": "arn:aws:logs:[region]:[AccountID]:log-group:/aws/lambda/postTweet:*"
        }
    ]
}

単にオブジェクトを入れる権限とは異なり、StreamとShard(シャード)権限を与える必要があります。詳細はLambda実行ロール作りを参照してください。

最後に

以上で、AWSLambdaとDynamoDB Streamで記事をポスティングするサービスを構成してみました。
サーバレスに初めて接しましたが、サーバを構成する必要がなくコードが実行され、他のサービスと連結して分離するのが簡単だという点が本当に良かったです。
今後、韓国語のポストだけを集めて見ることができるDevelopers.IO KoreaブログとTwitterが公開される予定ですので、楽しみにしていてください!!

参考

DynamoDBについての詳しい文書はDynamoDB開発者ガイドをお読みください!

DynamoDBのSDK使用はDynamoDB SDKをお読みください!

DynamoDBで発生するエラーはDynamoDBエラー処理を読むと役に立ちます。

DynamoDB ストリームを使用したテーブルアクティビティのキャプチャ