KinesisでAPIGatewayを使用してS3にデータを送信してみた。

2022.01.28

概要

この記事では、Kinesis を介してS3にデータを送信するREST APIを作成してみました。APIはPutRecordアクションを使用してデータをKinesis Data Streamsに送信します。Kinesis Firehoseは、KinesisStreamsからS3バケットにデータを送信します。

次のように API を作成しておきました。

  • HTTP GET メソッド : Resource - /streams, Action - ListStreams. このメソッドはアカウントのストリームを一覧表示します。
  • HTTP GET メソッド : Resource - /streams/{stream-name}, Action - DescribeStreams. このメソッドは指定したストリームを表示します。
  • HTTP PUTメソッド : Resource - /streams/{stream-name}/record, Action - PutRecord. このメソッドはストリームに 1 つのデータレコードを追加します。

 

 

やってみた

Kinesis Data Streamの作成

  • Amazon Kinesisコンソールで、Data Streamsを選択して、Create data streamをクリックしておきます。
  • 次の設定でData Streamを作成しておきます。
    • Data stream capacity : Provisioned
    • Provisioned shards : 1

 

 

S3 バケットの作成

  • S3 コンソールで、Create bucketを選択します。
  • バケット名を入力して、S3バケットを作成しておきます。

 

 

Kinesis Data Firehose Deliveryストリームの作成

  • Kinesis Data Firehoseのコンソールで、Create Delivery Streamを選択します。
  • SourceでAmazon Kinesis Data Streamsを選択して、 DestinationでAmason S3を選択しておきます。
  • Source settingsで以前に作成したデータストリームを選択して、Deliveryストリーム名入力しておきます。

 

 

  • Destination設定で以前に作成したS3バケットURLの名前を入力します。

 

  • Buffer intervalを60Secondsに設定して、Deliveryストリームを作成しておきます。

 

REST APIの作成

  • API GatewayコンソールのREST APIペインで、Buildを選択しておきます。

 

  • New API を選択して、API名を入力して、Endpoint typeで regionalを選択して、Create APIをクリックしておきます。

 

APIのリソースとメソッドの作成

IAM ロールの作成

  • API GatewayのIAMロールを作成しておきます。
  •  この設定でIAMロールを作成しておきます。
    • 信頼されたエンティティの種類 : AWS のサービス
    • ユースケース : API Gateway
    • ポリシー  : AmazonKinesisAnalyticsFullAccess
  • このIAMロールは、API Gatewayのメソッドを作成するときに使用されます。

 

List streamsのリソースとメソッド

  • Resources ページで、Actionsをクリック、Create Resourceを 選択しておきます。
  • リソース名[streams]を入力して、[Create Resource]をクリックしておきます。

 

 

  • Actionsで、Create Methodを選択して、GETを選択しておきます。
  • 次の設定でGET Methodを作成しておきます。
    • Integration type : AWS Service
    • Region : us-east-1
    • AWS Service : Kinesis
    • HTTP Method : POST
    • Action : ListStreams
    • Role : 前の手順で作成したIAMロールArnを入力します。

 

 

  • Integration Requestをクリックして、HTTP HeaderとMapping Templateを追加しておきます。

 

  • Mapping TemplateでContentタイプとしてapplication/jsonを追加して、空のテンプレートを作成しておきます。

 

Describe Streamsのリソースとメソッド

  • プロキシでリソース[/streams/{stream-name}]を作成しておきます。

 

  • 次の設定でGET Methodを作成しておきます。
    • Integration type : AWS Service
    • Region : us-east-1
    • AWS Service : Kinesis
    • HTTP Method : POST
    • Action : DescribeStreams
    • Role : 前の手順で作成したIAMロールArnを入力します。
  • HTTP HeaderとMapping Templateを追加しておきます。
  • Mapping TemplateでContentタイプとしてapplication/jsonを追加して、テンプレートに次のコードを追加しておきます。

 

 

Put Recordのリソースとメソッド

  • リソース[/streams/{stream-name}/record]を作成しておきます。

 

  • 次の設定でPUT Methodを作成しておきます。
    • Integration type : AWS Service
    • Region : us-east-1
    • AWS Service : Kinesis
    • HTTP Method : POST
    • Action : PutRecord
    • Role : 前の手順で作成したIAMロールArnを入力します。
  • HTTP HeaderとMapping Templateを追加しておきます。
  • Mapping TemplateでContentタイプとしてapplication/jsonを追加して、テンプレートに次のコードを追加しておきます。

 

{
     "StreamName": "$input.params('stream-name')",
     "Data": "$util.base64Encode($input.json('$.Data'))Cg==",
     "PartitionKey": "$input.path('$.PartitionKey')"
}

 

APIのデプロイとテストする

  • Actionsで、Deploy APIを選択しておきます。
  • New Stageを選択して、ステージ名を入力して、Deployをクリックしておきます

 

 

  • HTTP コマンドを使用してAPIをテストできます。次のコマンドを実行して、Kinesis Data Streamsにデータを送信します。
#Use HTTP PUT command to put data records in to Kinesis Streams
http put  API-Invoke-url/deploy-stage-name/list-stream-resource-name/kinesis-data-stream-name/data-to-be-sent

#Copy the Invoke Url from API Gateway console after deploying the api.
http put  https://id.execute-api.us-east-1.amazonaws.com/test-api/streams/api-data-s3/record Data='data to s3' PartitionKey=1

 

  • データがS3バケットに送信されたことはS3コンソールでみることができます。

 

まとめ

Kinesis Data StreamsとKinesis Firehoseを介してS3にデータを送信するREST APIを作成してみました。必要に応じてレコードを取得するなど、他のアクションを使用できます。

Kinesisで使用できるアクション :  CreateStream, DeleteStream, PutRecords, GetRecords,  GetShardIterator。

Reference : API Gateway with Kinesis