この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
CX事業本部@大阪の岩田です。 7/29付けのアップデートでKinesis Firehoseのデータ送信先にHTTPエンドポイントが利用可能になりました。
さっそく試してみたので、簡単に内容を紹介させて頂きます。
これまでとこれから
これまでKinesis Firehoseの送信先としては
- S3
- Redshift
- Amazon ES
- Splunk
が選択可能でした。今回HTTPエンドポイントが選択可能になったことで、自前のWebアプリケーションやサードパーティサービスとの統合がより簡単になりました。ちなみに今回のHTTPエンドポイントのサポートに合わせて
- Datadog
- New Relic
- MongoDB Cloud
といったサードパーティーサービスも送信先としてサポートが追加されています。
- Amazon Kinesis Data Firehose now supports data delivery to MongoDB Cloud
- Amazon Kinesis Data Firehose now supports data delivery to New Relic
- Amazon Kinesis Data Firehose now supports data delivery to Datadog
やってみる
では、ここからは実際にHTTPエンドポイントを試してみようと思います。
HTTPエンドポイントの用意
まずはデータ送信先のエンドポイントの準備です。今回はAPI GW & Lambdaを使ってHTTPエンドポイントを用意します。
以下のような簡易なLambda Functionを用意しました。Lambdaのイベントデータをそのままログに出力しつつ、JSONに変換して返却するだけの内容です。
import json
def handler(event, context):
print (event)
return {
'statusCode': 200,
'body': json.dumps(event)
}
このLambdaをAPI GWの統合リクエストに設定し、APIをデプロイしておきます。
配信ストリームの作成
ここからKinesis Firehoseの配信ストリームを作成し、先程作成したAPI GWにデータを配信してみます。
まずは配信ストリームの作成
送信先の設定以外は全てデフォルトのまま進めていきます。
送信先としてHTTPエンドポイントを選択します。
HTTPエンドポイントの詳細を設定します。
HTTP endpoint URLにはAPI GWのURLを指定します。その他のパラメータとしてオプションで
- Access Key
- Parameters
が指定可能なのですが、エンドポイントにどういう形で連携されるのかを確認しておきたいので適当な値を設定しています。
その他の設定は全てデフォルトのまま作成を完了させます。
配信ストリームが作成できたら「Sending demo data」からテストデータを送信します。
しばらく待つとHTTPエンドポイント(API GW)に対してデータが配信されるので少し休憩します。
HTTPエンドポイントに対するリクエストのログを確認
Cloud WatchのメトリクスHTTP endpoint delivery success
が増加していることが確認できたら、Lambdaのログを確認してみましょう。ログに出力されたイベントデータは以下のような構造でした。このログからKinesis Firehoseがどういった形式でHTTPエンドポイントにリクエストを発行しているかが分かりますね。
"resource": "/",
"path": "/",
"httpMethod": "POST",
"headers": {
"Content-Type": "application/json",
"Host": "xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com",
"User-Agent": "Amazon Kinesis Data Firehose Agent/1.0",
"X-Amz-Firehose-Access-Key": "012345789",
"X-Amz-Firehose-Common-Attributes": "{\\"commonAttributes\\":{\\"key1\\":\\"val1\\"}}",
"X-Amz-Firehose-Protocol-Version": "1.0",
"X-Amz-Firehose-Request-Id": "3321e6a7-8914-4cd7-a789-9cf94a9885dc",
"X-Amz-Firehose-Source-Arn": "arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/http_test",
"X-Amzn-Trace-Id": "Root=1-5f23634e-0ca54a22580994eee90e9c00",
"X-Forwarded-For": "13.231.243.5",
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https"
},
"multiValueHeaders": {
"Content-Type": [
"application/json"
],
"Host": [
"xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com"
],
"User-Agent": [
"Amazon Kinesis Data Firehose Agent/1.0"
],
"X-Amz-Firehose-Access-Key": [
"012345789"
],
"X-Amz-Firehose-Common-Attributes": [
"{\\"commonAttributes\\":{\\"key1\\":\\"val1\\"}}"
],
"X-Amz-Firehose-Protocol-Version": [
"1.0"
],
"X-Amz-Firehose-Request-Id": [
"3321e6a7-8914-4cd7-a789-9cf94a9885dc"
],
"X-Amz-Firehose-Source-Arn": [
"arn:aws:firehose:ap-northeast-1:123456789012:deliverystream/http_test"
],
"X-Amzn-Trace-Id": [
"Root=1-5f23634e-0ca54a22580994eee90e9c00"
],
"X-Forwarded-For": [
"13.231.243.5"
],
"X-Forwarded-Port": [
"443"
],
"X-Forwarded-Proto": [
"https"
]
},
"queryStringParameters": null,
"multiValueQueryStringParameters": null,
"pathParameters": null,
"stageVariables": null,
"requestContext": {
"resourceId": "xxxxxxxxxx",
"resourcePath": "/",
"httpMethod": "POST",
"extendedRequestId": "Qgx0QGZJtjMFTZw=",
"requestTime": "31/Jul/2020:00:18:22 +0000",
"path": "/dev",
"accountId": "123456789012",
"protocol": "HTTP/1.1",
"stage": "dev",
"domainPrefix": "xxxxxxxxxx",
"requestTimeEpoch": 1596154702396,
"requestId": "f6c1be06-aa8e-44ad-b48c-892d73c479b9",
"identity": {
"cognitoIdentityPoolId": null,
"accountId": null,
"cognitoIdentityId": null,
"caller": null,
"sourceIp": "13.231.243.5",
"principalOrgId": null,
"accessKey": null,
"cognitoAuthenticationType": null,
"cognitoAuthenticationProvider": null,
"userArn": null,
"userAgent": "Amazon Kinesis Data Firehose Agent/1.0",
"user": null
},
"domainName": "xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com",
"apiId": "xxxxxxxxxx"
},
"body": "{\\"requestId\\":\\"3321e6a7-8914-4cd7-a789-9cf94a9885dc\\",\\"timestamp\\":1596154702386,\\"records\\":[{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQk5NIiwic2VjdG9yIjoiVEVDSE5PTE9HWSIsImNoYW5nZSI6My4xNSwicHJpY2UiOjE4MS4yNH0=\\"},{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQVpMIiwic2VjdG9yIjoiSEVBTFRIQ0FSRSIsImNoYW5nZSI6LTAuNjEsInByaWNlIjoxOC4yMX0=\\"},
...略
"isBase64Encoded": false
}
ポイントを抜粋してみましょう
リクエストヘッダ
パット目についたのは以下の項目でしょうか
- X-Amz-Firehose-Access-Key
- HTTPエンドポイント設定時に指定したAccess Keyである
012345789
が入っています。HTTPエンドポイント側でアクセス元がFirehoseであることを確認するにはこのヘッダを確認すれば良さそうです。
- HTTPエンドポイント設定時に指定したAccess Keyである
- X-Amz-Firehose-Common-Attributes
"{\\"commonAttributes\\":{\\"key1\\":\\"val1\\"}}"
という文字列が入っていました。 HTTPエンドポイント設定時に指定したParametersがJSON形式で設定されているようです
- X-Amz-Firehose-Protocol-Version
1.0
が設定されています。今後バージョンアップなどあればインクリメントされると思われます。
- X-Amz-Firehose-Request-Id
- UUIDと思われる文字列が入っていました。ヘッダの名称からもFirehose -> HTTPエンドポイントのリクエストを一意に識別可能なIDが設定されていると思われます。
- X-Amz-Firehose-Source-Arn
- 配信ストリームのARNが設定されていました
リクエストボディ
リクエストボディは以下のような形式でした。
"body": "{\\"requestId\\":\\"3321e6a7-8914-4cd7-a789-9cf94a9885dc\\",\\"timestamp\\":1596154702386,\\"records\\":[{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQk5NIiwic2VjdG9yIjoiVEVDSE5PTE9HWSIsImNoYW5nZSI6My4xNSwicHJpY2UiOjE4MS4yNH0=\\"},{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQVpMIiwic2VjdG9yIjoiSEVBTFRIQ0FSRSIsImNoYW5nZSI6LTAuNjEsInByaWNlIjoxOC4yMX0=\\"},
...略{\\"data\\":\\"eyJ0aWNrZXJfc3ltYm9sIjoiQ1ZCIiwic2VjdG9yIjoiVEVDSE5PTE9HWSIsImNoYW5nZSI6LTAuMzcsInByaWNlIjo1Mi4yNn0=\\"}]}"
- requestId
- timestamp
- records
という要素から構成されるJSONを文字列に変換したデータがセットされるようです。records
の中にはさらに{"data": "base64エンコードされた文字列"}
という構造のデータが複数レコードセットされています。
HTTPエンドポイントの側で処理する際はrecords
の中身をループで取り出しながら、base64でデコードしてから処理すると良さそうです。このあたりの処理はKinesis Data StreamsからLambdaを起動した場合と同じように書けそうですね。
まとめ
Kinesis FirehoseのHTTPエンドポイントについてご紹介しました。ストリーミングデータの処理に関してさらに選択肢が増えました。今後のアーキテクチャ設計に活かしていきたいと思います。