Lambda(Python)でAuth0のトークンを検証する #Auth0

PythonでAuth0のトークンの検証処理を実装する方法をauth0-pythonを使う方法と、python-joseを使う方法併せて2通り解説します。
2020.10.29

バックエンドがPythonで認証にAuth0を使う場合は、auth0-pythonというSDKが用意されています。
これを使うことでトークンの検証処理が非常に簡単に実装することができます。

auth0-pythonで検証

動作確認環境

  • Python: 3.8.5
  • auth0-python: 3.13.0
  • pyjwt: 1.7.1

インストール

以下のコマンドでインストールします。

$ pip install auth0-python
$ pip install PyJWT

auth0-pythonがpyjwtに依存しているため、pyjwtもインストールする必要があります。

サンプルコード

auth-handler.py

from auth0.v3.authentication.token_verifier import TokenVerifier, AsymmetricSignatureVerifier

import json
import logging
import os
logger = logging.getLogger()
logger.setLevel('DEBUG')


def handler(event, context):
    try:
        logging.info(event)

        auth = Auth()
        token = auth.get_token(event)  # eventからトークン取得

        if token is None:
            logging.error('No token.')
            result = auth.generate_policy('sample', 'Deny', '*')
            return result
        logging.info(token)

        resource = event['methodArn']

        # トークン検証
        result = auth.verify(token, resource)
        return result

    except Exception as e:
        logging.error(e)
        raise e
        


class Auth:

    def get_token(self, event):
        token = event['authorizationToken']
        return token.split(' ')[1]

    def generate_policy(self, principal_id, effect, resource):
        return {
            'principalId': principal_id,
            'policyDocument': {
                'Version': '2012-10-17',
                'Statement': [
                    {
                        "Action": "execute-api:Invoke",
                        "Effect": effect,
                        "Resource": resource
                    }
                ]
            }
        }

    def verify(self, token, resource):
        domain = os.getenv('AUTH0_DOMAIN')
        jwks_url = f'https://{domain}/.well-known/jwks.json'
        issuer = f'https://{domain}/'
        audience = os.getenv('AUTH0_AUDIENCE') # Auth0側のAPIを作成した時に指定したaudience

        try:
            signature_verifier = AsymmetricSignatureVerifier(jwks_url)
            token_verifier = TokenVerifier(
                signature_verifier=signature_verifier,
                issuer=issuer,
                audience=audience)
            token_verifier.verify(token)

            # トークンが有効であれば、適切なIAMポリシーを返す
            return self.generate_policy('sample', 'Allow', resource)
        except Exception as error:
            logging.error(error)
            # トークンが無効であれば、アクセスを拒否するIAMポリシーを返す
            return self.generate_policy('sample', 'Deny', '*')

API GatewayのカスタムオーソライザーでLambdaイベントペイロードをトークンにした場合、リクエストのヘッダーに付与されたトークンがLambdaのeventの authorizationToken に渡されます。
eventの中身は以下の通りです。

event = {
            'type': 'TOKEN',
            'methodArn': 'arn:aws:execute-api:ap-northeast-1:xxxxx:yyyyy/stage/GET/items',
            'authorizationToken': 'Bearer xxxxxx'
}

auth0-pythonで検証できるのはIDトークンのみ

こちらにも書かれているように、auth0-pythonで検証できるのはIDトークンに限られます。アクセストークンの検証には使えません。
アクセストークンを検証したい場合はJWTのデコード処理をJWT用のライブラリで実装する必要があります。

python-joseで検証

ここでは一例としてpython-joseでトークンの検証をしてみます。

動作確認環境

  • Python: 3.8.5
  • python-jose: 3.2.0

インストール

以下のコマンドでインストールします。

$ pip install python-jose

サンプルコード

auth-handler.py

from jose import jwt
from urllib.request import urlopen
import json

def get_rsa_key(token):
    """
    ヘッダーのkidと一致する公開鍵を取得する
    """
    jsonurl = urlopen(jwks_url)
    jwks = json.loads(jsonurl.read())
    unverified_header = jwt.get_unverified_header(token)

    for key in jwks["keys"]:
        if key["kid"] == unverified_header["kid"]:
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"]
            }
            return rsa_key
    

def verify(self, token, resource):
    """
    トークンを検証する
    """
    domain = os.getenv('AUTH0_DOMAIN')
    jwks_url = f'https://{domain}/.well-known/jwks.json'
    issuer = f'https://{domain}/'
    audience = os.getenv('AUTH0_AUDIENCE')

    rsa_key = get_rsa_key(token)

    try:
        payload = jwt.decode(
            token,
            rsa_key,
            algorithms='RS256',
            audience=audience,
            issuer=issuer)
        logging.debug(payload)

        return self.generate_policy('sample', 'Allow', resource)
    except Exception as error:
        logging.error(error)
        return self.generate_policy('sample', 'Deny', '*')

JWTのデコードのため、https://{domain}/.well-known/jwks.jsonから公開鍵を取得する必要があります。
jwt.decode()で指定したaudienceもしくはissuerが一致しなかった場合は、デコード自体に失敗してDenyのポリシーが返却されます。
権限チェック等の処理を追加する場合はデコードの後に追加します。