この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
AWS API Gatewayを使用してAWS サービスプロキシ経由でKinesis Firehoseに接続する方法について説明します。これにより、AWS Lambda 関数を介してではなく直接、AWS サービスを呼び出すことができます。
Lambda関数とAWSサービスプロキシの違い
API Gatewayを使用てKinesis Firehoseと連携する場合、Integration type には
- Lambda関数
- AWSサービスプロキシ
の2方式が考えられます。
Lambda関数方式
Kinesis FirehoseへのリクエストはLambda関数が行います。 リクエストデータの加工はAPI GatewayのBody Mapping TemplatesやLambda関数が行います。
AWSサービスプロキシ方式
今回はこの方式を採用します。 Kinesis FirehoseへのリクエストはAPI GatewayのIntegration Requestが行います。 リクエストデータの加工はAPI GatewayのBody Mapping Templatesが行います。
Kinesis Firehoseとは
Amazon Kinesis Firehoseは、ストリーミングデータをS3およびRedshiftに自動的にロードする完全マネージド型サービスです。
2016/03/20時点では東京リージョンでは利用できず、以下のリージョンで利用可能です。
- US East (N. Virginia) us-east-1
- US West (Oregon) us-west-2
- EU (Ireland) eu-west-1
AWS Service ProxyでKinesis Firehoseを利用する
最終的には下図にあるようなデータ連携を目指します。
以下の順で作業します。
- Kinesis Firehoseストリームの作成
- API GatewayがKinesis Firehoseを操作するためのIAM Roleの作成
- API Gatewayの作成
- AWSサービスプロキシ用メソッドの作成
- API Gatewayのデプロイ
- curlで動作確認
1. Kinesis Firehoseストリームの作成
繰り返しになりますが、Kinesis Firehoseは東京リージョンではまだ利用できないため、以下のいずれかのリージョンで作業します。
- US East (N. Virginia) us-east-1
- US West (Oregon) us-west-2
- EU (Ireland) eu-west-1
今回はUS West(Oregon)を利用します。
管理画面のウィザードに従ってポチポチやると
- ストリームデータの保存先S3バケット
- Kinesis FirehoseがS3を操作するためのIAM Role
- Kinesis Firehoseストリーム
が作成されます。
Kinesis Firehoseストリームには次のようなIAM Roleが設定されているはずです
Trust Relationships
Kinesis FirehoseがAssumeRoleすることを許可するPolicy Document
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "YOUR_ACCOUNT_ID"
}
}
}
]
}
Permissions
S3を操作するAPIを許可するPolicy Document
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::YOUR_BUCKET",
"arn:aws:s3:::YOUR_BUCKET/*"
]
}
]
}
2.API GatewayがKinesis Firehoseを操作するためのIAM Roleを作成
ここで作成するIAM RoleはAPI Gatewayのメソッド定義時に利用します。
Trust Relationships
以下のようにapigatewayがAssumeRoleすることを許可するPolicy Documentを設定します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "apigateway.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Permissions
以下のように、Kinesis Firehoseにレコード追加するAPIを許可するPolicy Documentを設定します。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt",
"Effect": "Allow",
"Action": [
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
"Resource": [
"*"
]
}
]
}
3.API Gatewayの作成
新規にAPI Gatewayを作成します。
4.AWSサービスプロキシ用メソッドの作成
ここからがメインの作業です。
リソースとメソッドの作成
リソース「foo」を作成し、POSTメソッドを作成します。
Integration Requestの修正
図の「Integration Request」をクリックします。
「Integration Type」には「AWS Service Proxy」を選択します。 この選択肢は初期状態では隠れており、「Show advanced」をクリックしないと表示されません。 お気をつけけください。
の表のように設定を行います。
項目 | 設定値 |
---|---|
Integration type | AWS Service Proxy |
AWS Region | Kinesis Firehoseを作成したリージョン |
AWS Service | Firehose |
AWS Subdomain | なし |
HTTP method | POST |
Action | PutRecord |
Execution role | 上記で作成した IAM Role の ARN |
Body Mapping Templatesの修正
続けてIntegration RequestのBody Mapping Templatesを修正します。
Kinesis Firehoseに単一レコードを追加するPutRecord APIのリクエストインタックスは次の形をしています。
{
"DeliveryStreamName": "string",
"Record": {
"Data": blob
}
}
API Gatewayに
{
"Data": "record"
}
というPOSTリクエストがされた時に、PutRecordのリクエストシンタックスに変換させるように、以下のように記入します。
{
"DeliveryStreamName": "YOUR_STREAM_NAME",
"Record": {
"Data": "$util.base64Encode($input.json('$.Data'))"
}
}
ポイントは "$util.base64Encode($input.json('$.Data'))" の箇所です。
$input.json('$.Data')
POSTしたJSONデータのメンバーData
の値を取得します。
$.Data
というようにJSONPathで記述できます。
$util.base64Encode(...)
Kinesis FirehoseのPutRecord APIの仕様により、Dataをbase64エンコードします。
Mapping templateの詳細は次のURLをご確認下さい。 http://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html
FirehoseからClientへの戻りについては、Kinesis Firehoseのレスポンスをそのまま利用するため、passthrough とします。
メソッドの動作確認
定義したメソッドの動作確認をしましょう。
メソッドのトップ画面「Method Execution」には「TEST」ボタンがあるので、これをクリックしてメソッドのテストを行います。
Request BodyにPOSTデータを記入し「TEST」ボタンを押してテストします。 POSTデータは以下の様なJSONデータにします。
{
"Data" : "firehose"
}
Logsのエリアにサーバーログが出力されます。重要なログを拾います。
Method request body before transformations: {
"Data" : "firehose"
}
クライアントが送信したPOSTリクエストのボディーです。
Endpoint request URI: https://firehose.us-west-2.amazonaws.com/?Action=PutRecord
Integration Requestのリクエスト先URLです。
Endpoint request body after transformations: {
"DeliveryStreamName": "YOUR_STREAM_NAME",
"Record": {
"Data": "ImZpcmVob3NlIg=="
}
}
Integration RequestのMapping template通りに、Kinesis Firehoseへのリクエストデータが加工されています。
$ echo "ImZpcmVob3NlIg==" | base64 -d
"firehose"
となるので、確かにbase64エンコードされています。
Endpoint response body before transformations: {"RecordId":"YHcefFqNzV3/..."}
Kinesis Firehoseからのレスポンスです。
Method response body after transformations: {"RecordId":"YHcefFqNzV3/..."}
レスポンスに関してはpassthroughしているので、before/after transformationsでレスポンスは同じです。
S3データを確認
Kinesis Firehose->S3と連携されたデータを確認してみましょう。
$ aws s3 sync s3://YOUR_BUCKET . --region us-west-2
download: s3://YOUR_BUCKET/2016/03/20/06/fh-1-2016-03-20-06-46-37-4cd04278-8cac-43b2-a155-9cfcd0b0d14c to 2016/03/20/06/fh-1-2016-03-20-06-46-37-4cd04278-8cac-43b2-a155-9cfcd0b0d14c
$ cat 2016/03/20/06/fh-1-2016-03-20-06-46-37-4cd04278-8cac-43b2-a155-9cfcd0b0d14c
"firehose"
テスト送信したデータ("firehose")を復元できました。
5.API Gatewayのデプロイ
「Deploy API」ボタンをクリックして、ステージにデプロイします。 初期状態ではステージが一個も作成されていないため、ステージ名「prod」で作成します。
ステージ作成後はStages画面に遷移します。
6.curlで動作確認
リソース「foo」のURLに向けて POST リクエストを送信してみましょう。
$ curl -X POST \
--header "Content-Type:application/json" \
https://YOUR-DOMAIN.execute-api.us-west-2.amazonaws.com/prod/foo \
--data '{"Data" : "hello world!"}'
{"RecordId":"UleiO329lAo..."}
リクエストヘッダーにはContent-Typeも正しく設定してください。
S3データを確認
Kinesis Firehose->S3と連携されたデータを確認してみましょう。
S3オブジェクトをローカルにシンクします。
$ aws s3 sync s3://YOUR_BUCKET . --region us-west-2
download: s3://YOUR_BUCKET/2016/03/20/06/fh-1-2016-03-20-06-55-14-b8861371-3fb8-481b-9b89-8dcaafde1ac9 to 2016/03/20/06/fh-1-2016-03-20-06-55-14-b8861371-3fb8-481b-9b89-8dcaafde1ac9
$ cat 2016/03/20/06/fh-1-2016-03-20-06-55-14-b8861371-3fb8-481b-9b89-8dcaafde1ac9
"hello world!
POSTした"hello world!"が返ってきました。
AWS Service ProxyとSORACOM Funnelの比較
IoTプラットフォームSORACOMのSORACOM Funnel(以下Funnel)はAWS Service Proxyに近い機能を提供します。
FunnelはTCP/UDPにも対応している一方で、プロキシ先が
- Amazon Kinesis
- Amazon Kinesis Firehose
- Microsoft Azure Event Hubs
に限定されています。
Funnelの詳細は以下のURLを参照ください。
まとめ
AWS Service Proxyを使うと、API GatewayにHTTPSリクエストするだけでAWSサービスを操作できます。 クライアントにAWS SDKを導入しづらいようなケースで一度検討されてはいかがでしょうか?