Cognitoから払い出されたIdTokenをAPI Gateway カスタムオーソライザーのLambda(Python3.6)で検証する方法
おつかれさまです。サーバーレス開発部の新井です。
今回はタイトル通り、Cognitoのユーザプールから払いだされたIdTokenをAPI GatewayのカスタムオーソライザーのLambda(Python3.6)で検証する方法を紹介したいと思います。
API GatewayのカスタムオーソライザーでIdTokenの検証を行うには、Cognitoの自動チェックと、今回紹介するLambda Functionでチェックする2種類の方法があります。
Lambda Functionを利用する場合のメリットは、IdTokenの検証以外の認可処理を行うことができるという点です。例えば、IP制限や特定のユーザエージェントだけ許可するなど、用途に応じた処理を書くことができます。
前置きが長くなりましたが、さっそく始めていきたいと思います!
概要図
カスタムオーソライザーのLambda Functionで行う「その他の認可ロジック」に、今回はIP制限を行いたいと思います。
congnito-user-poolの作成
まずはCognitoのユーザプールから作成していきます。IdTokenの検証がメインなので、Cognitoの細かい説明などは省きます。
1. ユーザプールの新規作成
Cognitoのマネージメントコンソールからユーザプールを作成していきます。
2. ユーザプールの属性設定
デフォルトだとemailが必須項目ですが、面倒なので外します。
3. ユーザプールのアプリクライアント登録
今回は、client secretは外します。また、auth-flowはADMIN_NO_SRP_AUTHを選択します。
4. ユーザプール作成後のユーザ登録
最後まで進んでユーザプールの作成が終わったら、次はユーザの登録を行います。 まず、適当なユーザ名とパスワードでユーザを登録します。
ユーザの作成をおこなったら、ユーザのStatusがFORCE_CHANGE_PASSWORDとなっていると思うので、パスワード変更を行う必要がります。
パスワード変更する際のスクリプトはこんな感じです。必要項目を適宜埋めていってください。 実行後には、コンソールにIdToken等が出力されているのが確認できると思います。
#!/bin/sh user_pool_id="" client_id="" user_name="" password="" new_password="" session="$(aws cognito-idp admin-initiate-auth --user-pool-id ${user_pool_id} --client-id ${client_id} --auth-flow ADMIN_NO_SRP_AUTH --auth-parameters USERNAME=${user_name},PASSWORD=${password} | jq -r .Session)" aws cognito-idp admin-respond-to-auth-challenge --user-pool-id ${user_pool_id} --client-id ${client_id} --challenge-name NEW_PASSWORD_REQUIRED --challenge-responses USERNAME=${user_name},NEW_PASSWORD=${new_password} --session ${session}
5. IdTokenの取得
手順4のユーザのパスワード再設定後にIdToken等が返却されるかと思いますが、デフォルトの設定ではIdTokenは1時間で有効期限が切れますので、再取得する必要があります。 そんな時は、以下のコマンドで再取得できます。
aws cognito-idp admin-initiate-auth --user-pool-id <user_pool_id> --client-id <client_id> --auth-flow ADMIN_NO_SRP_AUTH --auth-parameters USERNAME=<user_name>,PASSWORD=<password>
APIの構築
今回はAWS SAMを使います。SAMでのカスタムオーソライザーの環境構築は、弊社岩田がとても良い記事を書いているのでこちらをご参考ください。
最終的に今回作成する、リポジトリの中身はこんな感じになります。
※authorizer.py
とauth_util.py
については、後ほど解説します。
$ tree -L 1 . ├── authorizer.py ├── auth_util.py ├── hello.py ├── libs ├── output.yaml ├── README.md ├── requirements.txt └── template.yaml
SAMのテンプレートtemplate.yaml
はこんな感じ。
AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: LambdaAuthorizer Test Globals: Function: Runtime: python3.6 Timeout: 5 Environment: Variables: PYTHONPATH: /var/runtime:/var/task/libs Parameters: UserPoolId: Type: String Description: CognitoのユーザプールID ClientId: Type: String Description: Cognitoに登録されているクライアントID IpWhiteList: Type: String Description: IPアドレスのホワイトリスト (カンマ区切り) Resources: Hello: Type: AWS::Serverless::Function Properties: CodeUri: . Handler: hello.lambda_handler Runtime: python3.6 Authorizer: Type: AWS::Serverless::Function Properties: CodeUri: . Handler: authorizer.lambda_handler Runtime: python3.6 Environment: Variables: USER_POOL_ID: !Sub ${UserPoolId} CLIENT_ID: !Sub ${ClientId} IP_WHITE_LIST: !Sub ${IpWhiteList} LambdaPermissionHello: Type: "AWS::Lambda::Permission" Properties: Action: lambda:InvokeFunction FunctionName: !Ref Hello Principal: apigateway.amazonaws.com LambdaPermissionAuthorizer: Type: "AWS::Lambda::Permission" Properties: Action: lambda:InvokeFunction FunctionName: !Ref Authorizer Principal: apigateway.amazonaws.com HelloAPI: Type: AWS::Serverless::Api Properties: StageName: Dev DefinitionBody: swagger: "2.0" info: version: "1.0.0" title: "lambda authorizer test" basePath: "/" schemes: - "http" paths: /: get: summary: "lambda authorizer test" description: "lambda authorizer test" produces: - "application/json" responses: "200": description: "successful operation" x-amazon-apigateway-integration: uri: Fn::Sub: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${Hello.Arn}/invocations passthroughBehavior: "when_no_match" httpMethod: "POST" contentHandling: "CONVERT_TO_TEXT" type: "aws_proxy" responses: default: statusCode: "200" security: - authorizer: [] securityDefinitions: authorizer: type: apiKey name: Authorization in: header x-amazon-apigateway-authtype: custom x-amazon-apigateway-authorizer: type: request identitySource: method.request.header.Authorization authorizerUri: Fn::Sub: arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${Authorizer.Arn}/invocations
カスタムオーソライザーでのIdTokenの検証
ここからは、実際のソースコードとなる以下の3つファイルを編集していきます。
authorizer.py
: カスタムオーソライザーで呼び出されるLambda Functionauth_util.py
: カスタムオーソライザーがインポートするファイル。カスタムオーソライザーがAPI Gatewayに返却するレスポンスを生成してくれるhello.py
: 実際のビジネスロジックが含まれるLambda Function-
参考サイト
- https://aws.amazon.com/jp/premiumsupport/knowledge-center/decode-verify-cognito-json-token/
- https://github.com/awslabs/aws-support-tools/tree/master/Cognito/decode-verify-jwt
- https://github.com/awslabs/aws-apigateway-lambda-authorizer-blueprints
authorizer.py
の編集
上記参考サイト2のdecode-verify-jwt.py
をベースに編集していきます。Lambdaの戻り値の形式などは、上記の参考サイト3を一部利用しています。
ここで行っているのは、IdTokenの検証とIP制限の処理です。
今回、JWTの仕組みについての詳しい解説はしませんが、ソースコードの中に可能な範囲でコメントを入れていますのでご勘弁ください。
from botocore.vendored import requests import os import json import time import jose from http import HTTPStatus from libs.jose import jwk, jwt from libs.jose.utils import base64url_decode from auth_util import AuthPolicy region = 'ap-northeast-1' client_ids = os.getenv('CLIENT_ID') user_pool_id = os.getenv('USER_POOL_ID') ip_white_list = os.getenv('IP_WHITE_LIST').split(',') # IdTokenの署名に使われた秘密鍵と対になる公開鍵の情報が含まれるエンドポイント keys_url = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format( region, user_pool_id) def lambda_handler(event, context): try: token = event['headers']['Authorization'] tmp = event['methodArn'].split(':') awsAccountId = tmp[4] apiGatewayArnTmp = tmp[5].split('/') policy = AuthPolicy('', awsAccountId) policy.restApiId = apiGatewayArnTmp[0] policy.region = tmp[3] policy.stage = apiGatewayArnTmp[1] # Tokenの有効性を確認 result = is_valid_token(token) if result['is_valid_token'] is False: # API Gatewayの呼び出しを拒否 policy.denyAllMethods() authResponse = policy.build() authResponse['context'] = { 'message': result['msg'] } return authResponse # IP制限 if event['requestContext']['identity']['sourceIp'] not in ip_white_list: # API Gatewayの呼び出しを拒否 policy.denyAllMethods() authResponse = policy.build() authResponse['context'] = { 'message': result['msg'] } return authResponse # API Gatewayの呼び出しを許可 policy.allowAllMethods() authResponse = policy.build() authResponse['context'] = { 'user_id': result['claims']['sub'] } return authResponse # JWTのデコードエラーが発生した場合の処理 except jose.exceptions.JWTError as e: policy.denyAllMethods() authResponse = policy.build() authResponse['context'] = { 'message': 'JWT error when decoding token' } return authResponse except Exception as e: raise e def is_valid_token(token): headers = jwt.get_unverified_headers(token) kid = headers['kid'] res_cognito = requests.get(keys_url) # エンドポイントが見つからなかった場合の処理 if res_cognito.status_code != HTTPStatus.OK: msg = 'Http request to cognito jwks endpoint failed' return {'is_valid_token': False, 'msg': msg, 'claims': None} keys = json.loads(res_cognito.text)['keys'] key_index = -1 for i in range(len(keys)): if kid == keys[i]['kid']: key_index = i break # エンドポイントから公開鍵が見つからなかった場合の処理 if key_index == -1: msg = 'Public key not found in jwks.json' return {'is_valid_token': False, 'msg': msg, 'claims': None} public_key = jwk.construct(keys[key_index]) message = str(token).rsplit('.', 1)[0].encode('utf-8') encoded_signature = str(token).rsplit('.', 1)[1].encode('utf-8') decoded_signature = base64url_decode(encoded_signature) # JWTの署名チェックが失敗した場合の処理 if not public_key.verify(message, decoded_signature): msg = 'Signature verification failed' return {'is_valid_token': False, 'msg': msg, 'claims': None} claims = jwt.get_unverified_claims(token) # JWTの有効期限が切れていた場合の処理 if time.time() > claims['exp']: msg = 'Token is expired' return {'is_valid_token': False, 'msg': msg, 'claims': None} # audクレームが想定された値でない場合の処理 # CognitoのJWTのaudクレームには、認証されたユーザーで使用されるclient_idが含まれる if claims['aud'] not in client_ids: msg = 'Token was not issued for this audience' return {'is_valid_token': False, 'msg': msg, 'claims': None} return {'is_valid_token': True, 'msg': 'Signature successfully verified', 'claims': claims}
auth_util.py
の編集
上記の参考サイト3のblueprints/python/api-gateway-authorizer-python.py
のHttpVerbクラスとAuthPolicyクラスがそのまま使えるので、これらをauth_util.py
にコピーします。
import re class HttpVerb: GET = "GET" POST = "POST" PUT = "PUT" PATCH = "PATCH" HEAD = "HEAD" DELETE = "DELETE" OPTIONS = "OPTIONS" ALL = "*" class AuthPolicy(object): awsAccountId = "" # 以下省略
hello.py
の編集
eventを出力し、ステータス200とhelloを返すだけのシンプルなLambdaを作成します。
def lambda_handler(event, context): print(event) return { "statusCode": 200, "body": 'hello' }
必要なモジュールのインストール
今回必要になるモジュールはpython-jose 3.0.0です。
pip install python-jose -t libs/
python-jose 3.0.0はPython3.6の実行環境においてJWTの署名チェックが行えるモジュールで、JWTのツールやドキュメントなどに関する専門的なウェブサイトJWTにて詳細が確認できます。
※ 2018年7月17時点
デプロイ&動作確認
以上で、下処理は終了です。さっそくデプロイして、動作確認を行っていきたいと思います。
- デプロイ SAMのテンプレートからデプロイを行っていきます。
aws cloudformation package --template-file template.yaml --output-template-file output.yaml --s3-bucket <s3_bucket_name> aws cloudformation deploy --template-file output.yaml --stack-name lambda-authorizer-test --capabilities CAPABILITY_IAM --parameter-overrides UserPoolId=<user_pool_id> ClientId=<client_id> IpWhiteList=<ip_white_list>
- 動作確認 curlでリクエストを送ってみます。
$curl https://<domain>/<path> -H "Authorization: <IdToken>" hello
ということで、レスポンスにhelloが返ってきて、無事認可処理が実装できました。 誤ったリクエストを送信すると、以下のようなエラーが返ってきます。
$curl https://<domain>/<path> -H "Authorization: hogehoge" {"message":null}
レスポンス形式をカスタマイズしたい場合は、上記で紹介したブログを参考にしていただければと思います。 また、必要なタイミングでログ出力しながらCloudWatch Logsに吐き出されたログを確認することで、より具体的な認可処理の流れるが理解できるかと思います。
まとめ
いかがだったでしょうか。CognitoのIdTokenの署名チェックのやり方は探しても結構見つかりづらかったので、どなたかの役に立てば幸いです。 また、今回あまり触れることのできなかったCognitoのJWT署名方式について、どこかのタイミングでブログを書きたいと思っています。