AWS CDKでAPI GatewayからKinesis Data Streamsにデータを流す設定を作成してみた

AWS CDKでAPI GatewayからKinesis Data Streamsにデータを流す設計を作成してみます。CDKのインストールからデプロイ、実行結果確認までを記事にしました。CDKでAPI GatewayのAWS統合を使用する場合にも参考になるかと思います。
2020.04.08

はじめに

CX事業本部東京オフィスの佐藤智樹です。

今回はAWS CDKでAPI GatewayからKinesis Data Streamsにデータを流す設計を作成してみます。

コンソール画面で設計の確認をしてからCDKのコードに設計を落としているのですが、API GatewayのLambdaプロキシ統合ばかり使用していたためAPI GatewayのAWS統合を指定する場合にオプションの階層構造が画面とうまく結び付かなかったので備忘録として残します。

実行環境

項目名 バージョン
mac OS Catalina 10.15.3
npm 6.13.6
AWS CDK 1.31.0

事前準備

AWS CDKを使うためにAWS CLIやAWSアカウントの準備、npmをインストールしてください。 まずCDKのCLIをインストールします。(インストール済みの方は飛ばしても大丈夫です。ただnpm installするバージョンが異なるとビルドでエラーがでる場合があるのでその場合はCDKのバージョンを統一してください)

npm -g install aws-cdk@1.31.0

次にCDKでプロジェクトを作成します。cdk initを実行するフォルダ名がstack名になります。

mkdir apigateway-kinesis
cd apigateway-kinesis
cdk init app --language typescript

モジュールのインストール

CDKで使用するモジュールをnpmでインストールしていきます。

npm install @aws-cdk/core@1.31.0
npm install @aws-cdk/aws-apigateway@1.31.0
npm install @aws-cdk/aws-kinesis@1.31.0
npm install @aws-cdk/aws-iam@1.31.0

API GatewayとKinesis Data Streamsを設定

AWS CDKでAPI GatewayとKinesis Data Streams、後それらを動かすためのロール・ポリシーを作成します Kinesis Data Streamsは最低限の設定になっているので用途によってパラメータを追加してください。 またデータをKinesisに溜めるまでの設定なのでデータを処理する場合はEventSourceMappingでLambdaとKinesisを指定してください。

./lib/apigateway-kinesis-stack.ts

import * as cdk from '@aws-cdk/core';
import * as apigateway from '@aws-cdk/aws-apigateway';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as iam from '@aws-cdk/aws-iam'

export class ApigatewayKinesisStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const region = cdk.Stack.of(this).region;
    const accountId = cdk.Stack.of(this).account;
    const streamName = 'users-stream';

    // Kinesis Data Streamsの生成
    // 2020/04/07現在Kinesis Data StreamsのHigh-level Constructsはpublic betaなので
    // CloudFomationと1対1対応になるLow-level Constructs(prefixがCfnのもの)を使う
    // 参考:https://docs.aws.amazon.com/cdk/api/latest/docs/aws-kinesis-readme.html
    const usersKinesis = new kinesis.CfnStream(this, `UsersKinesis`, {
        shardCount: 1,
        name: streamName,
      });

    // API Gatewayを作成
    const restApi = new apigateway.RestApi(this, 'RestApi', {
      restApiName: `apigateway-to-kinesis`,
      deployOptions: {
        stageName: 'v1',
      },
    });

    // API GatewayからKinesis Data Streamsにアクセスするためのロールの作成
    const apiGatewayToKinesisRole = new iam.Role(
      this,
      'ApiGatewayToKinesisRole',
      {
        assumedBy: new iam.ServicePrincipal('apigateway.amazonaws.com'),
      },
    );
    // 必要なポリシーをアタッチ
    apiGatewayToKinesisRole.addToPolicy(
      new iam.PolicyStatement({
        actions: ['kinesis:PutRecord'],
        effect: iam.Effect.ALLOW,
        resources: [`arn:aws:kinesis:${region}:${accountId}:stream/${streamName}`],
      }),
    );
    apiGatewayToKinesisRole.addToPolicy(
      new iam.PolicyStatement({
        actions: [
          'logs:CreateLogGroup',
          'logs:CreateLogStream',
          'logs:DescribeLogGroups',
          'logs:DescribeLogStreams',
          'logs:PutLogEvents',
          'logs:GetLogEvents',
          'logs:FilterLogEvents',
        ],
        effect: iam.Effect.ALLOW,
        resources: ['*'],
      }),
    );

    // エンドポイントの追加
    const usersResource = restApi.root.addResource('users');

    // API GatewayからKinesis Data Streamsに流す設定を作成
    usersResource.addMethod(
      'POST',
      new apigateway.AwsIntegration({
        service: 'kinesis',
        action: 'PutRecord',
        options: {
          credentialsRole: apiGatewayToKinesisRole,
          passthroughBehavior: apigateway.PassthroughBehavior.WHEN_NO_TEMPLATES,
          // Kinesis Data Streamsに流すための形式にリクエストデータを整形
          // "StreamName" : 対象のストリーム名を設定
          // "Data" : Kinesis Data Streamsへ渡すデータ本体
          // "PartitionKey" : ストリーム内のどのシャードに流すか判断するための値
          requestTemplates: {
            'application/json': `{
              "StreamName": "${usersKinesis.name}",
              "Data": "$util.base64Encode($input.json('$'))",
              "PartitionKey": $input.json('$.uuid')
            }`,
          },
          // 統合レスポンスの設定
          integrationResponses: [
            {
              statusCode: '200',
              responseTemplates: {
                'application/json': `{"result":"ok"}`,
              },
            },
          ],
        },
      }),
      //メソッドレスポンスの設定
      {
        methodResponses: [
          {
            statusCode: '200',
          },
        ],
      },
    );
  }
}

デプロイ

上記作業が完了したらビルド後デプロイします。

npm run build
cdk deploy

動作確認

API Gatewayの画面とKinesis Data Streamsの画面で動作確認をします。

AWSのWeb画面コンソールでAPI Gatewayの項目から今回作成したAPI名「apigateway-to-kinesis」を開きます。POST->テストの順でテスト画面を開くとリクエストのテストができます。ここで下記の画像のようにリクエスト本文にuuid、dataを追加してテストを実行します。

{
    "uuid": "ec6b83b5-03fa-44c8-8fee-f4e81070e44e",
    "data": "sample"
}

実行後レスポンス本文に{"result":"ok"}が帰ってくればAPI Gatewayの設定は問題なしです。

次はKinesis Data Streamsの画面から今回作成した「users-stream」を選択します。

モニタリングを押すと「Putレコード」などの項目でデータが送信されたことが確認できます。

不要なリソースの削除

確認後は下記のコマンドで不要になったリソースを削除しておいてください。Kinesis置いとくだけでお金かかるので

cdk destroy

参考資料

チュートリアル: API Gateway で Amazon Kinesis プロキシとして REST API を作成する

aws-kinesis module · AWS CDK