Serverless FrameworkでLambda Destinations(非同期呼び出しの宛先指定)を実装してみた

2020.06.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Lambda Destinations(非同期呼び出しの宛先指定)とは

Lambda関数実行完了をトリガーに他のAWSサービスを実行することができる機能です。現在以下 4サービスを実行することができます。

  • Amazon SQS
  • Amazon SNS
  • Lambda
  • EventBridge

Destinationsの設定にはコードを書く必要はなく、Lambdaのマネジメントコンソールから可能です。これまでは関数実行完了時に別のAWSサービスを実行したい場合は、Lambda関数内で(処理の最後の方に)別サービスを呼び出すコードを書く必要がありました *1が、これが不要になり、Lambda関数のコードをより簡潔にすることができます。なんというか、より「マネージド」になった感じです。

また、関数実行に成功した場合と失敗した場合で別々の宛先を指定することができます。もちろん片方だけ指定することもできます。

今回はこの機能をSereverless Frameworkを使って実装していきたいと思います。

パターン1: Destinationsだけ指定(宛先SQSキューは事前に作っておく)

Serverless FrameworkではDestinationsの設定だけ行ない、その宛先となるSQSキューの作成はServerless Frameworkスコープ外とします。

SQSキュー作成

CLIで作成しました。

$ aws sqs create-queue --queue-name fail-destination-queue
$ aws sqs get-queue-attributes \
> --queue-url (前述create-queueコマンド結果のURL) \
> --attribute-names QueueArn

serverless.yml

最後 2行がDestinationsに関する設定箇所です。今回は失敗時の宛先のみ指定しています。

service:
    name: destinations-sample

custom:
    webpack:
        webpackConfig: ./webpack.config.js
        includeModules: true

plugins:
    - serverless-webpack

provider:
    name: aws
    runtime: nodejs12.x
    region: ap-northeast-1
    environment:
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1

functions:
    hello:
        handler: handler.hello
        destinations:
            onFailure: arn:aws:sqs:ap-northeast-1:123456789012:fail-destination-queue

Lambda関数

イベントペイロードに{ "success": false }を渡すとエラーを発生させるコードです。

handler.ts

import "source-map-support/register";

export const hello = async (event) => {
    console.log(event);
    if (event.success === false) {
        throw new Error("destination test");
    }
    return true;
};

デプロイ

$ npx sls deploy

コンソールでDestinationが確認できました。

テスト実行

$ npx sls invoke -f hello -d '{ "success": false }' -t Event

falseを指定したのでエラーになってDestinationの処理が走るはずです。

CloudWatch Logs出力内容

$ npx sls logs -t -f hello
START RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Version: $LATEST
2020-06-24 21:44:45.825 (+09:00)        523089e2-8cc2-4181-8984-00a151e47ec8    INFO    { success: false }
2020-06-24 21:44:45.831 (+09:00)        523089e2-8cc2-4181-8984-00a151e47ec8    ERROR   Invoke Error    {"errorType":"Error","errorMessage":"destination test","stack":["Error: destination test","    at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)","    at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8
REPORT RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8  Duration: 8.65 ms       Billed Duration: 100 ms Memory Size: 1024 MB    Max Memory Used: 68 MB

START RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Version: $LATEST
2020-06-24 21:45:46.064 (+09:00)        523089e2-8cc2-4181-8984-00a151e47ec8    INFO    { success: false }
2020-06-24 21:45:46.066 (+09:00)        523089e2-8cc2-4181-8984-00a151e47ec8    ERROR   Invoke Error    {"errorType":"Error","errorMessage":"destination test","stack":["Error: destination test","    at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)","    at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8
REPORT RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8  Duration: 3.04 ms       Billed Duration: 100 ms Memory Size: 1024 MB    Max Memory Used: 69 MB

START RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Version: $LATEST
2020-06-24 21:47:35.308 (+09:00)        523089e2-8cc2-4181-8984-00a151e47ec8    INFO    { success: false }
2020-06-24 21:47:35.309 (+09:00)        523089e2-8cc2-4181-8984-00a151e47ec8    ERROR   Invoke Error    {"errorType":"Error","errorMessage":"destination test","stack":["Error: destination test","    at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)","    at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8
REPORT RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8  Duration: 4.33 ms       Billed Duration: 100 ms Memory Size: 1024 MB    Max Memory Used: 70 MB

非同期呼び出しの場合、エラーが発生すると最大2回の再試行、つまり合計3回の実行が行われます。(再試行回数は変更可能です)
そのすべてで失敗した場合に、失敗時のDestinationsへの配信が行われます。

ちなみにですが、invoke時に -t Event を抜くとDestinationsへの配信はされません。-t Event は呼び出しタイプの指定オプションです。何も指定しないとRequestResponse、つまりAPI Gatewayの後ろにLambda関数を配置するときのような同期呼び出しになります。Destinationsは「非同期呼び出しの宛先指定」という日本語名がついているとおり非同期呼び出しの場合の機能なので、この場合は動作しないわけです。

※呼び出しタイプについて詳しく知りたい方は以下をどうぞ。(古いエントリなので細かなところは今と異なるかもしれませんが、大筋を理解いただくには良いと思います)

SQSメッセージ確認

$ aws sqs receive-message --queue-url (キューURL)\
> --attribute-names All --message-attribute-names All --max-number-of-messages 10

{
    "Messages": [
        {
            "MessageId": "82917adb-5e5d-4e1d-bb93-e953b1991885",
            "ReceiptHandle": "AQEB5cFnrwSRKb8sNLZhmCvKi33RbKnQ2YoJi/VBAtGo34DODh/vC99ky5J/NmoKmACwOGjWTWaMbcrpc1wWPkeykrNc1F/NEHdafNvliCiSHsmsz8vzo1hDt/JGXQ/urrNZRZfHbZkcAvoaeFMyfNqo9MdFRZqY5wKCQimNnor6QlsKOyEyHElf+WQxbvsMihJsYvJ/JTS1fj1rCboJtM2Y5Gr9tN3YeRnp+J8yr3DRmQ51ZTgBOW3zOSW2BWOFXdpPBkLhMS+1DW5naK8boWF35cPiuzvnXe30YXm81rFaXIxS3n5BRcbswHjrEp+Sdp1wC5fX17udYgreLMZZp8liKgfL8IeOHXptVAkJ0oPV4UY/cz4nBiaZxWtfsXymvGSOTj+7i7KYIgZuPcCP0Y4xCJAvsVSDfojhniNQcQsgOJM=",
            "MD5OfBody": "e09e0ca1338e07891893eb66f2d1b540",
            "Body": "{\"version\":\"1.0\",\"timestamp\":\"2020-06-24T12:47:35.460Z\",\"requestContext\":{\"requestId\":\"523089e2-8cc2-4181-8984-00a151e47ec8\",\"functionArn\":\"arn:aws:lambda:ap-northeast-1:123456789012:function:destinations-sample-dev-hello:$LATEST\",\"condition\":\"RetriesExhausted\",\"approximateInvokeCount\":3},\"requestPayload\":{\"success\":false},\"responseContext\":{\"statusCode\":200,\"executedVersion\":\"$LATEST\",\"functionError\":\"Unhandled\"},\"responsePayload\":{\"errorType\":\"Error\",\"errorMessage\":\"destination test\",\"trace\":[\"Error: destination test\",\"    at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)\",\"    at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)\"]}}",
            "Attributes": {
                "SenderId": "AROAQWCYJEBRJM6ORNHK5:awslambda_289_20200624124735466",
                "ApproximateFirstReceiveTimestamp": "1593003998400",
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1593002855530"
            }
        }
    ]
}

SQSへ配信されていることを確認できました。

パターン2: 宛先のSQSも併せて作成する

実際にDestinationsを使うとなると、SQSキューの設定もServerless Framework内でやってしまいたいですよね。そのパターンをやってみます。

serverless.yml SQSキュー追加版

service:
    name: destinations-sample

custom:
    webpack:
        webpackConfig: ./webpack.config.js
        includeModules: true

plugins:
    - serverless-webpack

provider:
    name: aws
    runtime: nodejs12.x
    region: ap-northeast-1
    environment:
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1

functions:
    hello:
        handler: handler.hello
        destinations:
            onFailure: !GetAtt FailQueue.Arn
            
resources:
  Resources:
    FailQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: fail-destination-queue-by-sls
        MessageRetentionPeriod: 1209600

エラーになった

  Type Error ---------------------------------------------
 
  TypeError: functionAddress.startsWith is not a function
      at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:609:28
      at AwsCompileFunctions.ensureTargetExecutionPermission (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:616:7)
      at AwsCompileFunctions.memoized [as ensureTargetExecutionPermission] (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/lodash/lodash.js:10552:27)
      at AwsCompileFunctions.compileFunctionDestinations (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:591:14)
      at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:553:56
      at tryCatcher (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/util.js:16:23)
      at Promise._settlePromiseFromHandler (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:547:31)
      at Promise._settlePromise (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:604:18)
      at Promise._settlePromise0 (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:649:10)
      at Promise._settlePromises (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:729:18)
      at Promise._fulfill (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:673:18)
      at Promise._resolveCallback (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:466:57)
      at Promise._settlePromiseFromHandler (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:559:17)
      at Promise._settlePromise (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:604:18)
      at Promise._settlePromise0 (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:649:10)
      at Promise._settlePromises (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:729:18)
      at Promise._fulfill (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:673:18)
      at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/nodeback.js:42:21
      at ReadStream.<anonymous> (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:463:15)
      at ReadStream.emit (events.js:311:20)
      at ReadStream.EventEmitter.emit (domain.js:482:12)
      at internal/fs/streams.js:241:14
      at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/graceful-fs/graceful-fs.js:61:14
      at FSReqCallback.oncomplete (fs.js:154:23)
 
     For debugging logs, run again after setting the "SLS_DEBUG=*" environment variable.

こちらのGitHub Issueと同内容のようです。こちらによると、CloudFormationの関数(この例では!GetAttを使っています)は使えないようです。

解決方法1:CFnで書く

service:
    name: destinations-sample

custom:
    webpack:
        webpackConfig: ./webpack.config.js
        includeModules: true

plugins:
    - serverless-webpack

provider:
    name: aws
    runtime: nodejs12.x
    region: ap-northeast-1
    environment:
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
    iamRoleStatements:
        - Effect: Allow
          Action: sqs:SendMessage
          Resource: !GetAtt FailQueue.Arn

functions:
    hello:
        handler: handler.hello
resources:
    Resources:
        FailQueue:
            Type: "AWS::SQS::Queue"
            Properties:
                QueueName: fail-destination-queue-by-sls
                MessageRetentionPeriod: 1209600
        HelloLambdaEvConf:
            Type: AWS::Lambda::EventInvokeConfig
            Properties:
                FunctionName:
                    Ref: HelloLambdaFunction
                DestinationConfig:
                    OnFailure:
                        Destination: !GetAtt FailQueue.Arn
                Qualifier: "$LATEST"

Serverless Frameworkの構文を使わず、resources.Resources以下にCFn(CloudFormation)テンプレート構文をそのまま書けることを利用して書いてみました。

なのですが、以下 2点の理由でいまいちだと感じました。

  • AWS::Lambda::EventInvokeConfigで対象のLambda関数を参照します(36-37行目)。この際 HelloLambdaFunction を指定しているのですがわかりにくいと思いました。HelloLambdaFunctionfunctions項で定義されているhello関数がCFnテンプレートに変換された際の 論理IDなのですが、その結びつきがテンプレート上では伝わりづらいと感じます。(ちなみに、Serverless Framework構文で作成した各リソースのCFn論理ID変換一覧は こちらの表です。)
  • Serverless Frameworkの構文で書いた場合には内部的に作成してくれる(=生成されるCFnテンプレートにServerless Frameworkが自動で挿入してくれる)、Lambda関数のsqs:SendMessage権限を明記する必要があります。(18-21行)

解決方法2:直接参照しないで変数を使う

というわけでこちらの方法をおすすめします。

プラグインインストール

$ npm install -D serverless-pseudo-parameters

serverless.yml 修正

service:
    name: destinations-sample

custom:
    webpack:
        webpackConfig: ./webpack.config.js
        includeModules: true
    fail-queue-name: fail-destination-queue-by-sls

plugins:
    - serverless-webpack
    - serverless-pseudo-parameters

provider:
    name: aws
    runtime: nodejs12.x
    region: ap-northeast-1
    environment:
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1

functions:
    hello:
        handler: handler.hello
        destinations:
            onFailure: arn:aws:sqs:#{AWS::Region}:#{AWS::AccountId}:${self:custom.fail-queue-name}

resources:
    Resources:
        FailQueue:
            Type: "AWS::SQS::Queue"
            Properties:
                QueueName: ${self:custom.fail-queue-name}
                MessageRetentionPeriod: 1209600

こちらのほうがスッキリかけて、かつわかりやすいテンプレートになったかと思います。

参考リンク

脚注

  1. 厳密には、処理失敗時にはデッドレターキューを使うことはできました