AWS Lambda Durable Functionで並列操作parallelをためしてみた #AWSreInvent

AWS Lambda Durable Functionで並列操作parallelをためしてみた #AWSreInvent

2025.12.04

はじめに

データ事業本部のkobayashiです。

AWS Lambda上で複数の外部APIを順次呼び出すような処理を実装する際、各APIの応答を待つ時間が積み重なり全体の処理時間が長くなることがあります。Pythonではthreadingconcurrent.futuresを使って並列化できますが、AWS Lambda Durable Function を使うと、threadingのような感覚で並列実行ができ、さらにチェックポイントによる耐障害性も得られます。今回はこのparallel()機能を使って複数のAPIを並列で呼び出す方法を試してみたのでその内容をまとめます。

AWS Durable Execution SDK for Pythonとは

AWS Durable Execution SDK for Pythonは、耐障害性のあるワークフローを構築するためのSDKです。主な特徴は以下の通りです。

  • チェックポイント機能: 各ステップの実行結果が自動的に保存され、Lambdaが再起動しても途中から再開可能
  • 並列実行: parallel()map()で複数の処理を同時実行
  • コールバック: 外部からの応答を待つ長時間実行ワークフローに対応
  • シンプルなAPI: Pythonのデコレータと関数で直感的に記述可能

本機能の詳しい解説は以下の記事を参考にしてみてください。
https://dev.classmethod.jp/articles/aws-lambda-durable-functions-awsreinvent/

threadingとの比較

Lambdaのメモリ空間上で動作することを考えると、threadingに近い感覚で使えます。

観点 threading Durable Execution SDK
書き方 Threadconcurrent.futures context.parallel()
結果取得 自分でjoin()Future.result() BatchResultで自動収集
エラー処理 自分でtry-except failed_resultsで自動分離
再実行時 最初からやり直し チェックポイントから再開

普段の使い方はthreadingの感覚でOKですが、「再実行時にチェックポイントから再開できる」という点が大きな違いです。

parallel()の概要

parallel()メソッドは、複数の関数を並行実行し、結果を自動的に収集します。

メソッドシグネチャ

def parallel(
    functions: Sequence[Callable[[DurableContext], T]],
    name: str | None = None,
    config: ParallelConfig | None = None,
) -> BatchResult[T]

パラメータ:

パラメータ 説明
functions Sequence[Callable[[DurableContext], T]] 実行する関数のリスト。各関数はDurableContextを受け取り結果を返す
name str | None 並列操作の名前(デバッグやテスト用、省略可)
config ParallelConfig | None 同時実行数や完了条件を設定するオブジェクト(省略可)

BatchResultオブジェクト

parallel()の戻り値であるBatchResultには以下の情報が含まれます。

プロパティ 説明
successful_results list[T] 成功した結果のリスト(入力順序を保持)
failed_results list 失敗した結果のリスト
total_count int Branchの総数
success_count int 成功したBranch数
failure_count int 失敗したBranch数
status BatchStatus 並列操作の全体ステータス
completion_reason CompletionReason 操作が完了した理由

BatchResultの使用例

result: BatchResult[dict] = context.parallel([task1, task2, task3])

# 成功した結果を取得(入力順序を保持)
first_result = result.successful_results[0]
second_result = result.successful_results[1]
third_result = result.successful_results[2]

# 失敗があるかチェック
if result.failure_count > 0:
    print(f"失敗したBranch数: {result.failure_count}")
    print(f"失敗結果: {result.failed_results}")

# カウントの確認
print(f"総Branch数: {result.total_count}")
print(f"成功数: {result.success_count}")

ただ2025年12月4日現在、ドキュメントに記載のあるsuccessful_resultsを指定すると以下のエラーになってしまいます。

API アクションの呼び出しに失敗しました。エラーメッセージ: { "errorMessage": "'BatchResult' object has no attribute 'allsuccessful_results'", "errorType": "AttributeError" }

ParallelConfigパラメータ

max_concurrencyによる同時実行数の制御

大量の処理を並列実行する場合は、max_concurrencyでリソース使用量を制御できます。これにより、リソースの過剰消費を防ぐことができます。

from aws_durable_execution_sdk_python.config import ParallelConfig

config = ParallelConfig(max_concurrency=5)  # 最大5つまで同時実行
result = context.parallel(functions, config=config)

completion_configによる並列操作の完了制御

ParallelConfigcompletion_configで完了条件を設定できます。デフォルトでは、all_successful()ですべてのブランチが成功する必要がありますが、他にも以下のようなパターンがあるようです。

  • CompletionConfig.all_successful(): すべて成功で完了(デフォルト)
  • CompletionConfig.first_successful(): いずれか1つ成功で完了
  • CompletionConfig.all_completed(): 成功/失敗問わずすべて完了まで待機
from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig

# 最初に成功したものだけ取得(複数のデータソースから取得する場合など)
config = ParallelConfig(
    completion_config=CompletionConfig.first_successful()
)

# すべて完了まで待機(一部失敗しても結果を収集したい場合)
config = ParallelConfig(
    completion_config=CompletionConfig.all_completed()
)

AWS Lambda Durable Functionのparallelを試してみる

AWS Lambda Durable Functionは、2025年12月現在、us-east-2(オハイオ)リージョンでのみ利用可能なのでus-east-2で試してみます。

3つのAPIを並列実行してみる

では実際に3つの外部APIを並列で呼び出し、結果を統合する処理を実装してみます。
シナリオとしては ECサイトのダッシュボード表示を想定し、以下の3つのAPIから情報を取得して統合します。

  1. ユーザー情報API(2秒)
  2. 注文情報API(3秒)
  3. 在庫情報API(2秒)

順次実行すると 2 + 3 + 2 = 7秒かかりますが、並列実行すると最も遅いAPIの時間である約3秒で完了します。

実装コード

import time

from aws_durable_execution_sdk_python import (
    BatchResult,
    DurableContext,
    durable_execution,
)

@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    """3つのAPIを並列で呼び出して結果を統合する"""

    # 各APIを呼び出す関数を定義
    def fetch_user(ctx: DurableContext) -> dict:
        # ユーザー情報APIの応答時間(2秒)をシミュレート
        def call_user_api(_):
            time.sleep(2)  
            return {"user_id": "U001", "name": "Taro Yamada", "email": "taro@example.com"}
        return ctx.step(call_user_api, name="fetch_user")

    def fetch_orders(ctx: DurableContext) -> dict:
        # 注文情報APIの応答時間(3秒)をシミュレート
        def call_orders_api(_):
            time.sleep(3)  
            return {"order_id": "O001", "items": ["item1", "item2"], "total": 5000}
        return ctx.step(call_orders_api, name="fetch_orders")

    def fetch_inventory(ctx: DurableContext) -> dict:
        # 在庫情報APIの応答時間(2秒)をシミュレート
        def call_inventory_api(_):
            time.sleep(2)  
            return {"product_id": "P001", "stock": 100, "warehouse": "Tokyo"}
        return ctx.step(call_inventory_api, name="fetch_inventory")

    # 3つのAPIを並列で実行

    result: BatchResult[dict] = context.parallel([
        fetch_user,
        fetch_orders,
        fetch_inventory,
    ])

    # 結果を統合して返却

    return {
        # result
        # "user": result.allsuccessful_results[0],
        # "orders": result.successful_results[1],
        # "inventory": result.successful_results[2],

        "user": result.all[0].result,
        "orders": result.all[1].result,
        "inventory": result.all[2].result,

        # Counts
        "total_count": result.total_count,
        "success_count": result.success_count,
        "failure_count": result.failure_count,

        # Status information
        "status": result.status.value,
        "completion_reason": result.completion_reason.value,
    }

ポイント解説

  1. @durable_executionデコレータ: Lambda関数をDurable Functionとして定義
  2. ctx.step(): 各API呼び出しをステップとして定義。この単位でチェックポイントが作成される
  3. context.parallel(): 3つの関数を並列実行し、BatchResultで結果を受け取る
  4. 結果の順序: successful_resultsは入力した関数の順序を保持

となっています。
先程のBatchResultの使用例でも記述した通りsuccessful_resultsがうまく取得できないため上記のコードではBatchResultallから各ブランチの結果を取得しています。こちらも入力した関数の順序を保持しているようです。

このようにparallelを使うことで、複数のAPIを並列実行し、その結果を受け取ることができます。順次実行と並列実行の実行時間を比較すると以下のようになります。

実行方法 実行時間
順次実行 約7秒(2 + 3 + 2)
並列実行 約3秒(最大のAPI時間)

並列実行により約57%の時間短縮が実現できます。

並列実行のメリット

1. 実行時間の短縮

上記で試したように複数の独立したI/O処理を同時に実行することで、全体の処理時間を大幅に短縮できます。特に時間のかかる外部のAPIを同時並列で実行するような場面での効果が大きいです。

2. エラーの分離

各ブランチは独立して実行されるため、1つのAPIが失敗しても他のAPIの処理は継続されBatchResultで成功/失敗を個別に確認できるので、エラー時の分岐処理がしやすくなります。またtry-catchでエラーをキャッチする必要もありません。

    def fetch_inventory(ctx: DurableContext) -> dict:
        # 在庫情報APIの応答時間(2秒)をシミュレート
        def call_inventory_api(_):
            time.sleep(2)  
            return {"product_id": "P001", "stock": 100, "warehouse": "Tokyo"}
        raise Exception("エラーです。")
        return ctx.step(call_inventory_api, name="fetch_inventory")

    return {
    ...
        "inventory": str(result.all[2].error),
    ...

実行結果

{
  "user": {
    "user_id": "U001",
    "name": "Taro Yamada",
    "email": "taro@example.com"
  },
  "orders": {
    "order_id": "O001",
    "items": [
      "item1",
      "item2"
    ],
    "total": 5000
  },
  "inventory": "ErrorObject(message='エラー', type='CallableRuntimeError', data=None, stack_trace=None)",
  "total_count": 3,
  "success_count": 2,
  "failure_count": 1,
  "status": "FAILED",
  "completion_reason": "ALL_COMPLETED"
}

3. チェックポイントによる耐障害性

通常のthreadingと異なり、各ステップの結果がチェックポイントとして保存されます。タイムアウトや障害で再起動しても、完了したステップはスキップして続きから再開できることが大きいと思います。

まとめ

AWS Durable Execution SDK for Pythonのparallel()機能を使って複数のAPIを並列で呼び出す方法を試してみました。threadingのような感覚で並列実行ができ、さらにチェックポイントによる耐障害性も得られるため、Lambda上で複数の外部API呼び出しを行う場面で活用していきたいと思います。

最後まで読んで頂いてありがとうございました。

この記事をシェアする

FacebookHatena blogX

関連記事