AppSync Events で Lambda をデータソース統合しつつ、Powertools for AWS Lambda (Python) でイベントを処理してみる

AppSync Events で Lambda をデータソース統合しつつ、Powertools for AWS Lambda (Python) でイベントを処理してみる

先日、AppSync Events がデータソース統合をサポートしました。

https://dev.classmethod.jp/articles/appsync-data-source-integration/

データソース統合を行うことで、WebSocket 上で発生したイベントを DynamoDB に保存したり、Lambda や Bedrock などに連携することが可能です。
元々 AppSync Events では専用の JavaScript ランタイム (APPSYNC_JS) を利用したイベント処理を行うことが可能でした。

AWS AppSync を使用すると、AWS AppSync の JavaScript ランタイム (APPSYNC_JS) で実行されるコードを使用して、サービスで発生する特定のトリガーに応答できます。
https://docs.aws.amazon.com/appsync/latest/eventapi/runtime-reference-overview.html

一方、Lambda 統合を行うことで Python なども含めた柔軟な実行環境でイベントを処理することが可能です。
また、本アップデートと合わせて Powertools for AWS Lambda に AppSync Events 用の Event Handler が追加されており、こちらを利用することでよりシンプルにイベント処理を実装できるようになっています。

https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/appsync_events/

https://github.com/aws-powertools/powertools-lambda-python/releases/tag/v3.11.0

今回は Powertools for AWS Lambda (Python) を利用して、AppSync Events で Publish したイベントを扱ってみました。

試してみる

下記構成で試してみます。

arch.png

Lambda 関数から作成していきます。
ランタイムは Python 3.13 を指定します。

lambda1.png

Powertools for AWS Lambda をレイヤーとして追加します。
AWS レイヤーとして追加することも可能ですが、現時点では Powertools for AWS Lambda (Python) v3.9.0 に対応するバージョンしか選択できなかったので、ARN を直接指定します。
Powertools for AWS Lambda のドキュメントを参考に、arn:aws:lambda:ap-northeast-1:017000801446:layer:AWSLambdaPowertoolsPythonV3-python313-x86_64:13 を指定しました。

lambda3.png

※ Powertools for AWS Lambda (Python) v3.11.0 に対応するバージョンを選択できるようになれば、AWS レイヤーでも構いません。

コードは下記のように設定しました。

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()

@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
    print(payload)
    return {
        "processed": True,
        "original_payload": payload,
    }

def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

チャンネル名を指定しながら on_publishon_subscribe といったデコレータを記載するだけでイベントを扱うことが可能です。
チャンネル名を記述する際にはワイルドカードも利用でき、default 名前空間の全てのチャンネルに対して処理を行う場合、/default/* のように書くことが可能です。

続いて、AppSync Event API 側の設定に移ります。
AppSync Event API を作成します。

appsync1.png

データソースを作成します。

appsync2.png

データソースタイプとして AWS Lambda を選択して、先程作成した関数を指定します。

appsync3.png

default 名前空間を選択します。

appsync4.png

イベントハンドラーを作成します。

appsync5.png

作成したデータソースをパブリッシュ設定として追加します。
呼び出しタイプは、同期処理となる RequestResponse を選択します。

appsync6.png

呼び出しタイプとして Event も選択でき、こちらを選択すると Lambda 関数を非同期的に呼び出すことができます。

The Lambda data source allows you to define two invocation types: RequestResponse and Event. The invocation types are synonymous with the invocation types defined in the Lambda API. The RequestResponse invocation type lets AWS AppSync call your Lambda function synchronously to wait for a response. The Event invocation allows you to invoke your Lambda function asynchronously.
https://docs.aws.amazon.com/appsync/latest/eventapi/lambda-function-reference.html

AppSync サービスページの Pub/Sub エディタを利用してテストします。
パブリッシャー側からイベントを発行します。

appsync8.png

Lambda によって追加要素が付与された状態でサブスクライバー側からイベントを確認できました!

appsync9.png

また、Lambda 関数がエラー終了した場合は、パブリッシャー側にエラーが返り、サブスクライバー側にはイベントは送信されません。

appsync7.png

バッチ処理の仕方について

Powertools for AWS Lambda を利用した際、各イベントを配列としてまとめて扱うことも、一つ一つ逐次的に扱うこともでき、デコレータの aggregate 属性で制御可能です。
デフォルト値は False で、各イベントを一つずつ処理します。

event-by-event-1.png

Working with single items | AppSync Events

先ほどは aggregate 属性を指定していなかったので、各イベントを一つずつ処理していました。
Lambda の中で print(payload) のように定義していましたが、登録した時と同じ形で一つずつ出力されています。

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()

@app.on_publish("/default/channel")
def handle_channel1_publish(payload: dict[str, Any]):
    print(payload) # 標準出力
    return {
        "processed": True,
        "original_payload": payload,
    }

def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

log.png

aggregate=True の場合は下記のような処理の流れになります。

list.png

Working with aggregated item | AppSync Events

Lambda 関数を下記のように修正して、試してみます。

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.event_handler import AppSyncEventsResolver

if TYPE_CHECKING:
    from aws_lambda_powertools.utilities.typing import LambdaContext

app = AppSyncEventsResolver()

@app.on_publish("/default/channel", aggregate=True)
def handle_channel1_publish(payload: list[dict[str, Any]]):
    print("Original payload:", payload)

    filtered_payload = [
        item for item in payload
        if not (
            isinstance(item.get('payload'), dict) and
            item['payload'].get('event') == 'dummy'
        )
    ]

    print("Filtered payload:", filtered_payload)
    return filtered_payload

def lambda_handler(event: dict, context: LambdaContext):
    return app.resolve(event, context)

キーが dummy であるものを除いてブロードキャストするシンプルな処理としました。
配列を扱うので、リスト内包表記は相性が良さそうです。

では、ダミーを含めたイベントをパブリッシュしてみます。

lambda4.png

サブスクライバー側ではフィルタリングされたイベントが表示されました!

lambda5.png

payload の形式としては下記のようになっていました。

[{'payload': {'event': 'data_1'}, 'id': '66ed9700-1365-436a-912c-0a35f93254a8'}, {'payload': {'event': 'dummy'}, 'id': '9ff2d8e7-a0e5-4291-bd1d-38923acd5dd6'}, {'payload': {'event': 'data_3'}, 'id': '5599bff9-ae38-46ef-80b8-8d73c08406ed'}]

log2.png

aggregate=False の方がシンプルに記述できるため、基本的にはこちらを利用することになるでしょう。
ただし、バッチ全体を意識する必要がある複雑な処理を実装する際や、バッチで DB に書き込めることを重視する際に aggregate=True にして利用すると良さそうです。

In some scenarios, you might want to process all events for a channel as a batch rather than individually. This is useful when you need to:
・Optimize database operations by making a single batch query
・Ensure all events are processed together or not at all
・Apply custom error handling logic for the entire batch
https://docs.powertools.aws.dev/lambda/python/latest/core/event_handler/appsync_events/#aggregated-processing

まとめ

今回は AppSync Events の Lambda とのデータソース統合を試してみました。
Powertools for AWS Lambda も利用することで、特に詰まる所も無く WebSocket API を構築してイベント処理をカスタマイズすることができました。
AppSync Events について、シンプルに構築できる上に複雑なロジックも組みやすくなっており、WebSocket API を構築する際のかなり魅力的な選択肢に思えてきています。
2024 年末に発表されたという所で、まだまだこれからな機能かと思いますが、是非注目してみて下さい!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.