![[備忘録] Kinesis FirehoseのLambdaによるデータ変換について](https://devio2023-media.developers.io/wp-content/uploads/2019/04/amazon-kinesis-firehose.png)
[備忘録] Kinesis FirehoseのLambdaによるデータ変換について
2020.05.25
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは、CX事業本部の夏目です。
案件でFirehoseのLambdaによるデータ変換を使ったのですが、FirehoseのドキュメントがLambdaのBluePrint(設計図)を使ってねぐらいの書き方で、どんなeventが来てどんなresponseを返さないといけないのか詳しく書いてなかったので、備忘録として残しておきます。
Lambdaの設定について
- LambdaにつけるIAM Roleでは特に必要と言えるものはない
 - タイムアウトは5分までしか対応していない (これはドキュメントに記載されている)
 
Lambdaに渡されるEventについて
{
  "invocationId": "b384e3cf-8ae0-47ac-8e2a-8e4a9e59941e",
  "sourceKinesisStreamArn": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/firehose_ln",
  "deliveryStreamArn": "arn:aws:firehose:ap-northeast-1:000000000000:deliverystream/firehose_ln",
  "region": "ap-northeast-1",
  "records": [
    {
      "recordId": "49607016322531838770965472870713485392692964656771235842000000",
      "approximateArrivalTimestamp": 1589518484529,
      "data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49607016322531838770965472870713485392692964656771235842",
        "subsequenceNumber": 0,
        "partitionKey": "d39ae9bc-27ba-4208-92c8-98c607b84c46",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1589518484529
      }
    },
    {
      "recordId": "49607016322531838770965472870714694318512579354665418754000000",
      "approximateArrivalTimestamp": 1589518485244,
      "data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9",
      "kinesisRecordMetadata": {
        "sequenceNumber": "49607016322531838770965472870714694318512579354665418754",
        "subsequenceNumber": 0,
        "partitionKey": "75c67f74-41be-4be7-8d64-868712be1773",
        "shardId": "shardId-000000000000",
        "approximateArrivalTimestamp": 1589518485244
      }
    }
  ]
}
- 基本的に使用するのは、
recordsの中身- 特に
recordIdはresponseで使用する 
 - 特に
 - recordsの各要素の
dataは各データをBase64エンコードしたもの 
Lambdaが返す値について
{
  "records": [
    {
      "recordId": "49607016322531838770965472870713485392692964656771235842000000",
      "result": "Ok",
      "data": "eyJhIjogImM3NjNlM2I5LWI1NDQtNDg4NS04YWM1LWQ3ZDg4MjJjNGNmYyIsICJiIjogImQ1NjNiYWU0LTYwZTItNDhiOC1hZTQ0LTdkYjU2ZTZlNTZkNSIsICJjIjogIjljZjNkYmE3LTY2NTEtNGE2MC1iOGJhLThkNmFhMGYwMjNkMiJ9"
    },
    {
      "recordId": "49607016322531838770965472870714694318512579354665418754000000",
      "result": "Ok",
      "data": "eyJhIjogIjUxNDJhMGUwLTdiMWYtNGMxNS1iZjBiLThhMjU2ZmQzZjkyYyIsICJiIjogImNkZTVmNWRmLWUzMGMtNGE2Yi1iNjJjLWZiNmY3Zjk1NTYzNSIsICJjIjogIjljN2YyMWRlLWU2MWItNGJhZi05YjQ0LTg2MmE0YmVhZTVlMiJ9"
    }
  ]
}
- Lambdaのresponseは上記形式になっている必要がある
- 自分は
recordsの配列だけを返していて、形式が異なると怒られた 
 - 自分は
 recordsの各要素については、ドキュメントに詳しく記載されているdataはBase64でエンコードしたもの
Firehoseの設定について (CloudFormation)
  # Lambdaによるデータ変換において必要そうな要素のみ抜粋
  DeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !Ref ConvertLambda.Alias
                - ParameterName: NumberOfRetries
                  ParameterValue: "3"
                - ParameterName: RoleArn
                  ParameterValue: !GetAtt InvokeLambdaRole.Arn
                - ParameterName: BufferSizeInMBs
                  ParameterValue: "1"
                - ParameterName: BufferIntervalInSeconds
                  ParameterValue: "60"
        S3BackupMode: Enabled
        S3BackupConfiguration:
          RoleARN: !GetAtt DeliveryRole.Arn
          BucketARN: arn:aws:s3:::delivery-bucket
          Prefix: Backup/
          ErrorOutputPrefix: "!{firehose:error-output-type}/"
          BufferingHints:
            SizeInMBs: 5
            IntervalInSeconds: 300
          CompressionFormat: UNCOMPRESSED
- Lambdaでデータ変換を行う場合、各
DestinationConfigurationの中のProcessingConfigurationで設定を記述する- 上記例では
ExtendedS3DestinationConfigurationを使用している 
 - 上記例では
 - 変換で呼び出すLambdaなどは、
Processors[].Parametersのパラメータとして指定する RoleArnでは変換に使用するLambdaのInvoke権限が必要- 変更に失敗したときのために、
S3Backupの設定をいれておくと良さそう 






