KinesisでAPIGatewayを使用してS3にデータを送信してみた。
概要
この記事では、Kinesis を介してS3にデータを送信するREST APIを作成してみました。APIはPutRecordアクションを使用してデータをKinesis Data Streamsに送信します。Kinesis Firehoseは、KinesisStreamsからS3バケットにデータを送信します。
次のように API を作成しておきました。
- HTTP GET メソッド : Resource - /streams, Action - ListStreams. このメソッドはアカウントのストリームを一覧表示します。
- HTTP GET メソッド : Resource - /streams/{stream-name}, Action - DescribeStreams. このメソッドは指定したストリームを表示します。
- HTTP PUTメソッド : Resource - /streams/{stream-name}/record, Action - PutRecord. このメソッドはストリームに 1 つのデータレコードを追加します。
やってみた
Kinesis Data Streamの作成
- Amazon Kinesisコンソールで、Data Streamsを選択して、Create data streamをクリックしておきます。
- 次の設定でData Streamを作成しておきます。
- Data stream capacity : Provisioned
- Provisioned shards : 1
S3 バケットの作成
- S3 コンソールで、Create bucketを選択します。
- バケット名を入力して、S3バケットを作成しておきます。
Kinesis Data Firehose Deliveryストリームの作成
- Kinesis Data Firehoseのコンソールで、Create Delivery Streamを選択します。
- SourceでAmazon Kinesis Data Streamsを選択して、 DestinationでAmason S3を選択しておきます。
- Source settingsで以前に作成したデータストリームを選択して、Deliveryストリーム名入力しておきます。
- Destination設定で以前に作成したS3バケットURLの名前を入力します。
- Buffer intervalを60Secondsに設定して、Deliveryストリームを作成しておきます。
REST APIの作成
- API GatewayコンソールのREST APIペインで、Buildを選択しておきます。
- New API を選択して、API名を入力して、Endpoint typeで regionalを選択して、Create APIをクリックしておきます。
APIのリソースとメソッドの作成
IAM ロールの作成
- API GatewayのIAMロールを作成しておきます。
- この設定でIAMロールを作成しておきます。
- 信頼されたエンティティの種類 : AWS のサービス
- ユースケース : API Gateway
- ポリシー : AmazonKinesisAnalyticsFullAccess
- このIAMロールは、API Gatewayのメソッドを作成するときに使用されます。
List streamsのリソースとメソッド
- Resources ページで、Actionsをクリック、Create Resourceを 選択しておきます。
- リソース名[streams]を入力して、[Create Resource]をクリックしておきます。
- Actionsで、Create Methodを選択して、GETを選択しておきます。
- 次の設定でGET Methodを作成しておきます。
- Integration type : AWS Service
- Region : us-east-1
- AWS Service : Kinesis
- HTTP Method : POST
- Action : ListStreams
- Role : 前の手順で作成したIAMロールArnを入力します。
- Integration Requestをクリックして、HTTP HeaderとMapping Templateを追加しておきます。
- Mapping TemplateでContentタイプとしてapplication/jsonを追加して、空のテンプレートを作成しておきます。
Describe Streamsのリソースとメソッド
- プロキシでリソース[/streams/{stream-name}]を作成しておきます。
- 次の設定でGET Methodを作成しておきます。
- Integration type : AWS Service
- Region : us-east-1
- AWS Service : Kinesis
- HTTP Method : POST
- Action : DescribeStreams
- Role : 前の手順で作成したIAMロールArnを入力します。
- HTTP HeaderとMapping Templateを追加しておきます。
- Mapping TemplateでContentタイプとしてapplication/jsonを追加して、テンプレートに次のコードを追加しておきます。
Put Recordのリソースとメソッド
- リソース[/streams/{stream-name}/record]を作成しておきます。
- 次の設定でPUT Methodを作成しておきます。
- Integration type : AWS Service
- Region : us-east-1
- AWS Service : Kinesis
- HTTP Method : POST
- Action : PutRecord
- Role : 前の手順で作成したIAMロールArnを入力します。
- HTTP HeaderとMapping Templateを追加しておきます。
- Mapping TemplateでContentタイプとしてapplication/jsonを追加して、テンプレートに次のコードを追加しておきます。
{ "StreamName": "$input.params('stream-name')", "Data": "$util.base64Encode($input.json('$.Data'))Cg==", "PartitionKey": "$input.path('$.PartitionKey')" }
APIのデプロイとテストする
- Actionsで、Deploy APIを選択しておきます。
- New Stageを選択して、ステージ名を入力して、Deployをクリックしておきます。
- HTTP コマンドを使用してAPIをテストできます。次のコマンドを実行して、Kinesis Data Streamsにデータを送信します。
#Use HTTP PUT command to put data records in to Kinesis Streams http put API-Invoke-url/deploy-stage-name/list-stream-resource-name/kinesis-data-stream-name/data-to-be-sent #Copy the Invoke Url from API Gateway console after deploying the api. http put https://id.execute-api.us-east-1.amazonaws.com/test-api/streams/api-data-s3/record Data='data to s3' PartitionKey=1
- データがS3バケットに送信されたことはS3コンソールでみることができます。
まとめ
Kinesis Data StreamsとKinesis Firehoseを介してS3にデータを送信するREST APIを作成してみました。必要に応じてレコードを取得するなど、他のアクションを使用できます。
Kinesisで使用できるアクション : CreateStream, DeleteStream, PutRecords, GetRecords, GetShardIterator。
Reference : API Gateway with Kinesis