AppSync Events で Lambda をデータソース統合しつつ、Powertools for AWS Lambda (Python) でイベントを処理してみる
先日、AppSync Events がデータソース統合をサポートしました。
データソース統合を行うことで、WebSocket 上で発生したイベントを DynamoDB に保存したり、Lambda や Bedrock などに連携することが可能です。
元々 AppSync Events では専用の JavaScript ランタイム (APPSYNC_JS) を利用したイベント処理を行うことが可能でした。
AWS AppSync を使用すると、AWS AppSync の JavaScript ランタイム (APPSYNC_JS) で実行されるコードを使用して、サービスで発生する特定のトリガーに応答できます。
https://docs.aws.amazon.com/appsync/latest/eventapi/runtime-reference-overview.html
一方、Lambda 統合を行うことで Python なども含めた柔軟な実行環境でイベントを処理することが可能です。
また、本アップデートと合わせて Powertools for AWS Lambda に AppSync Events 用の Event Handler が追加されており、こちらを利用することでよりシンプルにイベント処理を実装できるようになっています。
今回は Powertools for AWS Lambda (Python) を利用して、AppSync Events で Publish したイベントを扱ってみました。
試してみる
下記構成で試してみます。
Lambda 関数から作成していきます。
ランタイムは Python 3.13 を指定します。
Powertools for AWS Lambda をレイヤーとして追加します。
AWS レイヤーとして追加することも可能ですが、現時点では Powertools for AWS Lambda (Python) v3.9.0 に対応するバージョンしか選択できなかったので、ARN を直接指定します。
Powertools for AWS Lambda のドキュメントを参考に、arn:aws:lambda:ap-northeast-1:017000801446:layer:AWSLambdaPowertoolsPythonV3-python313-x86_64:13
を指定しました。
※ Powertools for AWS Lambda (Python) v3.11.0 に対応するバージョンを選択できるようになれば、AWS レイヤーでも構いません。
コードは下記のように設定しました。
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from aws_lambda_powertools.event_handler import AppSyncEventsResolver
if TYPE_CHECKING:
from aws_lambda_powertools.utilities.typing import LambdaContext
app = AppSyncEventsResolver()
@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
print(payload)
return {
"processed": True,
"original_payload": payload,
}
def lambda_handler(event: dict, context: LambdaContext):
return app.resolve(event, context)
チャンネル名を指定しながら on_publish
や on_subscribe
といったデコレータを記載するだけでイベントを扱うことが可能です。
チャンネル名を記述する際にはワイルドカードも利用でき、default 名前空間の全てのチャンネルに対して処理を行う場合、/default/*
のように書くことが可能です。
続いて、AppSync Event API 側の設定に移ります。
AppSync Event API を作成します。
データソースを作成します。
データソースタイプとして AWS Lambda
を選択して、先程作成した関数を指定します。
default 名前空間を選択します。
イベントハンドラーを作成します。
作成したデータソースをパブリッシュ設定として追加します。
呼び出しタイプは、同期処理となる RequestResponse
を選択します。
呼び出しタイプとして Event
も選択でき、こちらを選択すると Lambda 関数を非同期的に呼び出すことができます。
The Lambda data source allows you to define two invocation types: RequestResponse and Event. The invocation types are synonymous with the invocation types defined in the Lambda API. The RequestResponse invocation type lets AWS AppSync call your Lambda function synchronously to wait for a response. The Event invocation allows you to invoke your Lambda function asynchronously.
https://docs.aws.amazon.com/appsync/latest/eventapi/lambda-function-reference.html
AppSync サービスページの Pub/Sub エディタを利用してテストします。
パブリッシャー側からイベントを発行します。
Lambda によって追加要素が付与された状態でサブスクライバー側からイベントを確認できました!
また、Lambda 関数がエラー終了した場合は、パブリッシャー側にエラーが返り、サブスクライバー側にはイベントは送信されません。
バッチ処理の仕方について
Powertools for AWS Lambda を利用した際、各イベントを配列としてまとめて扱うことも、一つ一つ逐次的に扱うこともでき、デコレータの aggregate
属性で制御可能です。
デフォルト値は False で、各イベントを一つずつ処理します。
Working with single items | AppSync Events
先ほどは aggregate
属性を指定していなかったので、各イベントを一つずつ処理していました。
Lambda の中で print(payload)
のように定義していましたが、登録した時と同じ形で一つずつ出力されています。
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from aws_lambda_powertools.event_handler import AppSyncEventsResolver
if TYPE_CHECKING:
from aws_lambda_powertools.utilities.typing import LambdaContext
app = AppSyncEventsResolver()
@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
print(payload) # 標準出力
return {
"processed": True,
"original_payload": payload,
}
def lambda_handler(event: dict, context: LambdaContext):
return app.resolve(event, context)
aggregate=True
の場合は下記のような処理の流れになります。
Working with aggregated item | AppSync Events
Lambda 関数を下記のように修正して、試してみます。
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from aws_lambda_powertools.event_handler import AppSyncEventsResolver
if TYPE_CHECKING:
from aws_lambda_powertools.utilities.typing import LambdaContext
app = AppSyncEventsResolver()
@app.on_publish("/default/channel", aggregate=True)
def handle_channel1_publish(payload: list[dict[str, Any]]):
print("Original payload:", payload)
filtered_payload = [
item for item in payload
if not (
isinstance(item.get('payload'), dict) and
item['payload'].get('event') == 'dummy'
)
]
print("Filtered payload:", filtered_payload)
return filtered_payload
def lambda_handler(event: dict, context: LambdaContext):
return app.resolve(event, context)
キーが dummy
であるものを除いてブロードキャストするシンプルな処理としました。
配列を扱うので、リスト内包表記は相性が良さそうです。
では、ダミーを含めたイベントをパブリッシュしてみます。
サブスクライバー側ではフィルタリングされたイベントが表示されました!
payload の形式としては下記のようになっていました。
[{'payload': {'event': 'data_1'}, 'id': '66ed9700-1365-436a-912c-0a35f93254a8'}, {'payload': {'event': 'dummy'}, 'id': '9ff2d8e7-a0e5-4291-bd1d-38923acd5dd6'}, {'payload': {'event': 'data_3'}, 'id': '5599bff9-ae38-46ef-80b8-8d73c08406ed'}]
aggregate=False
の方がシンプルに記述できるため、基本的にはこちらを利用することになるでしょう。
ただし、バッチ全体を意識する必要がある複雑な処理を実装する際や、バッチで DB に書き込めることを重視する際に aggregate=True
にして利用すると良さそうです。
In some scenarios, you might want to process all events for a channel as a batch rather than individually. This is useful when you need to:
・Optimize database operations by making a single batch query
・Ensure all events are processed together or not at all
・Apply custom error handling logic for the entire batch
https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/appsync_events/#aggregated-processing
まとめ
今回は AppSync Events の Lambda とのデータソース統合を試してみました。
Powertools for AWS Lambda も利用することで、特に詰まる所も無く WebSocket API を構築してイベント処理をカスタマイズすることができました。
AppSync Events について、シンプルに構築できる上に複雑なロジックも組みやすくなっており、WebSocket API を構築する際のかなり魅力的な選択肢に思えてきています。
2024 年末に発表されたという所で、まだまだこれからな機能かと思いますが、是非注目してみて下さい!