Amazon EventBridge と Lambda Destinations を使って Kinesis Data Firehose にデータを流す

2020.04.10

概要

CX事業本部@sapporo の佐藤です。

Amazon EventBridge を使うことで、Lambda や AWS サービス、SaaS パートナーのイベントから他の AWS サービスにノーコーディングでアクセスすることが可能になります。今回は、昨年アップデートされた Lambda の新しい機能の Lambda Destinations という機能を使って、Lambda の実行をトリガーにLambda の実行が成功した場合は成功のログ、失敗した場合は例外のログを Kinesis Data Firehose に流して、S3 に保存してみたいと思います。

AWS アーキテクチャ

以下のようなアーキテクチャを作っていきます。

環境

  • Node.js v12.16.0
  • aws-cli/1.16.137

事前準備

EventBridge を触る前に、上記のアーキテクチャを作るための事前準備を行います。

S3バケットを作成する

まずは、ログの格納先であるS3バケットを作成します。バケット名は適当に cm-sato-eventbridge-kinesis-firehose-bucket とします。

aws s3 mb --region ap-northeast-1 s3://cm-sato-eventbridge-kinesisfirehose-test-bucket

Kinesis Data Firehose を作成する

次に、Lambda からログデータを流し込むための Kinesis Data Firehose を作成します。Kinesis Data Firehose を使うことで、ログの書き込みをバッファして S3 への書き込みリクエストを抑えたり、S3 オブジェクトに対して Append してくれたり、Firehose 側で日付ごとにフォルダを切ってくれたりするので、ログの分析をするときなどに役立ちます。名前は log-stream とします。

まずは、今回は firehose の S3 にデータを流す配信ストリームを作成するので、 --generate-cli-skeleton オプションを使って、必要そうなパラメータを確認します。宛先を S3 にするため、ExtendedS3DestinationConfiguration をプロパティを使います(S3DestinationConfiguration は非推奨みたいです)。その他のプロパティは削除します。IAMロールが必要なため、事前に Administrator 権限でロールを作成して ARN を控えておきます。必要ないプロパティを消すと以下の JSON 文字列ができるので、適当な名前で保存します。

{
    "DeliveryStreamName": "log-stream",
    "DeliveryStreamType": "DirectPut",
    "ExtendedS3DestinationConfiguration": {
        "RoleARN": "IAMロールのARN",
        "BucketARN": "先程作ったS3バケットのURL",
        "Prefix": "firehose",
        "BufferingHints": {
            "SizeInMBs": 1,
            "IntervalInSeconds": 60
        },
        "CompressionFormat": "GZIP",
        "EncryptionConfiguration": {
            "NoEncryptionConfig": "NoEncryption"
        }
    }
}

保存できたら、以下のコマンドで firehose の配信ストリームを作成します。

aws firehose create-delivery-stream --cli-input-json file://保存したJSONファイル名

Amazon EventBridge のイベントルールを作成する

事前準備が整いましたので、Amazon EventBridge イベントルールを作成します。今回は、Lambda Destination からのイベントを Kinesis Data Firehose にルーティングするルールを作成します。

まずは、Amazon EventBridge コンソールにアクセスし、ルールの作成をクリックします。ルールを作成の画面が出ると思うので、以下のように Lambda の実行成功ルールと例外ルールを作成します。

onSuccess ルール

  • 名前
    • lambda-success-rule
  • パターンを定義
    • イベントパターンを選択
    • イベント一致パターン
      • カスタムパターン
    • イベントパターン
      • これは、Lambda から EventBridge に送られてくるイベント情報のパターンを JSON 形式で入力します。マッチしたイベントだけを Firehose にルーティングするということになります。responsePayload は Lambda 関数の return で返却される JSON 情報となります。
{
  "detail": {
    "responsePayload": {
      "action": [
        "message"
      ],
      "source": [
        "kinesis-firehose"
      ]
    }
  }
}
  • ターゲットを選択
    • ターゲット
      • Firehose 配信ストリーム
    • ストリーム
      • 先程作った配信ストリームを選択する

onFailure ルール

  • 名前
    • lambda-failure-rule
  • パターンを定義
    • イベントパターンを選択
    • イベント一致パターン
      • カスタムパターン
    • イベントパターン
      • Lambda の実行失敗時は、例外が throw されるため、以下のようなイベントパターンを定義します。
{
  "detail": {
    "responsePayload": {
      "errorType": [
        "Error"
      ]
    }
  }
}
  • ターゲットを選択
    • ターゲット
      • Firehose 配信ストリーム
    • ストリーム
      • 先程作った配信ストリームを選択する

入力できたら作成をクリックして、イベントルールを作成します。これで、EventBridge 側の準備は OK で、Lambdaから該当するイベント情報が送られてきたら、EventBridge が自動的に Kinesis Firehose へイベント情報を流します。

Lambda 関数の作成と設定

次に、EventBridge にイベントを送信するための Lambda 関数を作成します。今までだと Lambda から EventBridge にイベントを送信するためには、aws-sdk を使って Lambda のコード側で実装する必要があったんですが、最近アップデートされた Lambda Destination を使うことで、ノンコーディングで EventBridge にイベントを送信することが可能となりました。今回は、aws-sdk は使わず、Lambda Destinations のみで実装してみたいと思います。Lambda Destinations で関数の実行が成功(onSuccess)、失敗(onFailure)により処理を分けることを試したいので、明示的に例外を throw するようにしています。今回は Lambda のペイロードの値によって例外か正常終了かを分けています。

Lambda サンプルコード

exports.handler = async (event) => {
    console.log(JSON.stringify(event));
    console.log(event.condition);
    if (!event.condition) throw new Error('condtion is bad')

    return {
        source: 'kinesis-firehose',
        action: 'message',
        message: 'condition is good',
    };
};

Lambda の return で返される JSON 値は、EventBridge のイベントルールのイベントパターンと同じでなければ EventBridge 側で破棄されるようになっていますので、ここは設定したパターンと同じに設定します。

Lambda Destinations を設定する

Lambda Destinations を設定していきます。先程作成した関数のコンソールの送信先を追加を選択します。以下のように入力して保存します。

  • ソース: 非同期呼び出し
  • Condition: 正常
  • 送信先タイプ: EventBridge イベントバス
  • 送信先: default

次に関数が失敗した場合の送信先を以下のように入力して保存します。

  • ソース: 非同期呼び出し
  • Condition: On failure
  • 送信先タイプ: EventBridge イベントバス
  • 送信先: default

これで、Lambda Destinations の設定はOKです。

動作確認

正常実行

aws-cli を使って Lambda を実行してテストしてみます。まずは、正常終了です。

aws lambda invoke --function-name eventbridge-publish-event --invocation-type Event --payload '{ "condition": true }' output.json

Lambda を正常終了させてみたところ、EventBridge を経由して Firehose にデータが流れて S3 バケットに保存されていました。以下のような形式のログファイルが確認できました。

{
    "version": "0",
    "id": "20110a65-1fe7-b39c-6858-2a7b4e49dac7",
    "detail-type": "Lambda Function Invocation Result - Success",
    "source": "lambda",
    "account": "XXXXXXXXXXXXX",
    "time": "2020-04-09T08:27:25Z",
    "region": "ap-northeast-1",
    "resources": [
        "arn:aws:events:ap-northeast-1:XXXXXXXXXXXX:event-bus/default",
        "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:eventbridge-publish-event:$LATEST"
    ],
    "detail": {
        "version": "1.0",
        "timestamp": "2020-04-09T08:27:25.590Z",
        "requestContext": {
            "requestId": "bc14b207-8335-4e3a-910a-e5e57e9ff5b3",
            "functionArn": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:eventbridge-publish-event:$LATEST",
            "condition": "Success",
            "approximateInvokeCount": 1
        },
        "requestPayload": {
            "condition": "true"
        },
        "responseContext": {
            "statusCode": 200,
            "executedVersion": "$LATEST"
        },
        "responsePayload": {
            "source": "kinesis-firehose",
            "action": "message",
            "message": "condition is good"
        }
    }
}

異常実行

今度は、関数側から例外を投げて、失敗させてみます。

aws lambda invoke --function-name eventbridge-publish-event --invocation-type Event --payload '{ "condition": false }' output.json

同じように S3 バケットを書くにすると、例外情報が記録されたログファイルが S3 バケットに保存されていました。

{
    "version": "0",
    "id": "58cb45ca-1d4b-7fe4-eedd-14de661896d2",
    "detail-type": "Lambda Function Invocation Result - Failure",
    "source": "lambda",
    "account": "XXXXXXXXXXXXX",
    "time": "2020-04-09T08:42:21Z",
    "region": "ap-northeast-1",
    "resources": [
        "arn:aws:events:ap-northeast-1:XXXXXXXXXXXX:event-bus/default",
        "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:eventbridge-publish-event:$LATEST"
    ],
    "detail": {
        "version": "1.0",
        "timestamp": "2020-04-09T08:42:21.602Z",
        "requestContext": {
            "requestId": "d7621aae-75e6-46fd-99a9-7b0a02430dea",
            "functionArn": "arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:eventbridge-publish-event:$LATEST",
            "condition": "RetriesExhausted",
            "approximateInvokeCount": 3
        },
        "requestPayload": {
            "condition": false
        },
        "responseContext": {
            "statusCode": 200,
            "executedVersion": "$LATEST",
            "functionError": "Unhandled"
        },
        "responsePayload": {
            "errorType": "Error",
            "errorMessage": "condtion is bad",
            "trace": [
                "Error: condtion is bad",
                "    at Runtime.exports.handler (/var/task/index.js:4:33)",
                "    at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"
            ]
        }
    }
}

まとめ

Lambda Destinations と EventBridge を連携させてみました。以前までは、このような処理を実装しようと思うと Lambda 側で aws-sdk を使って実装する必要があったんですが、Lambda Destinations と EventBridge が出たおかげで、サービス間の連携をある程度 AWS 側にオフロードすることができるようになりました。とても便利な機能なので、試してみてください。