
Durable FunctionsでAthenaのクエリ結果をポーリングする実装をコスト最適な形に書き換えてみた #AWSreInvent
Athenaのクエリ実行は非同期なAPIです。クエリ実行開始のAPIを呼び出すとクエリの実行完了を待つことなく実行IDが返却され、その実行IDを指定してクエリ実行が完了したか状況を確認できます。Athenaというサービスの特性を考えると数TBレベルのデータをスキャンすることも想定されるため、この仕様は妥当なのですが、実際のユースケースを考えると同期的なAPIが欲しくなることもあります。
例えばですが、想定されるデータ量が最大でもxxなので...とか、クエリを実行するのは社内の限られた人間だけなので10秒程度の実行待ちは許容できるとか、そういった背景です。
LambdaでAthenaの同期的なクエリ実行を実現する場合、クエリを実行〜クエリ実行完了までGetQueryExecutionをポーリングすることになりますが、これを簡単に実現するためのライブラリとしてAthena-Queryなんてのもあります。
そう、Athenaのクエリ同期実行には根強いニーズが存在するんです。しかし、ポーリングというアーキテクチャは従量課金というLambdaのモデルの良さを殺してしまう側面があります。ポーリングしてる最中にもLambdaの課金は発生していますからね。
しかし、今回のLambdaのアップデートでDurable Functionsが利用可能になったことで、Lambdaでもコスト最適な形でポーリング処理を実装できるようになりました!さっそくAthenaのクエリ実行を題材にDurable Functionsでポーリング処理を実装してみます!
やってみる
前提として事前に以下ドキュメントに従ってsampleというデータベースにimpressionsというテーブルを作成済みとします。
また、以後のLambdaは全て以下の設定です。
- ランタイム: Node.js 24.x
- 一般設定のタイムアウト: 1分
- 永続設定の実行タイムアウト: 1分 ※あまり長いバグがあって無限ループした場合の課金が怖いので注意しましょう
Durable Functionsが無いとき
まず従来のポーリングを行う実装例です。
以下のブログを参考にポーリング処理を書いてみました。
import { AthenaClient, StartQueryExecutionCommand, GetQueryResultsCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena';
const athenaClient = new AthenaClient();
export const handler = async (event) => {
const startCommand = new StartQueryExecutionCommand({
QueryString: `
SELECT
dt,
COUNT(*)
FROM
impressions
WHERE
dt < '2009-04-12-14-00'
AND dt >= '2009-04-12-13-00'
GROUP BY
dt
ORDER BY
dt
`,
WorkGroup: 'primary',
QueryExecutionContext: {
Database: 'sample',
},
});
const executionResult = await athenaClient.send(startCommand);
const queryExecutionId = executionResult.QueryExecutionId;
let executionStatus = 'RUNNING';
while (executionStatus === 'RUNNING' || executionStatus === 'QUEUED') {
await new Promise((r) => setTimeout(r, 1000));
const getExecutionCommand = new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
});
const executionDetails = await athenaClient.send(getExecutionCommand);
executionStatus = executionDetails.QueryExecution?.Status?.State;
if (executionStatus === 'FAILED' || executionStatus === 'CANCELLED') {
throw new Error(`Query execution failed: ${executionDetails.QueryExecution?.Status?.StateChangeReason}`);
}
}
const getResultsCommand = new GetQueryResultsCommand({
QueryExecutionId: queryExecutionId
});
const queryResult = await athenaClient.send(getResultsCommand);
const response = {
statusCode: 200,
body: JSON.stringify(queryResult),
};
return response;
};
テスト実行した際のログには以下のように出力されていました。
REPORT RequestId: ea55cce9-0af1-4cfe-a045-aa9c486c1bd3 Duration: 3659.22 ms Billed Duration: 4102 ms Memory Size: 128 MB Max Memory Used: 107 MB Init Duration: 442.24 ms
Billed Duration: 4102 msなので、4秒弱に対して課金が発生することになります。
Durable Functionsがあるとき
先程のクエリ実行をDurable Functionsで書き直してみます。コードは以下のようになりました。
import { withDurableExecution } from '@aws/durable-execution-sdk-js';
import { AthenaClient, StartQueryExecutionCommand, GetQueryResultsCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena';
const athenaClient = new AthenaClient();
export const handler = withDurableExecution(async (event, context) => {
const queryExecutionId = await context.step("start-query-execution", async () => {
const startCommand = new StartQueryExecutionCommand({
QueryString: `
SELECT
dt,
COUNT(*)
FROM
impressions
WHERE
dt < '2009-04-12-14-00'
AND dt >= '2009-04-12-13-00'
GROUP BY
dt
ORDER BY
dt
`,
WorkGroup: 'primary',
QueryExecutionContext: {
Database: 'sample',
},
});
const executionResult = await athenaClient.send(startCommand);
const queryExecutionId = executionResult.QueryExecutionId;
console.log(`queryExecutionId: ${queryExecutionId}`)
return queryExecutionId;
});
const res = await context.waitForCondition(
"wait-for-query-execution",
async (currentState, ctx) => {
console.log(`GetQueryExecution実行 currentState: ${currentState}`);
const getExecutionCommand = new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
});
const executionDetails = await athenaClient.send(getExecutionCommand);
const executionResult = executionDetails.QueryExecution?.Status?.State;;
console.log(`executionResult: ${executionResult}`);
return executionResult;
},
{
initialState: '',
waitStrategy: (state, attempt) => {
// TODO クエリの実行結果が"FAILED"や"CANCELLED"の場合の考慮が必要
console.log(`waitStrategyでstateをチェック state: ${state}`);
if (state === "SUCCEEDED") {
console.log("終了");
return { shouldContinue: false };
}
console.log("リトライ");
return {
shouldContinue: true,
delay: { seconds: Math.min(attempt * 2, 10) },
};
},
},
);
console.log(res);
const getResultsCommand = new GetQueryResultsCommand({
QueryExecutionId: queryExecutionId
});
const queryResult = await athenaClient.send(getResultsCommand);
const response = {
statusCode: 200,
body: JSON.stringify(queryResult),
};
return response;
});
ポイントをいくつか解説します。
まずメインとなるAthenaのクエリ実行は1回だけ実行したいので、context.step()で囲んでやります。
const queryExecutionId = await context.step("start-query-execution", async () => {
const startCommand = new StartQueryExecutionCommand({
QueryString: `
SELECT
...略
`,
WorkGroup: 'primary',
QueryExecutionContext: {
Database: 'sample',
},
});
const executionResult = await athenaClient.send(startCommand);
const queryExecutionId = executionResult.QueryExecutionId;
console.log(`queryExecutionId: ${queryExecutionId}`)
return queryExecutionId;
});
これでStartQueryExecutionの実行結果がチェックポイントに保存され、2回目以後のLamdba実行ではこの後の処理から続きを再開できるようになります。
続いてリトライロジックです。context.waitForCondition()を使って実装します。
const res = await context.waitForCondition(
"wait-for-query-execution",
async (currentState, ctx) => {
console.log(`GetQueryExecution実行 currentState: ${currentState}`);
const getExecutionCommand = new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
});
const executionDetails = await athenaClient.send(getExecutionCommand);
const executionResult = executionDetails.QueryExecution?.Status?.State;;
console.log(`executionResult: ${executionResult}`);
return executionResult;
},
{
initialState: '',
waitStrategy: (state, attempt) => {
// TODO クエリの実行結果が"FAILED"や"CANCELLED"の場合の考慮が必要
console.log(`waitStrategyでstateをチェック state: ${state}`);
if (state === "SUCCEEDED") {
console.log("終了");
return { shouldContinue: false };
}
console.log("リトライ");
return {
shouldContinue: true,
delay: { seconds: Math.min(attempt * 2, 10) },
};
},
},
);
第2引数にはクエリ実行の終了をチェックするロジックを記述します。GetQueryExecutionを実行して結果を返却します。この結果を第3引数のオブジェクトのwaitStrategyでチェックし、SUCCEEDEDになっていた場合は{ shouldContinue: false }を返却してポーリング処理を終了します。SUCCEEDED以外の場合はリトライしてもらうためにshouldContinueにはtrueを設定、リトライ間隔にdelayを指定したオブジェクトを返却します。ポーリングの間隔を長くしたいとか短くしたいといった調整はここで行いましょう。
※今回は割愛していますが、本当はステータスがFAILEDやCANCELLEDの場合の考慮も必要なので、実際に本番環境に導入を検討する際は注意してください。
このLambdaをテスト実行すると以下の結果になりました。

wait-for-query-executionというステップが2回実行されており、2回目の実行でGetQueryExecutionの結果がSUCCEEDEDとなっていることが分かります。
CW Logsに出力されたログは以下の通りでした。

billedDurationMsについては2848 + 819で合計3667msです。Durable Functions無しの場合の4102msと比較して課金対象時間が短くなっていることがわかります!今回は実行したクエリが軽かったこともありGetQueryExecution2回で結果が取得できたため、あまり差異がありませんでしたが、これがもう少し時間のかかるクエリになってくるとDurable Functionsのコストメリットも大きくなってきそうです!
まとめ
Lambdaでポーリング処理を実装するユースケースとしてよく見かけるAthenaのクエリ処理を題材にcontext.waitForConditionによるポーリング処理を実装してみました。何かしらポーリング処理を実装しているユースケースではDurable Functionsへの置き換えを検討したいですね。東京リージョン/大阪リージョンでもDurable Functionsが利用可能になるのが待ち遠しいです!







