この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回はAPI GatewayをトリガーとしてLambdaを実行し、Redshift Serverlessの対象tableのデータを参照する一連の流れをServerless Frameworkで実装したいと思います。
構成図
簡単な構成図としては以下になります。 今回は、リクエストボディに対象tableを指定するため、POSTメソッドでリクエストします。また、リクエストの際は、API Keyの指定を必須とします。
前提条件
- Redshift serverlessが事前に構築されていることとします。 まだの方は以下ブログの「Redshift Serverless構築」項目を参考にしてみてください。今回も以下ブログで構築したflightdata tableを使用します。
- AWS CLIでAWS環境にアクセスできることとします。まだの方は公式サイト等を参考に設定してみてください。
IICS CDI Amazon Redshift V2 ConnectorでRedshift Serverlessと接続できるか試してみた
事前準備
Node.jsのインストール
Serverless FrameworkはNode.jsで作られたCLIツールですので、Node.jsをインストールする必要があります。
以下のサイトの内容を参考にインストールしてみてください。
【Node.js入門】各OS別のインストール方法まとめ(Windows,Mac,Linux…)
インストール完了後にインストールされていることを確認できたらOKです。
(blog_env) kasama.yoshiki 10_api_gateway_rest_lambda_serverless_framework % node -v
v18.3.0
(blog_env) kasama.yoshiki 10_api_gateway_rest_lambda_serverless_framework %
Serverless Frameworkのインストール
Serverless Frameworkはnpmのパッケージとして公開されています。ターミナル上で、インストールコマンドを実行します。
npm install -g serverless
インストールが完了したらバージョンを確認します。注意点として、Serverless Frameworkのバージョンが変わると設定の方法が変更される場合があります。 最新のバージョンでエラーとなる場合は、下記バージョンに変更して試してみてください。
(blog_env) kasama.yoshiki 10_api_gateway_rest_lambda_serverless_framework % serverless --version
Running "serverless" from node_modules
Framework Core: 3.26.0 (local) 3.21.0 (global)
Plugin: 6.2.2
SDK: 4.3.2
(blog_env) kasama.yoshiki 10_api_gateway_rest_lambda_serverless_framework %
Serverless pluginインストール
今回のserverless.ymlで指定をしているServerles pluginをインストールします。
sls plugin install -n serverless-python-requirements
sudo npm install serverless-aws-response-schema-plugin
パラメーターストアへのデータベース名の登録
Redshift Serverlessのデータベース名をAWSのセッションマネジャーから取得するようにしたいため、AWS Management Consoleからパラメーターストアへアクセスし、データベース名をValueとするパラメータを作成します。
実装
実際に作成したディレクトリ構成が以下になります。
(blog_env) kasama.yoshiki@10_api_gateway_rest_lambda_serverless_framework % tree
├── README.md
├── lambda_function.py
├── models
│ ├── redshift_select_input.json
│ └── redshift_select_output.json
├── node_modules
├── package-lock.json
├── package.json
├── requirements.txt
├── serverless.yml
1644 directories, 14240 files
(blog_env) kasama.yoshiki@10_api_gateway_rest_lambda_serverless_framework %
README.mdファイルは実行手順を記載したものなので、作成しなくても問題ないです。実際に自身で作成するのは以下になります。それ以外のフォルダ、ファイルに関しては、コマンド実行時に自動で作成されるものになります。
- lambda_function.py
- modelsフォルダとそのファイル
- requirements.txt
- serverless.yml
※ ソースコードはgithubに格納したので参考にしてみてください。
lambdaファイルの実装
実際にpythonで実装したlambdaファイルになります。
import os
import time
import json
import boto3
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
def get_db_credential(input_parameter_name):
ssm = boto3.client('ssm')
response = ssm.get_parameter(
Name=input_parameter_name, WithDecryption=True)
return response['Parameter']['Value']
# Redshift接続情報
WORK_GROUP_NAME = os.environ['WORK_GROUP_NAME']
DB_NAME = get_db_credential(os.environ['DB_NAME'])
class LambdaException(Exception):
def __init__(self, status_code: int, error_msg: str):
self.status_code = status_code
self.error_msg = error_msg
def __str__(self):
obj = {
"Status": self.status_code,
"ErrorReason": self.error_msg
}
return json.dumps(obj)
class BadRequestException(LambdaException):
def __init__(self, error_msg: str):
super().__init__(400, error_msg)
class InternalServerErrorException(LambdaException):
def __init__(self, error_msg: str):
super().__init__(500, error_msg)
# selectするメソッド
# ======================================
# Parameters.
# target_table. : select対象テーブル
# --------------------------------------
def execute_select_query(target_table):
redshift_client = boto3.client('redshift-data')
select_query = "SELECT count(*) FROM public.\"%s\"" % (
target_table)
print(select_query)
result = redshift_client.execute_statement(
WorkgroupName=WORK_GROUP_NAME,
Database=DB_NAME,
Sql=select_query,
)
start_time = time.time()
# 実行IDを取得
id = result['Id']
# クエリが終わるのを待つ
statement = ''
status = ''
while status != 'FINISHED' and status != 'FAILED' and status != 'ABORTED':
statement = redshift_client.describe_statement(Id=id)
status = statement['Status']
print("Status:", status)
time.sleep(1)
end_time = time.time()
print('process_time:', end_time-start_time)
logger.info(json.dumps(statement, indent=4, default=str))
# 結果の表示
try:
statement = redshift_client.get_statement_result(Id=id)
logger.info(json.dumps(statement, indent=4, default=str))
return statement["Records"][0][0]["longValue"]
except:
if status == 'FAILED' or status == 'ABORTED':
raise BadRequestException(f"{statement}")
@logger.inject_lambda_context(log_event=True)
def handler(event, context: LambdaContext):
try:
body = event['body']
result_value = execute_select_query(
body["target_table"])
return {
'Status': 200,
'body': json.dumps(
{
'Status': 'success',
'result_value': result_value,
'ErrorReason': 'None'
}
)
}
except Exception as e:
logger.error(e)
if e.__class__.__name__ == 'BadRequestException':
raise
raise InternalServerErrorException(str(e))
AWSのSSMのパラメーターストアからDB情報を取得し、eventの中のtable名に基づいて、データ件数を取得するsqlを実行しています。
modelsフォルダとそのファイル
modelsフォルダには、リクエスト時のvalidationとして指定するmodelと、レスポンスデータのvalidationとして指定するmodelを作成します。今回はカスタム統合Proxyを選択するため、このような指定をしていますが、プロキシ統合の場合は、設定は不要となります。
redshift_select_input.json
{
"properties" : {
"target_table" : {"type" : "string"}
}
}
redshift_select_output.json
{
"properties" : {
"status": {
"type": "string"
},
"result_value": {
"type": "string"
},
"ErrorReason": {
"type": "string"
}
}
}
リクエスト時のinputではsql文を実行する対象tableを指定し、レスポンスデータとしては、そのresult valueとstatusを取得します。
requirements.txt
AWS Lambda 関数用のロギングなどに使用するライブラリを使用します。
aws-lambda-powertools
serverless.yml
以下が作成したserverless.ymlになります。
service: redshift-select ## Cloudformationのstack nameを設定
frameworkVersion: '3'
provider: ## providorを指定
name: aws
runtime: python3.9
lambdaHashingVersion: 20201221
region: us-east-1
endpointType: REGIONAL
apiGateway:
resourcePolicy:
- Effect: Allow
Principal: '*'
Action: execute-api:Invoke
Resource:
- execute-api:/*/*/*
apiKeys:
- free: # 使用量プラン
- name: ${self:service}-key # key 名
value: <30-128文字の任意の英数字>
usagePlan:
- free:
quota:
limit: 100 # API の呼び出しを行える最大回数
offset: 0 # API の呼び出し回数の初期値(通常は 0 回を指定する)
period: DAY # DAY or WEEK or MONTH
throttle:
rateLimit: 2 # 1 秒あたりに処理できる API リクエスト数
burstLimit: 3 # 同時に処理できる最大リクエスト数
package: ## lambda上にデプロイする際に除外するファイルを!で指定
patterns:
- "!.vscode/**"
- "!.git/**"
- "!.gitignore"
- "!.serverless"
- "!.serverless/**"
- "!README.md"
- "!deploy-shell/**"
- "!package*.json"
- "!requirements.txt"
- "!node_modules/**"
- "!__pycache__"
- "!yarn.lock"
functions:
redshift_select:
name: ${self:service}-handler ## lambda関数名
handler: lambda_function.handler ## 実行される関数を指定
role: LambdaRole ## lambdaに紐づけられるロール
memorySize: 128 ## lambdaのメモリサイズ
timeout: 30 ## lambdaのタイムアウト時間
environment: ## 環境変数
WORK_GROUP_NAME: <Redshift Serverlessのワークグループネーム>
DB_NAME: <パラメーターストアのデータベース名の格納パス>
layers: ## lambdaに紐づくレイヤーを指定
- Ref: PythonRequirementsLambdaLayer ## Layerを参照
events: ## lambda関数のトリガーを指定
- http:
path: /redshift_select
method: post
private: true # API Keyが必要か否かをセットしている
integration: lambda
request: ## リクエスト時のvalidationを指定
schemas:
application/json:
schema: ${file(models/redshift_select_input.json)}
name: RedshiftSelectInput
description: 'RedshiftSelectInput'
response: ## レスポンス時の統合レスポンスを指定
headers:
Content-Type: "'application/json'"
statusCodes:
200:
pattern: '' # JSON response
template: $input.path("$.body") # JSON return object
headers:
Content-Type: "'application/json'"
400:
pattern: '.*"Status": 400,.*'
template: $input.path("$.errorMessage")
headers:
Content-Type: "'application/json'"
500:
pattern: '.*"Status": 500,.*'
template: $input.path("$.errorMessage")
headers:
Content-Type: "'application/json'"
responseSchemas: ## レスポンス時のメソッドレスポンスを指定
200:
application/json: ${file(models/redshift_select_output.json)}
400:
application/json: ${file(models/redshift_select_output.json)}
500:
application/json: ${file(models/redshift_select_output.json)}
# カスタム変数が定義可能
custom:
accountid: ${AWS::AccountId}
pythonRequirements:
dockerizePip: false ## python以外で作られているライブラリを使用する時はtrueに
usePipenv: false ## Pipenvを使用する場合にtrueに
layer: true ## ライブラリからLambda Layerを作成するオプション
useDownloadCache: true ## pip がパッケージをコンパイルするために必要なダウンロードをキャッシュするダウンロード キャッシュ
useStaticCache: true ## requirements.txtのすべてをコンパイルした後に pip の出力をキャッシュする静的キャッシュ
# region: us-east-1
plugins:
- serverless-python-requirements
- serverless-aws-response-schema-plugin
resources:
Description: Redshift select internal API
Resources:
LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- sts:AssumeRole
Effect: Allow
Principal:
Service:
- "lambda.amazonaws.com"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonSSMReadOnlyAccess
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
簡単に上から順にみていきます。
- service:
- CloudFormationのスタック名を定義します。
- frameworkVersion:
- Serverlss Frameworkのバージョンを定義します。
- provider: どのクラウドサービスを使用してSereverless Frameworkを動かすかということを定義します。AWS, GCP, Azure, IBM Cloudなど様々なクラウドプロバイダーに対応しています。
- endpointType: API Gatewayのエンドポイントタイプを定義します。デフォルトでは、エッジ最適化エンドポイントですが、今回はリージョンエンドポイントを定義します。
- 参考: API Gateway API に対してセットアップするエンドポイントタイプを選択する
- apiGateway: apiGatewayのresourcePolicy、apiKeys、usagePlanを定義します。
- resourcePolicy: API Gatewayの許可ポリシーを定義して、リソースを制限します。今回は特に制限は行わない条件で定義します。
- apiKeys: リクエストの際にAPI Keyを使用する場合は、定義します。今回は新規でAPI Keyを作成し、リクエスト時に利用します。
- usagePlan: リクエストの使用量プランを定義します。
- package: lambda上にデプロイする際に除外するファイルを!で定義します。
- functions: lambda関数の設定を定義しています。
- events: Lambda関数にトリガーとなるイベントを紐づけています。
- http: 部分が API GatewayのREST APIをイベントとして定義するという意味になります。HTTP API を利用したい場合は「httpApi」と定義します。
- 参考: REST API と HTTP API 間で選択する
- path: リクエストパスを指定します。
- method: RESTAPIのメソッド(GET, POST PUT, PATCH, DELETE)を定義します。GETでも良いかと思いましたが、GETではリクエストBODYがサポートされていなかったのでPOSTにしています。
- integration: デフォルトでは、lambda-proxyとなっていますが、今回は、リクエストレスポンスを制御したいため、lambda(非プロキシ)を定義します。
- 参考: API Gateway で Lambda プロキシ統合を設定する
- 参考: API Gateway で Lambda 統合を設定する
- request: リクエストボディのvalidationを定義します。validationの内容は、modelsフォルダのredshift_select_input.jsonを参照しています。
- response: レスポンス時の統合レスポンスを定義します。Lambdaからreturnされるレスポンス形式と、メソッドレスポンスの形式が異なる場合、結果をそのまま渡すか、データをメソッドレスポンスに合わせて変換することができる定義です。今回はlambda関数から200,400,500のステータスコード以外は受け取らない設定にしたので、その3種類の定義をしています。
- 参考: API Gateway で統合レスポンスを設定する
- 参考: API Gateway + REST + Lambda 非プロキシ(カスタム)統合でステータス200以外返ってこない
- responseSchemas: レスポンス時のメソッドレスポンスを定義します。統合レスポンスデータとマッピングでき、マッピングに従ってAPI Gatewayのレスポンスデータをreturnすることができる定義です。このマッピングを行うことで、必要な値のみのreturnや静的な値のreturnなどを行うことができます。
- 参考: API Gateway のメソッドレスポンスをセットアップする
- custom: カスタム変数を定義しています。
- pythonRequirements: requirements.txtに記載されているライブラリをダウンロードする際の設定等を定義しています。
- layer: trueと設定されることで、ライブラリからlayerを作成します。このlayerをfunctions.layers.Refで参照しています。
- plugins: 今回使用するServerless Frameworkのプラグインを定義します。
- serverless-python-requirements: 外部モジュールをデプロイするときに使用するpluginで、requirements.txtに記載のライブラリも一緒にzipファイルにまとめてデプロイできます。
- serverless-aws-response-schema-plugin: レスポンス時のメソッドレスポンスを追加するためのpluginです。
- resources: AWSのインフラリソースを定義します。resources以下はCloudFormationと同じ記述が可能です。
- 参考: AWS Infrastructure Resources
- Description: CloudFormationの説明を定義します。
- Resources: CloudFormationのスタックに含めるAWSリソースを定義します。
- LambdaRole: Lambdaに紐づけるIAMロールを定義します。
- Properties: リソースの設定を定義します。
- AssumeRolePolicyDocument: IAMロールの信頼関係を定義します。
- ManagedPolicyArns: IAMロールに紐づく許可ポリシーを定義します。
デプロイ実行
Serverless Frameworkでのデプロイ
ローカル上で、以下のコマンドを実行してデプロイします。
AWS_SDK_LOAD_CONFIG=true AWS_PROFILE={AWS環境にアクセスするProfile} sls deploy
AWS SDK for JavaScript のドキュメントによると「AWS_SDK_LOAD_CONFIG=true」に設定されている場合、ロード時に config ファイルを自動的に検索します。
リージョン設定の優先順位
問題なくコマンドが終了していることが確認できればOKです。
running "serverless" from node_modules
Deploying redshift-select to stage dev (us-east-1)
MFA code for arn:aws:iam::<AWS ARN>:mfa/<role-name>:
✔ Service deployed to stack redshift-select-dev (80s)
api keys:
redshift-select-key: <api-key-value>
endpoint: POST - https://execute-api.us-east-1.amazonaws.com/dev/redshift_select
functions:
redshift_select: redshift-select-handler (1.9 kB)
layers:
pythonRequirements: arn:aws:lambda:us-east-1:aws-account:layer:redshift-select-dev-python-requirements:1
1 deprecation found: run 'serverless doctor' for more details
Need a better logging experience than CloudWatch? Try our Dev Mode in console: run "serverless --console"
デプロイ結果確認
AWS Management Console上でデプロイされた資産を確認します。
CloudFormationを確認すると作成したスタックが存在することがわかります。
CloudFormationのリソースを見ると、API Gateway、Lambda、CloudWatch、ServerlessFrameworkの設定ファイルを格納するS3などが作成されていることがわかります。
API Gatewayをみると、メソッドリクエスト、統合リクエスト、Lambda、統合レスポンス、メソッドレスポンス、API Key、使用量プランなども定義通り設定されています。
LambdaにもAPI Gatewayがトリガーとして紐づけられています。
Lambdaに紐づけられたIAMロールも定義通りのため問題ないですね。
API実行
では、作成したAPIの実行結果を確認していきます。今回対象とするtableのデータ件数は118件なのでそれと一致すればOKですね。
先ほどのAPI Gatewayの画面でテストボタンがあるのでクリックします。
するとテスト用の値を入力できる画面になるので、クエリ文字列とヘッダーは入力せずにリクエスト本文に以下のjson形式で入力し「テスト」ボタンを押下します。
{
"target_table": "<対象table名>"
}
result_valueが118件で返ってきたので問題なさそうですね!
外部からのリクエストも確認するため、Talend API Testerで試してみます。 METHODはPOSTで、URLは以下を指定し、パスには今回定義した「redshift_select」を設定します。あとは、HEADERに「API Key」と「Content-Type」を入力し、BODYに先ほどと同様の値を入力し、実行します。
こちらも問題なさそうですね!
最後に
Serverless Frameworkの設定は最初の頃はかなり苦労したので、その内容を書き留めておくことで、同じような方の手助けになれば幸いです。