この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
先日こんなアップデートがありました。
AWS Lambda Supports Failure-Handling Features for Kinesis and DynamoDB Event Sources
KinesisやDynamoDBのストリーム型のイベントソースで起動するLambdaが失敗した際の挙動をカスタマイズできるようになったみたいです。
アナウンスはAWS re:Invent 2019より前でしたが、重要なアップデートなので試してみたいと思います。
なにが変わったのか?
LambdaのトリガーにKinesisを選択すると、選択項目が追加されていました。
- 「障害時の送信先」
- レコードの有効期限切れもしくは再試行で失敗した際に、SNSかSQSにレコードを送信できるようになったようです。非同期LambdaのDLQと同じような機能ですね。
- 「再試行」
- 失敗した際のリトライ回数を設定できます。
- 「レコードの最長有効期間」
- Lambdaがレコードを保持できる有効期限ですね。
- 「エラー時にバッチを分割」
- たとえば1回のバッチで取得するレコード数が100件の場合、失敗したら50件づつの2つのバッチに分割してくれるようです。
- 余談
- 今回の話とはそれますが、「バッチウィンドウ」「シャードあたりの同時バッチ」という項目も増えており、これらをうまく利用するとトラフィックに応じたLambdaのスケーリング管理が自前できるそうです。
- 参考サイト
今まで
レコードの処理に失敗した場合、Lambdaは該当のレコードの有効期限(Kinesis StreamsやDynamoDB側)が切れるまでリトライを継続する仕様でした。
キューが詰まるのを避けるため、例外を捕捉してもエラーログ出力だけして、それ以外は何もしないという実装をおこなう必要がありました。
この場合、CloudWatch MetricsでLambdaのエラーは検知できず、CloudWatch Logsをフィルタリングする必要がありました。
from logging import getLogger
logger = getLogger(__name__)
def lambda_handler(event, context):
try:
for record in event['Records']:
...各レコードの処理実装
except Exception as e:
# リトライを避けるためログ出力のみ行う
logger.error(e)
これから
「再試行」や「レコードの最長有効期間」が設定できるようになったので、想定外のリトライ実行によるキュー詰まりなどは気にする必要がなくなりました。
また「エラー時にバッチを分割」を有効にしてあげることで、リトライ後に成功する可能性はあがります。 たとえば10件中の1レコードだけに不正なデータがある場合など、分割することでリトライ後は片方のバッチは成功します。
今後はLambdaハンドラーにエラーを返してあげて、CloudWatch MetricsでLambdaのエラーを捕捉できます。
from logging import getLogger
logger = getLogger(__name__)
def lambda_handler(event, context):
try:
for record in event['Records']:
...各レコードの処理実装
except Exception as e:
logger.error(e)
raise e
せっかくなので試してみた
- 設定
バッチサイズ | 障害時の送信先 | 再試行 | レコードの最長有効期間 | エラー時にバッチを分割 |
---|---|---|---|---|
4 | SNSを指定 | 2 | 60s | 有効化 |
- 送信データ
4件のレコードを送信します。内1件は不正なデータを送信します。
{
"Records": [
{
"Data": "sample_data",
"PartitionKey": "0123456789"
},
{
"Data": "sample_data",
"PartitionKey": "0123456789"
},
{
"Data": "sample_data",
"PartitionKey": "0123456789"
},
{
"Data": "invalid_data",
"PartitionKey": "0123456789"
}
],
"StreamName": "arai-test-kinesis-streams"
}
- Lambdaのソースコード
データがinvalid_data
という文字列を条件に例外を発生させます。
import base64
def lambda_handler(event, context):
for record in event['Records']:
b64_data = record['kinesis']['data']
data = base64.b64decode(b64_data).decode(encoding='utf-8')
if data == 'invalid_data':
raise Exception('Failed!')
結果
- CloudWatch Metrics(合計)
4件のレコードが送信されて、エラーの場合はレコード分割と最大2回リトライがおこなわれているので期待通りの結果ですね。詳細は下記参考にしてください。
# 1回目 (失敗)
["sample_data", "sample_data", "sample_data", "invalid_data"]
# 2回目(成功)
["sample_data", "sample_data"]
# 3回目(失敗)
["sample_data", "invalid_data"]
# 4回目(成功)
["sample_data"]
# 5回目(失敗)
["invalid_data"]
# 6回目(失敗)
["invalid_data"]
# 7回目(失敗)
["invalid_data"]
まとめ
いかがだったでしょうか。
CloudWatch MetricsでLambdaのエラーを捕捉できる様になったのはとても嬉しいです。
お疲れ様でした!