[アップデート]Kinesis FirehoseでHTTPエンドポイントへのストリームデータ配信がサポートされました

2020.07.31

CX事業本部@大阪の岩田です。 7/29付けのアップデートでKinesis Firehoseのデータ送信先にHTTPエンドポイントが利用可能になりました。

さっそく試してみたので、簡単に内容を紹介させて頂きます。

これまでとこれから

これまでKinesis Firehoseの送信先としては

  • S3
  • Redshift
  • Amazon ES
  • Splunk

が選択可能でした。今回HTTPエンドポイントが選択可能になったことで、自前のWebアプリケーションやサードパーティサービスとの統合がより簡単になりました。ちなみに今回のHTTPエンドポイントのサポートに合わせて

  • Datadog
  • New Relic
  • MongoDB Cloud

といったサードパーティーサービスも送信先としてサポートが追加されています。

やってみる

では、ここからは実際にHTTPエンドポイントを試してみようと思います。

HTTPエンドポイントの用意

まずはデータ送信先のエンドポイントの準備です。今回はAPI GW & Lambdaを使ってHTTPエンドポイントを用意します。

以下のような簡易なLambda Functionを用意しました。Lambdaのイベントデータをそのままログに出力しつつ、JSONに変換して返却するだけの内容です。

import json

def handler(event, context):
    
    print (event)
    return {
        'statusCode': 200,
        'body': json.dumps(event)
    }

このLambdaをAPI GWの統合リクエストに設定し、APIをデプロイしておきます。

配信ストリームの作成

ここからKinesis Firehoseの配信ストリームを作成し、先程作成したAPI GWにデータを配信してみます。

まずは配信ストリームの作成

送信先の設定以外は全てデフォルトのまま進めていきます。

送信先としてHTTPエンドポイントを選択します。

HTTPエンドポイントの詳細を設定します。

HTTP endpoint URLにはAPI GWのURLを指定します。その他のパラメータとしてオプションで

  • Access Key
  • Parameters

が指定可能なのですが、エンドポイントにどういう形で連携されるのかを確認しておきたいので適当な値を設定しています。

その他の設定は全てデフォルトのまま作成を完了させます。

配信ストリームが作成できたら「Sending demo data」からテストデータを送信します。

しばらく待つとHTTPエンドポイント(API GW)に対してデータが配信されるので少し休憩します。

HTTPエンドポイントに対するリクエストのログを確認

Cloud WatchのメトリクスHTTP endpoint delivery successが増加していることが確認できたら、Lambdaのログを確認してみましょう。ログに出力されたイベントデータは以下のような構造でした。このログからKinesis Firehoseがどういった形式でHTTPエンドポイントにリクエストを発行しているかが分かりますね。

"resource": "/",
"path": "/",
"httpMethod": "POST",
"headers": {
  "Content-Type": "application/json",
  "Host": "xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com",
  "User-Agent": "Amazon Kinesis Data Firehose Agent/1.0",
  "X-Amz-Firehose-Access-Key": "012345789",
  "X-Amz-Firehose-Common-Attributes": "{\\"commonAttributes\\":{\\"key1\\":\\"val1\\"}}",
  "X-Amz-Firehose-Protocol-Version": "1.0",
  "X-Amz-Firehose-Request-Id": "3321e6a7-8914-4cd7-a789-9cf94a9885dc",
  "X-Amz-Firehose-Source-Arn": "arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/http_test",
  "X-Amzn-Trace-Id": "Root=1-5f23634e-0ca54a22580994eee90e9c00",
  "X-Forwarded-For": "13.231.243.5",
  "X-Forwarded-Port": "443",
  "X-Forwarded-Proto": "https"
},
"multiValueHeaders": {
  "Content-Type": [
    "application/json"
  ],
  "Host": [
    "xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com"
  ],
  "User-Agent": [
    "Amazon Kinesis Data Firehose Agent/1.0"
  ],
  "X-Amz-Firehose-Access-Key": [
    "012345789"
  ],
  "X-Amz-Firehose-Common-Attributes": [
    "{\\"commonAttributes\\":{\\"key1\\":\\"val1\\"}}"
  ],
  "X-Amz-Firehose-Protocol-Version": [
    "1.0"
  ],
  "X-Amz-Firehose-Request-Id": [
    "3321e6a7-8914-4cd7-a789-9cf94a9885dc"
  ],
  "X-Amz-Firehose-Source-Arn": [
    "arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/http_test"
  ],
  "X-Amzn-Trace-Id": [
    "Root=1-5f23634e-0ca54a22580994eee90e9c00"
  ],
  "X-Forwarded-For": [
    "13.231.243.5"
  ],
  "X-Forwarded-Port": [
    "443"
  ],
  "X-Forwarded-Proto": [
    "https"
  ]
},
"queryStringParameters": null,
"multiValueQueryStringParameters": null,
"pathParameters": null,
"stageVariables": null,
"requestContext": {
  "resourceId": "xxxxxxxxxx",
  "resourcePath": "/",
  "httpMethod": "POST",
  "extendedRequestId": "Qgx0QGZJtjMFTZw=",
  "requestTime": "31/Jul/2020:00:18:22 +0000",
  "path": "/dev",
  "accountId": "123456789012",
  "protocol": "HTTP/1.1",
  "stage": "dev",
  "domainPrefix": "xxxxxxxxxx",
  "requestTimeEpoch": 1596154702396,
  "requestId": "f6c1be06-aa8e-44ad-b48c-892d73c479b9",
  "identity": {
    "cognitoIdentityPoolId": null,
    "accountId": null,
    "cognitoIdentityId": null,
    "caller": null,
    "sourceIp": "13.231.243.5",
    "principalOrgId": null,
    "accessKey": null,
    "cognitoAuthenticationType": null,
    "cognitoAuthenticationProvider": null,
    "userArn": null,
    "userAgent": "Amazon Kinesis Data Firehose Agent/1.0",
    "user": null
  },
  "domainName": "xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com",
  "apiId": "xxxxxxxxxx"
},
"body": "{\\"requestId\\":\\"3321e6a7-8914-4cd7-a789-9cf94a9885dc\\",\\"timestamp\\":1596154702386,\\"records\\":[{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQk5NIiwic2VjdG9yIjoiVEVDSE5PTE9HWSIsImNoYW5nZSI6My4xNSwicHJpY2UiOjE4MS4yNH0=\\"},{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQVpMIiwic2VjdG9yIjoiSEVBTFRIQ0FSRSIsImNoYW5nZSI6LTAuNjEsInByaWNlIjoxOC4yMX0=\\"},
...略
"isBase64Encoded": false
}

ポイントを抜粋してみましょう

リクエストヘッダ

パット目についたのは以下の項目でしょうか

  • X-Amz-Firehose-Access-Key
    • HTTPエンドポイント設定時に指定したAccess Keyである012345789が入っています。HTTPエンドポイント側でアクセス元がFirehoseであることを確認するにはこのヘッダを確認すれば良さそうです。
  • X-Amz-Firehose-Common-Attributes
    • "{\\"commonAttributes\\":{\\"key1\\":\\"val1\\"}}"という文字列が入っていました。 HTTPエンドポイント設定時に指定したParametersがJSON形式で設定されているようです
  • X-Amz-Firehose-Protocol-Version
    • 1.0が設定されています。今後バージョンアップなどあればインクリメントされると思われます。
  • X-Amz-Firehose-Request-Id
    • UUIDと思われる文字列が入っていました。ヘッダの名称からもFirehose -> HTTPエンドポイントのリクエストを一意に識別可能なIDが設定されていると思われます。
  • X-Amz-Firehose-Source-Arn
    • 配信ストリームのARNが設定されていました

リクエストボディ

リクエストボディは以下のような形式でした。

"body": "{\\"requestId\\":\\"3321e6a7-8914-4cd7-a789-9cf94a9885dc\\",\\"timestamp\\":1596154702386,\\"records\\":[{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQk5NIiwic2VjdG9yIjoiVEVDSE5PTE9HWSIsImNoYW5nZSI6My4xNSwicHJpY2UiOjE4MS4yNH0=\\"},{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQVpMIiwic2VjdG9yIjoiSEVBTFRIQ0FSRSIsImNoYW5nZSI6LTAuNjEsInByaWNlIjoxOC4yMX0=\\"},
 ...略{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQ1ZCIiwic2VjdG9yIjoiVEVDSE5PTE9HWSIsImNoYW5nZSI6LTAuMzcsInByaWNlIjo1Mi4yNn0=\\"}]}"
  • requestId
  • timestamp
  • records

という要素から構成されるJSONを文字列に変換したデータがセットされるようです。recordsの中にはさらに{"data": "base64エンコードされた文字列"}という構造のデータが複数レコードセットされています。

HTTPエンドポイントの側で処理する際はrecordsの中身をループで取り出しながら、base64でデコードしてから処理すると良さそうです。このあたりの処理はKinesis Data StreamsからLambdaを起動した場合と同じように書けそうですね。

まとめ

Kinesis FirehoseのHTTPエンドポイントについてご紹介しました。ストリーミングデータの処理に関してさらに選択肢が増えました。今後のアーキテクチャ設計に活かしていきたいと思います。

参考