CDK + API Gateway + Web Socket を使ってみた

2020.12.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

API GatewayWeb Socket APIを作ってみたのでご紹介します。

作りたいものとしては、S3に画像をアップロードする機能があり、その際に管理画面などに画像がアップロードされたのを通知で表示したいというものです。 簡単な構成としては以下のような感じです。

今回紹介するソースコードは、Typescriptで記述しています。

作ったサンプルは以下のGitHubで公開しています。
WebSocket

AWS環境を構築

CDKでAWS環境を構築します。

S3を作成

画像のアップロード先のS3を作成します。

const webSocketBucket = new s3.Bucket(this, 'webSocketBucket')

DynamoDBのテーブルを作成

クライアント接続時のコネクションIDを保持するDynamoDBのテーブルを作成します。
画像がアップロードされた場合の通知先として利用します。

const webSocketConnection = new dynamodb.Table(this, 'webSocketConnection', {
  partitionKey: {
    name: 'connectionId',
    type: dynamodb.AttributeType.STRING,
  },
  tableName: 'webSocketConnection',
  billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
})

ApiGatewayを作成

Web Socket APIを作成します。

const api = new apigatewayv2.CfnApi(this, name, {
  name: 'WebSocketApi',
  protocolType: 'WEBSOCKET',
  routeSelectionExpression: '$request.body.action',
})

routeSelectionExpressionはメッセージに含まれるどの値で、ルーティングを決定するかを指定します。
今回は以下のようなJSONでメッセージのやりとりを想定して設定してます。

{
    "action": "testRoute",
    "data": {
        "message": "test"
    }
}

この場合は、testRouteにルーティングされます。

Routeを作成

routeSelectionExpressionで指定したルーティングは、独自のルーティングと標準で用意されているルーティングがあります。
標準ルーティングは以下の通りです。

  • $connect クライアント接続時のルーティング
  • $disconnect クライアント切断時のルーティング
  • $default 設定されてないルートの場合のルーティング

$connectのルートを作成します。

// Lambdaを作成
const connectLambda = new lambda.Function(scope, 'web-socket-connect', {
    code: new lambda.AssetCode('lib/handler'),
    handler: 'webSocket/connect.handler',
    runtime: lambda.Runtime.NODEJS_12_X,
    environment: {
      TABLE_NAME: webSocketConnection.tableName,
      TABLE_KEY: 'connectionId',
    },
  })

// テーブルへのアクセス権限
webSocketConnection.grantWriteData(connectLambda)

// Lambda呼び出し用のロール
const policy = new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  resources: [connectLambda.functionArn],
  actions: ['lambda:InvokeFunction'],
})

const role = new iam.Role(this, `${name}-iam-role`, {
  assumedBy: new iam.ServicePrincipal('apigateway.amazonaws.com'),
})
role.addToPolicy(policy)

const integration = new apigatewayv2.CfnIntegration(scope, `connect-lambda-integration`, {
  apiId: api.ref,
  integrationType: 'AWS_PROXY',
  integrationUri: `arn:aws:apigateway:${region}:lambda:path/2015-03-31/functions/${lambda.functionArn}/invocations`,
  credentialsArn: role.roleArn,
})

const route = new apigatewayv2.CfnRoute(scope, `connect-route`, {
  apiId: api.ref,
  routeKey: "$connect", // *1
  authorizationType: 'NONE',
  target: 'integrations/' + integration.ref,
})

$disconnectルートや独自ルートなども同じような感じで作成します。
ルートの指定は*1で変更できます。

$connectルートのハンドラーも用意します。

export async function handler(event: any): Promise<any> {
  const client = new AWS.DynamoDB.DocumentClient()

  // DynamoDBテーブルに保存する
  const result = await client
    .put({
      TableName: process.env.TABLE_NAME || '',
      Item: {
        connectionId: event.requestContext.connectionId,
      },
    })
    .promise()

  return {
    statusCode: 200,
    body: 'onConnect.',
  }
}

$connectではコネクションIDをDynamoDBへ保存しています。
逆に$disconnectでは、コネクションIDをDynamoDBから削除する必要があります。

statusCodeで500を返すと、接続を拒否することも可能です。

ステージを作成

APIのデプロイ先のステージを作成します。

const deployment = new apigatewayv2.CfnDeployment(this, `${name}-deployment`, {
  apiId: api.ref,
})

const stage = new apigatewayv2.CfnStage(this, `${name}-stage`, {
  apiId: api.ref,
  autoDeploy: true,
  deploymentId: deployment.ref,
  stageName,
})

通知用のLambdaを作成

S3への画像アップロードのイベント契機で動く、Lambdaを作成します。

// Web Socket APIのリソース名を生成
const resouce = `arn:aws:execute-api:${region}:${this.account}:${api.ref}/${stageName}/POST/@connections/*`

// WebSocket経由での通知権限
const policy = new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  resources: [resouce],
  actions: ['execute-api:ManageConnections'],
})

const role = new iam.Role(scope, 'sendMessageLambdaRole', {
  assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
})
role.addToPolicy(policy)
role.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole'))

// Lambdaを作成
const sendMessageLamdba = new lambda.Function(scope, 'sendMessageLambda', {
  code: new lambda.AssetCode('lib/handler'),
  handler: 'webSocket/sendMessage.handler',
  runtime: lambda.Runtime.NODEJS_12_X,
  role,
  environment: {
    TABLE_NAME: webSocketConnection.tableName,
    TABLE_KEY: 'connectionId',
  },
})

// S3へのアップロードイベント
sendMessageLamdba.addEventSource(
  new S3EventSource(webSocketBucket, {
    events: [s3.EventType.OBJECT_CREATED],
  }),
)

// テーブルへのアクセス権限
webSocketConnection.grantReadWriteData(sendMessageLamdba)

ハンドラーも作成します。

export async function handler(event: any): Promise<any> {
  const endpoint = 'https://{ApiID}.execute-api.ap-northeast-1.amazonaws.com/dev'

  const apiGateway = new AWS.ApiGatewayManagementApi({ endpoint })

  const client = new AWS.DynamoDB.DocumentClient()

  // DBからコネクションIDを取得
  const result = await client.scan({ TableName: process.env.TABLE_NAME || '' }).promise()

  for (const data of result.Items ?? []) {
    const params = {
      Data: '画像がアップロードされました',
      ConnectionId: data.connectionId,
    }

    try {
      await apiGateway.postToConnection(params).promise()
    } catch (err) {
      if (err.statusCode === 410) {
        console.log('Found stale connection, deleting ' + data.connectionId)
        await client
          .delete({
            TableName: process.env.TABLE_NAME || '',
            Key: { [process.env.TABLE_KEY || '']: data.connectionId },
          })
          .promise()
      } else {
        console.log('Failed to post. Error: ' + JSON.stringify(err))
      }
    }
  }
}

DynamoDBから$connectで保存したコネクションIDを取得して、postToConnection()を使ってメッセージを送信しています。

送信処理をawaitforで回していますが、LambdaのタイムアウトもあるのでSQSを使ってもいいかもしれません。

また、送信時にstatusCode === 410だった場合、接続ができなくなっているのでDynamoDBから削除しています。

以上でCDKの実装が完了したので、cdk deployでAWS環境をデプロイできます。

wscatコマンドで動作確認

AWS環境ができたのでwscatコマンドを使って動作を確認してみます。

接続先のURLは、API GatewayのAWSコンソールをみるとわかります。

WebSocket URLが利用するURLになります。

# wscatコマンドのインストール
$ npm install -g wscat

# クライアント接続
$ wscat -c wss://{ApiID}.execute-api.ap-northeast-1.amazonaws.com/dev
Connected (press CTRL+C to quit)
>

Connected (press CTRL+C to quit)と表示されれば接続成功です。 この状態で、S3に画像をアップロードすればサーバー側からメッセージを受信します。

$ wscat -c wss://{ApiID}.execute-api.ap-northeast-1.amazonaws.com/dev
Connected (press CTRL+C to quit)
< 画像がアップロードされました
>

画像がアップロードされましたというメッセージが受信できました。

フロントエンド

フロント側はWeb Socket APIを利用して、作成したWeb Socket APIに接続します。

今回はReactでの実装例です。

const socket = new WebSocket(
  "wss://*****.execute-api.ap-northeast-1.amazonaws.com/dev"
);

function WebSocketComponent() {
  React.useEffect(() => {
    socket.onopen = (event) => {
      // クライアント接続時
      console.log("onopen", event);
    };

    socket.onmessage = (event) => {
      // サーバーからのメッセージ受信時
      console.log("onmessgae", event);
    };

    socket.onclose = (event) => {
      // クライアント切断時
      console.log("onclose", event);
    };

    return () => {
      socket.close();
    };
  });

  return (
    <div>
      <p>WebSocketComponent</p>
    </div>
  );
}

export default WebSocketComponent;

実行してChromeのデベロッパーツールなどでログを見ると、接続されていることを確認できるかと思います。

あとは、通知もしたいのでNotification APIを使ってみます。

function WebSocketComponent() {
  React.useEffect(() => {
    // 通知権限を要求
    const notification = window.Notification
    var permission = notification.permission
    if (permission === 'denied' || permission === 'granted') {
      return
    }
    Notification.requestPermission()
  })

  ・・・

  React.useEffect(() => {
    socket.onmessage = event => {
      // サーバーからのメッセージ受信時
      // 通知を表示
      new Notification(event.data)
    }
  })

  ・・・
}

作成したs3に画像をアップロードすれば、以下のような通知が表示されます。

最後に

Api GaatewayWeb Socket APIを使った実装の紹介でした。
サーバーレスでWeb Socketを構築できて、結構便利なのかと思います。

最後までご覧いただきありがとうございました。