この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Lambda Destinations(非同期呼び出しの宛先指定)とは
Lambda関数実行完了をトリガーに他のAWSサービスを実行することができる機能です。現在以下 4サービスを実行することができます。
- Amazon SQS
- Amazon SNS
- Lambda
- EventBridge
Destinationsの設定にはコードを書く必要はなく、Lambdaのマネジメントコンソールから可能です。これまでは関数実行完了時に別のAWSサービスを実行したい場合は、Lambda関数内で(処理の最後の方に)別サービスを呼び出すコードを書く必要がありました *1が、これが不要になり、Lambda関数のコードをより簡潔にすることができます。なんというか、より「マネージド」になった感じです。
また、関数実行に成功した場合と失敗した場合で別々の宛先を指定することができます。もちろん片方だけ指定することもできます。
今回はこの機能をSereverless Frameworkを使って実装していきたいと思います。
パターン1: Destinationsだけ指定(宛先SQSキューは事前に作っておく)
Serverless FrameworkではDestinationsの設定だけ行ない、その宛先となるSQSキューの作成はServerless Frameworkスコープ外とします。
SQSキュー作成
CLIで作成しました。
$ aws sqs create-queue --queue-name fail-destination-queue
$ aws sqs get-queue-attributes \
> --queue-url (前述create-queueコマンド結果のURL) \
> --attribute-names QueueArn
serverless.yml
最後 2行がDestinationsに関する設定箇所です。今回は失敗時の宛先のみ指定しています。
service:
name: destinations-sample
custom:
webpack:
webpackConfig: ./webpack.config.js
includeModules: true
plugins:
- serverless-webpack
provider:
name: aws
runtime: nodejs12.x
region: ap-northeast-1
environment:
AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
functions:
hello:
handler: handler.hello
destinations:
onFailure: arn:aws:sqs:ap-northeast-1:123456789012:fail-destination-queue
Lambda関数
イベントペイロードに{ "success": false }
を渡すとエラーを発生させるコードです。
handler.ts
import "source-map-support/register";
export const hello = async (event) => {
console.log(event);
if (event.success === false) {
throw new Error("destination test");
}
return true;
};
デプロイ
$ npx sls deploy
テスト実行
$ npx sls invoke -f hello -d '{ "success": false }' -t Event
false
を指定したのでエラーになってDestinationの処理が走るはずです。
CloudWatch Logs出力内容
$ npx sls logs -t -f hello
START RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Version: $LATEST
2020-06-24 21:44:45.825 (+09:00) 523089e2-8cc2-4181-8984-00a151e47ec8 INFO { success: false }
2020-06-24 21:44:45.831 (+09:00) 523089e2-8cc2-4181-8984-00a151e47ec8 ERROR Invoke Error {"errorType":"Error","errorMessage":"destination test","stack":["Error: destination test"," at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)"," at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8
REPORT RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Duration: 8.65 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 68 MB
START RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Version: $LATEST
2020-06-24 21:45:46.064 (+09:00) 523089e2-8cc2-4181-8984-00a151e47ec8 INFO { success: false }
2020-06-24 21:45:46.066 (+09:00) 523089e2-8cc2-4181-8984-00a151e47ec8 ERROR Invoke Error {"errorType":"Error","errorMessage":"destination test","stack":["Error: destination test"," at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)"," at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8
REPORT RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Duration: 3.04 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 69 MB
START RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Version: $LATEST
2020-06-24 21:47:35.308 (+09:00) 523089e2-8cc2-4181-8984-00a151e47ec8 INFO { success: false }
2020-06-24 21:47:35.309 (+09:00) 523089e2-8cc2-4181-8984-00a151e47ec8 ERROR Invoke Error {"errorType":"Error","errorMessage":"destination test","stack":["Error: destination test"," at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)"," at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"]}
END RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8
REPORT RequestId: 523089e2-8cc2-4181-8984-00a151e47ec8 Duration: 4.33 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 70 MB
非同期呼び出しの場合、エラーが発生すると最大2回の再試行、つまり合計3回の実行が行われます。(再試行回数は変更可能です)
そのすべてで失敗した場合に、失敗時のDestinationsへの配信が行われます。
ちなみにですが、invoke時に -t Event
を抜くとDestinationsへの配信はされません。-t Event
は呼び出しタイプの指定オプションです。何も指定しないとRequestResponse、つまりAPI Gatewayの後ろにLambda関数を配置するときのような同期呼び出しになります。Destinationsは「非同期呼び出しの宛先指定」という日本語名がついているとおり非同期呼び出しの場合の機能なので、この場合は動作しないわけです。
※呼び出しタイプについて詳しく知りたい方は以下をどうぞ。(古いエントリなので細かなところは今と異なるかもしれませんが、大筋を理解いただくには良いと思います)
SQSメッセージ確認
$ aws sqs receive-message --queue-url (キューURL)\
> --attribute-names All --message-attribute-names All --max-number-of-messages 10
{
"Messages": [
{
"MessageId": "82917adb-5e5d-4e1d-bb93-e953b1991885",
"ReceiptHandle": "AQEB5cFnrwSRKb8sNLZhmCvKi33RbKnQ2YoJi/VBAtGo34DODh/vC99ky5J/NmoKmACwOGjWTWaMbcrpc1wWPkeykrNc1F/NEHdafNvliCiSHsmsz8vzo1hDt/JGXQ/urrNZRZfHbZkcAvoaeFMyfNqo9MdFRZqY5wKCQimNnor6QlsKOyEyHElf+WQxbvsMihJsYvJ/JTS1fj1rCboJtM2Y5Gr9tN3YeRnp+J8yr3DRmQ51ZTgBOW3zOSW2BWOFXdpPBkLhMS+1DW5naK8boWF35cPiuzvnXe30YXm81rFaXIxS3n5BRcbswHjrEp+Sdp1wC5fX17udYgreLMZZp8liKgfL8IeOHXptVAkJ0oPV4UY/cz4nBiaZxWtfsXymvGSOTj+7i7KYIgZuPcCP0Y4xCJAvsVSDfojhniNQcQsgOJM=",
"MD5OfBody": "e09e0ca1338e07891893eb66f2d1b540",
"Body": "{\"version\":\"1.0\",\"timestamp\":\"2020-06-24T12:47:35.460Z\",\"requestContext\":{\"requestId\":\"523089e2-8cc2-4181-8984-00a151e47ec8\",\"functionArn\":\"arn:aws:lambda:ap-northeast-1:123456789012:function:destinations-sample-dev-hello:$LATEST\",\"condition\":\"RetriesExhausted\",\"approximateInvokeCount\":3},\"requestPayload\":{\"success\":false},\"responseContext\":{\"statusCode\":200,\"executedVersion\":\"$LATEST\",\"functionError\":\"Unhandled\"},\"responsePayload\":{\"errorType\":\"Error\",\"errorMessage\":\"destination test\",\"trace\":[\"Error: destination test\",\" at Runtime.n [as handler] (/var/task/webpack:/handler.ts:6:15)\",\" at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)\"]}}",
"Attributes": {
"SenderId": "AROAQWCYJEBRJM6ORNHK5:awslambda_289_20200624124735466",
"ApproximateFirstReceiveTimestamp": "1593003998400",
"ApproximateReceiveCount": "1",
"SentTimestamp": "1593002855530"
}
}
]
}
SQSへ配信されていることを確認できました。
パターン2: 宛先のSQSも併せて作成する
実際にDestinationsを使うとなると、SQSキューの設定もServerless Framework内でやってしまいたいですよね。そのパターンをやってみます。
serverless.yml
SQSキュー追加版
service:
name: destinations-sample
custom:
webpack:
webpackConfig: ./webpack.config.js
includeModules: true
plugins:
- serverless-webpack
provider:
name: aws
runtime: nodejs12.x
region: ap-northeast-1
environment:
AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
functions:
hello:
handler: handler.hello
destinations:
onFailure: !GetAtt FailQueue.Arn
resources:
Resources:
FailQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: fail-destination-queue-by-sls
MessageRetentionPeriod: 1209600
エラーになった
Type Error ---------------------------------------------
TypeError: functionAddress.startsWith is not a function
at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:609:28
at AwsCompileFunctions.ensureTargetExecutionPermission (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:616:7)
at AwsCompileFunctions.memoized [as ensureTargetExecutionPermission] (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/lodash/lodash.js:10552:27)
at AwsCompileFunctions.compileFunctionDestinations (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:591:14)
at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:553:56
at tryCatcher (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:547:31)
at Promise._settlePromise (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:604:18)
at Promise._settlePromise0 (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:649:10)
at Promise._settlePromises (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:729:18)
at Promise._fulfill (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:673:18)
at Promise._resolveCallback (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:466:57)
at Promise._settlePromiseFromHandler (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:559:17)
at Promise._settlePromise (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:604:18)
at Promise._settlePromise0 (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:649:10)
at Promise._settlePromises (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:729:18)
at Promise._fulfill (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/promise.js:673:18)
at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/bluebird/js/release/nodeback.js:42:21
at ReadStream.<anonymous> (/Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/serverless/lib/plugins/aws/package/compile/functions/index.js:463:15)
at ReadStream.emit (events.js:311:20)
at ReadStream.EventEmitter.emit (domain.js:482:12)
at internal/fs/streams.js:241:14
at /Users/kazue.masaki/project/kazue-sandboxes/20200620-lambda-destinations/destinations-sample/node_modules/graceful-fs/graceful-fs.js:61:14
at FSReqCallback.oncomplete (fs.js:154:23)
For debugging logs, run again after setting the "SLS_DEBUG=*" environment variable.
こちらのGitHub Issueと同内容のようです。こちらによると、CloudFormationの関数(この例では!GetAtt
を使っています)は使えないようです。
解決方法1:CFnで書く
service:
name: destinations-sample
custom:
webpack:
webpackConfig: ./webpack.config.js
includeModules: true
plugins:
- serverless-webpack
provider:
name: aws
runtime: nodejs12.x
region: ap-northeast-1
environment:
AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
iamRoleStatements:
- Effect: Allow
Action: sqs:SendMessage
Resource: !GetAtt FailQueue.Arn
functions:
hello:
handler: handler.hello
resources:
Resources:
FailQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: fail-destination-queue-by-sls
MessageRetentionPeriod: 1209600
HelloLambdaEvConf:
Type: AWS::Lambda::EventInvokeConfig
Properties:
FunctionName:
Ref: HelloLambdaFunction
DestinationConfig:
OnFailure:
Destination: !GetAtt FailQueue.Arn
Qualifier: "$LATEST"
Serverless Frameworkの構文を使わず、resources.Resources
以下にCFn(CloudFormation)テンプレート構文をそのまま書けることを利用して書いてみました。
なのですが、以下 2点の理由でいまいちだと感じました。
AWS::Lambda::EventInvokeConfig
で対象のLambda関数を参照します(36-37行目)。この際HelloLambdaFunction
を指定しているのですがわかりにくいと思いました。HelloLambdaFunction
はfunctions
項で定義されているhello関数がCFnテンプレートに変換された際の 論理IDなのですが、その結びつきがテンプレート上では伝わりづらいと感じます。(ちなみに、Serverless Framework構文で作成した各リソースのCFn論理ID変換一覧は こちらの表です。)- Serverless Frameworkの構文で書いた場合には内部的に作成してくれる(=生成されるCFnテンプレートにServerless Frameworkが自動で挿入してくれる)、Lambda関数の
sqs:SendMessage
権限を明記する必要があります。(18-21行)
解決方法2:直接参照しないで変数を使う
というわけでこちらの方法をおすすめします。
プラグインインストール
$ npm install -D serverless-pseudo-parameters
serverless.yml
修正
service:
name: destinations-sample
custom:
webpack:
webpackConfig: ./webpack.config.js
includeModules: true
fail-queue-name: fail-destination-queue-by-sls
plugins:
- serverless-webpack
- serverless-pseudo-parameters
provider:
name: aws
runtime: nodejs12.x
region: ap-northeast-1
environment:
AWS_NODEJS_CONNECTION_REUSE_ENABLED: 1
functions:
hello:
handler: handler.hello
destinations:
onFailure: arn:aws:sqs:#{AWS::Region}:#{AWS::AccountId}:${self:custom.fail-queue-name}
resources:
Resources:
FailQueue:
Type: "AWS::SQS::Queue"
Properties:
QueueName: ${self:custom.fail-queue-name}
MessageRetentionPeriod: 1209600
こちらのほうがスッキリかけて、かつわかりやすいテンプレートになったかと思います。
参考リンク
- AWS Lambda Destination Support | Serverless blog
- Using SQS with AWS Lambda and Serverless | Serverless blog
- serverless-pseudo-parameters
- 非同期呼び出しの送信先の設定 | AWS Lambda 開発者ガイド
- AWS Lambda が非同期呼び出しの宛先指定をサポート
脚注
- 厳密には、処理失敗時にはデッドレターキューを使うことはできました ↩