この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の夏目です。
案件でFirehoseのLambdaによるデータ変換を使ったのですが、FirehoseのドキュメントがLambdaのBluePrint(設計図)を使ってねぐらいの書き方で、どんなeventが来てどんなresponseを返さないといけないのか詳しく書いてなかったので、備忘録として残しておきます。
Lambdaの設定について
- LambdaにつけるIAM Roleでは特に必要と言えるものはない
- タイムアウトは5分までしか対応していない (これはドキュメントに記載されている)
Lambdaに渡されるEventについて
{
"invocationId": "b384e3cf-8ae0-47ac-8e2a-8e4a9e59941e",
"sourceKinesisStreamArn": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/firehose_ln",
"deliveryStreamArn": "arn:aws:firehose:ap-northeast-1:000000000000:deliverystream/firehose_ln",
"region": "ap-northeast-1",
"records": [
{
"recordId": "49607016322531838770965472870713485392692964656771235842000000",
"approximateArrivalTimestamp": 1589518484529,
"data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9",
"kinesisRecordMetadata": {
"sequenceNumber": "49607016322531838770965472870713485392692964656771235842",
"subsequenceNumber": 0,
"partitionKey": "d39ae9bc-27ba-4208-92c8-98c607b84c46",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1589518484529
}
},
{
"recordId": "49607016322531838770965472870714694318512579354665418754000000",
"approximateArrivalTimestamp": 1589518485244,
"data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9",
"kinesisRecordMetadata": {
"sequenceNumber": "49607016322531838770965472870714694318512579354665418754",
"subsequenceNumber": 0,
"partitionKey": "75c67f74-41be-4be7-8d64-868712be1773",
"shardId": "shardId-000000000000",
"approximateArrivalTimestamp": 1589518485244
}
}
]
}
- 基本的に使用するのは、
records
の中身- 特に
recordId
はresponseで使用する
- 特に
- recordsの各要素の
data
は各データをBase64エンコードしたもの
Lambdaが返す値について
{
"records": [
{
"recordId": "49607016322531838770965472870713485392692964656771235842000000",
"result": "Ok",
"data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9"
},
{
"recordId": "49607016322531838770965472870714694318512579354665418754000000",
"result": "Ok",
"data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9"
}
]
}
- Lambdaのresponseは上記形式になっている必要がある
- 自分は
records
の配列だけを返していて、形式が異なると怒られた
- 自分は
records
の各要素については、ドキュメントに詳しく記載されているdata
はBase64でエンコードしたもの
Firehoseの設定について (CloudFormation)
# Lambdaによるデータ変換において必要そうな要素のみ抜粋
DeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
ProcessingConfiguration:
Enabled: true
Processors:
- Type: Lambda
Parameters:
- ParameterName: LambdaArn
ParameterValue: !Ref ConvertLambda.Alias
- ParameterName: NumberOfRetries
ParameterValue: "3"
- ParameterName: RoleArn
ParameterValue: !GetAtt InvokeLambdaRole.Arn
- ParameterName: BufferSizeInMBs
ParameterValue: "1"
- ParameterName: BufferIntervalInSeconds
ParameterValue: "60"
S3BackupMode: Enabled
S3BackupConfiguration:
RoleARN: !GetAtt DeliveryRole.Arn
BucketARN: arn:aws:s3:::delivery-bucket
Prefix: Backup/
ErrorOutputPrefix: "!{firehose:error-output-type}/"
BufferingHints:
SizeInMBs: 5
IntervalInSeconds: 300
CompressionFormat: UNCOMPRESSED
- Lambdaでデータ変換を行う場合、各
DestinationConfiguration
の中のProcessingConfiguration
で設定を記述する- 上記例では
ExtendedS3DestinationConfiguration
を使用している
- 上記例では
- 変換で呼び出すLambdaなどは、
Processors[].Parameters
のパラメータとして指定する RoleArn
では変換に使用するLambdaのInvoke権限が必要- 変更に失敗したときのために、
S3Backup
の設定をいれておくと良さそう