Athenaのクエリ結果をポーリングする実装をdurable functionsで書き換えてみた #AWSreInvent

Athenaのクエリ結果をポーリングする実装をdurable functionsで書き換えてみた #AWSreInvent

Durable Functionsでコスト改善!! ...できる場合もあれば、できない場合もあるので注意が必要です。
2025.12.04

Athenaのクエリ実行は非同期なAPIです。クエリ実行開始のAPIを呼び出すとクエリの実行完了を待つことなく実行IDが返却され、その実行IDを指定してクエリ実行が完了したか状況を確認できます。Athenaというサービスの特性を考えると数TBレベルのデータをスキャンすることも想定されるため、この仕様は妥当なのですが、実際のユースケースを考えると同期的なAPIが欲しくなることもあります。
例えばですが、想定されるデータ量が最大でもxxなので...とか、クエリを実行するのは社内の限られた人間だけなので10秒程度の実行待ちは許容できるとか、そういった背景です。

LambdaでAthenaの同期的なクエリ実行を実現する場合、クエリを実行〜クエリ実行完了までGetQueryExecutionをポーリングすることになりますが、これを簡単に実現するためのライブラリとしてAthena-Queryなんてのもあります。

https://dev.classmethod.jp/articles/using-athena-query/

そう、Athenaのクエリ同期実行には根強いニーズが存在するんです。しかし、ポーリングというアーキテクチャは従量課金というLambdaのモデルの良さを殺してしまう側面があります。ポーリングしてる最中にもLambdaの課金は発生していますからね。

しかし、今回のLambdaのアップデートでdurable functionsが利用可能になったことで、Lambdaでもコンピューティング料金を無駄にしない形でポーリング処理を実装できるようになりました!さっそくAthenaのクエリ実行を題材にdurable functionsでポーリング処理を実装してみます!

やってみる

前提として事前に以下ドキュメントに従ってsampleというデータベースにimpressionsというテーブルを作成済みとします。

データのパーティション化 - Amazon Athena

また、以後のLambdaは全て以下の設定です。

  • ランタイム: Node.js 24.x
  • 一般設定のタイムアウト: 1分
  • 永続設定の実行タイムアウト: 1分 ※あまり長いとバグがあって無限ループした場合の課金が怖いので注意しましょう

durable functionsが無いとき

まず従来のポーリングを行う実装例です。

以下のブログを参考にポーリング処理を書いてみました。
https://dev.classmethod.jp/articles/get-amazon-athena-query-execution-results/

import { AthenaClient, StartQueryExecutionCommand, GetQueryResultsCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena';

const athenaClient = new AthenaClient();

export const handler = async (event) => {

  const startCommand = new StartQueryExecutionCommand({
    QueryString: `
    SELECT
      dt,
      COUNT(*)
    FROM
      impressions
    WHERE
      dt < '2009-04-12-14-00'
	    AND dt >= '2009-04-12-13-00'
    GROUP BY
      dt
    ORDER BY
      dt
`,
    WorkGroup: 'primary',
    QueryExecutionContext: {
      Database: 'sample',
    },
  });

  const executionResult = await athenaClient.send(startCommand);
  const queryExecutionId = executionResult.QueryExecutionId;

  let executionStatus = 'RUNNING';
  while (executionStatus === 'RUNNING' || executionStatus === 'QUEUED') {
    await new Promise((r) => setTimeout(r, 1000));

    const getExecutionCommand = new GetQueryExecutionCommand({
      QueryExecutionId: queryExecutionId,
    });

    const executionDetails = await athenaClient.send(getExecutionCommand);
    executionStatus = executionDetails.QueryExecution?.Status?.State;

    if (executionStatus === 'FAILED' || executionStatus === 'CANCELLED') {
      throw new Error(`Query execution failed: ${executionDetails.QueryExecution?.Status?.StateChangeReason}`);
    }
  }

  const getResultsCommand = new GetQueryResultsCommand({
    QueryExecutionId: queryExecutionId
  });

  const queryResult = await athenaClient.send(getResultsCommand);

  const response = {
    statusCode: 200,
    body: JSON.stringify(queryResult),
  };

  return response;
};

テスト実行した際のログには以下のように出力されていました。

REPORT RequestId: ea55cce9-0af1-4cfe-a045-aa9c486c1bd3	Duration: 3659.22 ms	Billed Duration: 4102 ms	Memory Size: 128 MB	Max Memory Used: 107 MB	Init Duration: 442.24 ms

Billed Duration: 4102 msなので、4秒弱に対して課金が発生することになります。

durable functionsがあるとき

先程のクエリ実行をdurable functionsで書き直してみます。コードは以下のようになりました。

import { withDurableExecution } from '@aws/durable-execution-sdk-js';
import { AthenaClient, StartQueryExecutionCommand, GetQueryResultsCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena';

const athenaClient = new AthenaClient();

export const handler = withDurableExecution(async (event, context) => {

  const queryExecutionId = await context.step("start-query-execution", async () => {
    const startCommand = new StartQueryExecutionCommand({
      QueryString: `
      SELECT
        dt,
        COUNT(*)
      FROM
        impressions
      WHERE
        dt < '2009-04-12-14-00'
        AND dt >= '2009-04-12-13-00'
      GROUP BY
        dt
      ORDER BY
        dt
  `,
      WorkGroup: 'primary',
      QueryExecutionContext: {
        Database: 'sample',
      },
    });
  
    const executionResult = await athenaClient.send(startCommand);
    const queryExecutionId = executionResult.QueryExecutionId;
    console.log(`queryExecutionId: ${queryExecutionId}`)
    return queryExecutionId;
  });

  const res = await context.waitForCondition(
    "wait-for-query-execution",
    async (currentState, ctx) => {
      console.log(`GetQueryExecution実行 currentState: ${currentState}`);
      const getExecutionCommand = new GetQueryExecutionCommand({
        QueryExecutionId: queryExecutionId,
      });
      const executionDetails = await athenaClient.send(getExecutionCommand);
      const executionResult = executionDetails.QueryExecution?.Status?.State;;
      console.log(`executionResult: ${executionResult}`);
      return executionResult;
    },
    {
      initialState: '',
      waitStrategy: (state, attempt) => {
        // TODO クエリの実行結果が"FAILED"や"CANCELLED"の場合の考慮が必要        
        console.log(`waitStrategyでstateをチェック state: ${state}`);
        if (state === "SUCCEEDED") {
          console.log("終了");
          return { shouldContinue: false };
        }
        console.log("リトライ");
        return {
          shouldContinue: true,
          delay: { seconds: Math.min(attempt * 2, 10) },
        };
      },
    },
  );
  console.log(res);

  const getResultsCommand = new GetQueryResultsCommand({
    QueryExecutionId: queryExecutionId
  });
  const queryResult = await athenaClient.send(getResultsCommand);

  const response = {
    statusCode: 200,
    body: JSON.stringify(queryResult),
  };

  return response;
});

ポイントをいくつか解説します。

まずメインとなるAthenaのクエリ実行は1回だけ実行したいので、context.step()で囲んでやります。

  const queryExecutionId = await context.step("start-query-execution", async () => {
    const startCommand = new StartQueryExecutionCommand({
      QueryString: `
      SELECT
        ...略
  `,
      WorkGroup: 'primary',
      QueryExecutionContext: {
        Database: 'sample',
      },
    });
  
    const executionResult = await athenaClient.send(startCommand);
    const queryExecutionId = executionResult.QueryExecutionId;
    console.log(`queryExecutionId: ${queryExecutionId}`)
    return queryExecutionId;
  });

これでStartQueryExecutionの実行結果がチェックポイントに保存され、2回目以後のLamdba実行ではこのStartQueryExecution呼び出しはスキップできるようになります。

続いてリトライロジックです。context.waitForCondition()を使って実装します。

const res = await context.waitForCondition(
    "wait-for-query-execution",
    async (currentState, ctx) => {
      console.log(`GetQueryExecution実行 currentState: ${currentState}`);
      const getExecutionCommand = new GetQueryExecutionCommand({
        QueryExecutionId: queryExecutionId,
      });
      const executionDetails = await athenaClient.send(getExecutionCommand);
      const executionResult = executionDetails.QueryExecution?.Status?.State;;
      console.log(`executionResult: ${executionResult}`);
      return executionResult;
    },
    {
      initialState: '',
      waitStrategy: (state, attempt) => {
        // TODO クエリの実行結果が"FAILED"や"CANCELLED"の場合の考慮が必要        
        console.log(`waitStrategyでstateをチェック state: ${state}`);
        if (state === "SUCCEEDED") {
          console.log("終了");
          return { shouldContinue: false };
        }
        console.log("リトライ");
        return {
          shouldContinue: true,
          delay: { seconds: Math.min(attempt * 2, 10) },
        };
      },
    },
  );

第2引数にはクエリ実行の終了をチェックするロジックを記述します。GetQueryExecutionを実行して結果を返却します。この結果を第3引数のオブジェクトのwaitStrategyでチェックし、SUCCEEDEDになっていた場合は{ shouldContinue: false }を返却してポーリング処理を終了します。SUCCEEDED以外の場合はリトライしてもらうためにshouldContinueにはtrueを設定、リトライ間隔にdelayを指定したオブジェクトを返却します。ポーリングの間隔を長くしたいとか短くしたいといった調整はここで行いましょう。

※今回は割愛していますが、本当はステータスがFAILEDCANCELLEDの場合の考慮も必要なので、実際に本番環境に導入を検討する際は注意してください。

このLambdaをテスト実行すると以下の結果になりました。
durable functionsでAthenaのクエリ実行をポーリングしたイベント履歴

wait-for-query-executionというステップが2回実行されており、2回目の実行でGetQueryExecutionの結果がSUCCEEDEDとなっていることが分かります。

CW Logsに出力されたログは以下の通りでした。

durable functionsでAthenaのクエリ実行をポーリングした際のログ出力

billedDurationMsについては2848 + 819で合計3667msです。durable functions無しの場合の4102msと比較して課金対象時間が短くなっていることがわかります!今回は実行したクエリが軽かったこともありGetQueryExecution2回で結果が取得できたため、あまり差異がありませんでしたが、これがもう少し時間のかかるクエリになってくると削減可能な課金対象時間が削減できそうです。

注意!!安くなるとは限らない

注意点としてdurable functionsは通常のLambdaの課金に加えて以下の課金が発生します。

  • StepsやWaitといったDurable Operationsに対しての課金... $8.00 / 100万オペレーション
  • Durable Operationsによって書き込まれたデータ量およびデータ保持期間に対する課金
    • 書き込み... $0.25 / 1GB
    • データ保持期間... $0.15 / 1GB Month

先程の例だとコンピューティング時間が435ms削減されました。
これはx86アーキテクチャ、メモリ1024MB割当、オハイオリージョンの場合は $0.0000000167 ✕ 435 = 約$0.0000072645の削減となります。

一方でDurable OperationはStart Executionが1回、Stepが1回、WaitForCallbackが2回なので合計4回になります。
つまり $8 ÷ 100万オペレーション ✕ 4で $0.000032の課金が追加で発生します。
今回のサンプルコードはストレージコストが微々たるものなので無視しますが、それでも$0.000024 - $0.0000072645で約$0.0000247355のコストが追加発生するという結果になりました。クエリの所要時間やWaitForCallbackの実行頻度によっては逆にコストが高くなる可能性があるのは注意が必要です。

とはいえdurable functionsによってLambdaの同時実行数が削減できるといったメリットもあるので、単純にコストだけで判断するのではなく総合的な観点でdurable functionsへの置き換えを検討しましょう。

まとめ

Lambdaでポーリング処理を実装するユースケースとしてよく見かけるAthenaのクエリ処理を題材にcontext.waitForConditionによるポーリング処理を実装してみました。何かしらポーリング処理を実装しているユースケースではdurable functionsへの置き換えを検討したいですね。東京リージョン/大阪リージョンでもdurable functionsが利用可能になるのが待ち遠しいです!

参考

この記事をシェアする

FacebookHatena blogX

関連記事