この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
概要
この記事では、AWS CDKで同期エクスプレスステートマシンを使用して API Gateway REST API を作成しました。ここでは、StepFunctionsを呼び出すためのHTTP「ANY」メソッドAPIを実装しました。StepFunctionsRestApiコンストラクトを使用して、ステートマシンをAPI Gatewayに接続しました。HTTPS リクエストが API メソッドに送信されると、API Gateway はStep Functions API アクションを呼び出します。
やってみた
CDKアプリの作成
- 次のコマンドを使用してCDKをインストールしておきます。
python -m pip install aws-cdk-lib
- 新しいディレクトリを作成しておきます。
- CDKは、プロジェクトディレクトリの名前に基づいてソースファイルとクラスに名前を付けます。
#create new directory
mkdir api-stepfunctions
cd api-stepfunctions
- cdk initコマンドを使用してアプリを初期化しておきます。
cdk init --language python
- 次の2つのコマンドを実行して、アプリのPython virtual environmentをアクティブ化し、CDK core dependenciesをインストールしておきます。
source .venv/bin/activate
pip install -r requirements.txt
サービスの作成
- 新しいファイル [api_stepfunctions/services.py] を作成して、作成する必要のあるAWSサービスを定義しておきます。
ステートマシンを定義する
- パス状態でMachine Definition を定義しておきます。
invoke_state = stepfunctions.Pass(self,"InvokeState",
result = stepfunctions.Result("API Gateway Invokes Step Functions"))
success_state = stepfunctions.Pass(self,"SuccessState",
result = stepfunctions.Result("Successfully invoked"))
machine_definition = invoke_state.next(success_state)
- Machine Definition とEXPRESSタイプでステートマシンを定義しておきます。
- StepFunctionsRestApiは同期Expressステートマシンのみを許可するため、ステートマシンタイプはEXPRESSとして設定されます。
state_machine = stepfunctions.StateMachine(self, 'StateMachine',
definition = machine_definition,
state_machine_name = "ApiStepFunctions",
state_machine_type = stepfunctions.StateMachineType.EXPRESS)
REST APIを定義する
- StepFunctionsRestApiコンストラクトを使用してAPIGateway RESTAPIを作成します。
api = apigateway.StepFunctionsRestApi(self, "StepFunctionsApi",
deploy=True,
state_machine=state_machine)
完全なコード
- すべてのサービスが定義された完全なコードです。
import aws_cdk as cdk
from constructs import Construct
from aws_cdk import (
aws_stepfunctions as stepfunctions,
aws_apigateway as apigateway
)
class serviceStack(Construct):
def __init__(self, scope: Construct, id: str):
super().__init__(scope, id)
invoke_state = stepfunctions.Pass(self,"InvokeState",
result = stepfunctions.Result("API Gateway Invokes Step Functions"))
success_state = stepfunctions.Pass(self,"SuccessState",
result = stepfunctions.Result("Successfull invoked"))
machine_definition = invoke_state.next(success_state)
state_machine = stepfunctions.StateMachine(self, 'StateMachine',
definition = machine_definition,
state_machine_name = "ApiStepFunctions",
state_machine_type = stepfunctions.StateMachineType.EXPRESS)
api = apigateway.StepFunctionsRestApi(self, "StepFunctionsApi",
deploy=True,
state_machine=state_machine)
アプリにサービスを追加する
- /api_stepfunctions/api_stepfunctions_stack.py ファイルに次のコードを追加しておきます。
#Import the Service file created in the previou step
from . import services
#service_file.constructor
services.serviceStack(self, "api-invoke-step-functions")
CDK Deploy
- Deploy する前に、環境をブートストラップする必要があります。
- 次のコマンドを実行して、AWS環境をブートストラップしておきます。
cdk bootstrap
- 次のコマンドを使用してCDKを展開しておきます。
cdk deploy
- コンソールでは、サービスが作成されたことを見ることができます。
State Machine
State Machine Definition
API Gateway
APIをテストする
- API Gatewayのコンソールで、APIを選択して、Resources ペインで、テストするメソッドを選択しておきます。
- Method Execution ペインで、Testを選択しておきます。
- POSTメソッドを選択して、Testをクリックしておきます。
- Step Functionsの出力を見ることができます。
まとめ
AWS CDK でStep FunctionsのAPIを作成してみました。
Reference :