Lambda Durable Functionsを利用してRedshiftに対しクエリ実行と結果のポーリングをしてみた

Lambda Durable Functionsを利用してRedshiftに対しクエリ実行と結果のポーリングをしてみた

2026.01.07

データ事業本部のueharaです。

今回は、Lambda Durable Functionsを利用してRedshiftに対しクエリ実行と結果のポーリングをしてみたいと思います。

はじめに

Lamndaには 15分 という最大実行時間の制限があります。

これまで、Redshiftに対し実行に15分以上かかるクエリを実行するには以下のような対応を取る必要がありました。

  • Glue Python Shell/ECSなど実行時間制限の無い(緩い)コンピューティングリソースからクエリを同期的に実行
  • Step FunctionsとLambdaを組み合わせて、非同期でクエリの実行・結果のポーリング

前者の場合だとクエリ実行の完了を待つ間、すなわちコンピューティングリソースのアイドル時間に対し不必要なコストが発生するデメリットがあります。(加えて、ECSの場合はLambdaやGlue Python Shellと比較しイメージの作成・管理の手間もあります)

後者の場合はStep Functionsの学習コストに加え、ステート間の連携を意識して構築する必要があり、やりたいことに対し少々構築の手間が大きい印象があります。

そこで、今回はre:Invent 2025で新しく発表された Lambda Durable Functions を利用して、15分以上かかるクエリ実行を想定した環境構築を行いたいと思います。(2025/12より東京リージョンでも利用可能となっております)

https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html

Durable Functionsを利用して、後者をLambda単体で完結させるようなイメージです。

前提

今回、Redshiftは既に構築済みであることを想定します。(私の手元ではRedshift Serverlessを利用します)

利用するテーブルのサンプルとして、以下のように orders テーブルを用意しています。

20250107_durable_01

今回Lambdaから発行するクエリは、上記 orders テーブルをソースとしたマートテーブルの洗い替え(DELETE → INSERT)をするものとします。

また、テーブルへのアクセスにはIAM認証を利用し、今回登場するLambdaのIAMロールに対し必要なテーブルへのアクセス権限等は別途付与されていることを想定します。

ファイルの準備

今回、環境はAWS SAMで作成することにします。

※AWS SAM CLIのバージョン 1.150.0 からDurable Functionsがサポートされています。

ディレクトリ構成は以下の通りです。

.
├── handler
│   └── sample
│       ├── query.sql
│       └── redshift_batch_executor.py
├── layers
│   └── dependencies
│       └── requirements.txt
├── samconfig.toml
└── template.yaml

まず、 samconfig.toml は以下の通りです。

samconfig.toml
version = 0.1

[default]
region = "ap-northeast-1"

[default.build.parameters]
debug = true

[default.deploy.parameters]
stack_name = "<YOUR_STACK_NAME>"
s3_bucket = "<YOUR_S3_BUCKET>"
s3_prefix = "<YOUR_S3_PREFIX>"
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true

スタック名やデプロイ用のS3バケットに関する設定は適宜ご自身の環境に合わせて下さい。

次に、リソースを定義した template.yaml は以下の通りです。

template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: "Redshift Query Executor with Lambda Durable Functions"

Parameters:
  RedshiftWorkgroupName:
    Type: String
    Description: "Redshift Serverless Workgroup Name"
    Default: "<YOUR_WORKGROUP_NAME>"
  RedshiftDatabaseName:
    Type: String
    Description: "Redshift Database Name"
    Default: "<YOUR_DB_NAME>"
  DurableExecutionTimeout:
    Type: Number
    Description: "Durable Functions Execution Timeout (seconds)"
    Default: 3700

Globals:
  Function:
    Timeout: 180
    MemorySize: 128

Resources:
  # Lambda Layer
  DependenciesLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: "uehara-redshift-durable-functions-layer"
      Description: "Dependencies for Redshift Durable Functions"
      ContentUri: layers/dependencies/
      CompatibleRuntimes:
        - python3.14
      CompatibleArchitectures:
        - x86_64
    Metadata:
      BuildMethod: python3.14
      BuildArchitecture: x86_64

  # Lambda
  RedshiftQueryFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: "uehara-redshift-durable-functions"
      Role: !GetAtt RedshiftQueryFunctionRole.Arn
      CodeUri: handler/sample/
      Handler: redshift_batch_executor.lambda_handler
      Runtime: python3.14
      Architectures:
        - x86_64
      Timeout: 60
      Layers:
        - !Ref DependenciesLayer
      Environment:
        Variables:
          WORKGROUP_NAME: !Ref RedshiftWorkgroupName
          DATABASE_NAME: !Ref RedshiftDatabaseName
      DurableConfig:
        ExecutionTimeout: !Ref DurableExecutionTimeout

  # Lambda Role
  RedshiftQueryFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: "uehara-redshift-durable-functions-role"
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonRedshiftFullAccess
        - arn:aws:iam::aws:policy/AmazonRedshiftDataFullAccess
      Policies:
        - PolicyName: "DurableFunctionsPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - lambda:CheckpointDurableExecution
                  - lambda:SendDurableExecutionCallbackSuccess
                  - lambda:SendDurableExecutionCallbackFailure
                Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:uehara-redshift-durable-functions*"

リソースとしては、以下の3つを定義しています。

  • Lambda Layer
  • Lambda (Durable Functions)
  • Lambda用のIAMロール

Lambda Layerは後述する requirements.txt からSAMのビルドコマンドを用いて作成します。

Redshift Serverlessのワークグループ名やDB名はパラメータとしており、Lambdaの環境変数として設定するようにしています。

また、Lambdaの DurableConfig はDurable Functionsの実行設定となっており、 ExecutionTimeout は最大は366日まで設定可能なDurable Functionsの実行時間の設定値になりますが、今回は3700秒(1時間+バッファの100秒)としています。

その他、Lambda用のIAMロールに関して、今回は検証用のためRedshiftの利用に関する権限は強めのマネージドポリシーを設定しています。

handler/sample/query.sql はLambdaからRedshiftに対し発行するクエリファイルになります。

今回は前述の「前提」に記載の通り、サンプルとしてマートテーブルを洗い替えで作成するようなクエリとしています。

handler/sample/query.sql
DELETE FROM public.mart_customer_summary;

INSERT INTO public.mart_customer_summary (
    customer_id,
    total_orders,
    total_amount,
    last_order_date,
    updated_at
)
SELECT
    customer_id,
    COUNT(DISTINCT order_id) AS total_orders,
    SUM(quantity * unit_price) AS total_amount,
    MAX(order_date) AS last_order_date,
    CURRENT_TIMESTAMP AS updated_at
FROM
    orders
GROUP BY
    customer_id;

handler/sample/redshift_batch_executor.py は今回キモとなるLambda関数(Durable Functions)のスクリプトファイルになりますが、以下の通りです。

handler/sample/redshift_batch_executor.py
import os
from pathlib import Path

import boto3
import sqlparse
from aws_durable_execution_sdk_python import (
    DurableContext,
    StepContext,
    durable_execution,
    durable_step,
)
from aws_durable_execution_sdk_python.config import Duration

# 環境変数
WORKGROUP_NAME = os.environ.get("WORKGROUP_NAME")
DATABASE_NAME = os.environ.get("DATABASE_NAME")

# Redshift Data APIクライアント
redshift_data = boto3.client("redshift-data")

def load_sql_statements(sql_file_path: str) -> list[str]:
    """SQLファイルを読み込み、個別のステートメントに分割する

    Args:
        sql_file_path (str): SQLファイルのパス

    Returns:
        list[str]: 分割されたSQLステートメントのリスト

    Raises:
        FileNotFoundError: SQLファイルが存在しない場合
    """
    with open(sql_file_path, "r", encoding="utf-8") as f:
        sql_content = f.read()

    # sqlparseでSQLを分割
    statements = sqlparse.split(sql_content)

    # 空のステートメントを除外
    return [stmt.strip() for stmt in statements if stmt.strip()]

@durable_step
def execute_batch_query(step_context: StepContext, statements: list) -> str:
    """Redshiftにバッチクエリを実行する

    Args:
        step_context (StepContext): Durable Functionsのステップコンテキスト
        statements (list): 実行するSQLステートメントのリスト

    Returns:
        str: クエリ実行ID
    """
    step_context.logger.info(f"Executing batch query with {len(statements)} statements")
    response = redshift_data.batch_execute_statement(
        WorkgroupName=WORKGROUP_NAME,
        Database=DATABASE_NAME,
        Sqls=statements,
    )
    query_id = response["Id"]
    step_context.logger.info(f"Batch query started with ID: {query_id}")
    return query_id

@durable_step
def check_query_status(step_context: StepContext, query_id: str) -> dict:
    """クエリの実行状態を確認する

    Args:
        step_context (StepContext): Durable Functionsのステップコンテキスト
        query_id (str): クエリ実行ID

    Returns:
        dict: クエリ状態情報
    """
    step_context.logger.info(f"Checking query status for ID: {query_id}")
    response = redshift_data.describe_statement(Id=query_id)
    status = response["Status"]
    step_context.logger.info(f"Query status: {status}")

    if status in ("FAILED", "ABORTED"):
        error_message = response.get("Error", "Unknown error")
        raise Exception(f"Query failed with status {status}: {error_message}")

    return {"status": status, "completed": status == "FINISHED"}

@durable_execution
def lambda_handler(event: dict, context: DurableContext) -> dict:
    """SQLファイル内の全ステートメントをバッチ実行するLambdaハンドラー

    Args:
        event (dict): Lambdaイベント
        context (DurableContext): Durable Functionsのコンテキスト

    Returns:
        dict: 実行結果を含むレスポンス
    """
    # SQLファイルのパスを取得(デフォルトは同ディレクトリのquery.sql)
    sql_file_path = event.get(
        "sql_file_path",
        str(Path(__file__).parent / "query.sql")
    )

    statements = load_sql_statements(sql_file_path)
    context.logger.info(f"Loaded {len(statements)} statements from {sql_file_path}")

    # バッチでクエリ実行
    query_id = context.step(execute_batch_query(statements))
    context.logger.info(f"Batch query submitted with ID: {query_id}")

    # ポーリングでクエリ完了を待機(10秒間隔、最大360回 = 1時間)
    max_attempts = 360
    for attempt in range(max_attempts):
        status_result = context.step(check_query_status(query_id))

        if status_result["completed"]:
            context.logger.info("Batch query completed successfully")
            break

        # 10秒待機
        context.wait(Duration.from_seconds(10))
    else:
        raise Exception(f"Query timed out after {max_attempts * 10} seconds")

    return {
        "statusCode": 200,
        "body": {
            "message": "All queries executed successfully",
            "query_id": query_id,
            "statement_count": len(statements),
            "status": status_result["status"],
        },
    }

各関数の処理についてはdocstringに記載の通りになりますが、簡単に説明するとクエリ実行はBoto3経由でRedshift Data APIの BatchExecuteStatement を利用しています。(参考

BatchExecuteStatement においてSQL文は単一のトランザクションとして実行され、配列の順序に従って順次実行されます。

SQLのステートメントは分割したリストの状態で渡す必要があるため、SQLファイルの読み込み時に sqlparse モジュールを利用して分割を行っています。

check_query_status 関数がクエリの実行結果をポーリングするためのチェック関数になっており、クエリの実行結果がエラーだった場合はエラーメッセージと共に例外を発生させるようにしています。

このチェック関数の実行をステップとして、10秒毎に呼び出しを行っています。

Lambda Layerを作成するための layers/dependencies/requirements.txt は以下の通りです。

layers/dependencies/requirements.txt
boto3==1.42.23
sqlparse==0.5.0
aws-durable-execution-sdk-python==1.1.0

今回の実行に必要なPythonモジュールを記載しています。

デプロイ

今回はLambda Layerもあるので、まずビルドを行います。

$ sam build --use-container

--use-container はコンテナを利用するオプションになっており、Docker等のコンテナ実行環境を事前に整備しておく必要があります。

public.ecr.aws/sam/build-python3.14:latest-x86_64 といったイメージが取得され、その中でビルドが行われるイメージです。

ビルドが完了すると、 .aws-sam 配下にビルドで生成されたファイルが出力されます。

20250107_durable_02

無事ビルドができたら、デプロイを行います。

$ sam deploy

スタックが作成されていればデプロイ完了です。

20250107_durable_03

実行確認

作成したLambda関数について、呼び出しタイプを「非同期」で実行します。

20250107_durable_04

※呼び出しタイプを同期にすると、15分以上実行できません。(公式ドキュメントに記載あり)

実行結果を確認すると、以下のように execute_batch_query 関数を実行した後、10秒間隔で check_query_status 関数をCallしていることが分かります。

20250107_durable_05

Step毎にログを確認すると、期待した挙動になっておりました。

▼ 実行ステータスが未完了

20250107_durable_06

▼ 実行ステータスが完了

20250107_durable_07

念のためマートテーブルが作成されたかも確認してみます。

20250107_durable_08

無事データの確認ができました。

最後に

今回は、Lambda Durable Functionsを利用してRedshiftに対しクエリ実行と結果のポーリングをしてみました。

参考になりましたら幸いです。

参考文献

この記事をシェアする

FacebookHatena blogX

関連記事