こんにちは、CX事業本部 Delivery部の若槻です。
Amazon Athenaは、非常に大きなデータセットに対して実行されるクエリをサポートするため、結果セットが非常に大きい場合は、一度にすべてのデータを取得することは望ましくありません。その場合、ページネーションを使用することで、各ページで必要なデータのみを取得し、余分なデータを取得しなくて済むため、クエリの実行時間を短縮することができます。
今回は、AWS SDK for JavaScript v3でAmazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレートを作ってみました。
やってみた
クエリ対象のテーブル作成
クエリ対象の適当なデータとして、Amazon CloudTrailのログデータを使ってみます。
Amazon CloudTrailコンソールで[Create Athena table]をクリック。
CloudTrailログが格納されているS3 Bucketを選択し、クリックしてテーブル作成。
クエリ対象のテーブルが作成できました。
テンプレート
必要な依存関係をインスントールします。
npm install @aws-sdk/client-athena
Amazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレートです。
script.ts
import {
AthenaClient,
GetQueryExecutionCommand,
GetQueryResultsCommand,
StartQueryExecutionCommand,
QueryExecutionState,
} from '@aws-sdk/client-athena';
const REGION = 'ap-northeast-1';
const DATABASE = 'default';
const MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND =
Number(process.env.MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND) || 10;
const QUERY_STATEMENT = process.argv[2];
const athena = new AthenaClient({ region: REGION });
const executeQuery = async (query: string): Promise<string> => {
const { QueryExecutionId } = await athena.send(
new StartQueryExecutionCommand({
QueryString: query,
QueryExecutionContext: { Database: DATABASE },
})
);
if (QueryExecutionId === undefined)
throw new Error('QueryExecutionId is undefined.');
return QueryExecutionId;
};
const getPage = async (
queryExecutionId: string,
pageToken?: string
): Promise<any> {
// クエリ実行結果として、結果セットとネクストトークンを取得
const { ResultSet, NextToken } = await athena.send(
new GetQueryResultsCommand({
QueryExecutionId: queryExecutionId,
MaxResults: MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND,
NextToken: pageToken,
})
);
if (ResultSet === undefined || ResultSet.Rows === undefined)
throw new Error('ResultSet is undefined.');
console.log(ResultSet.Rows.length);
// ネクストトークンが含まれていた場合、次のページからクエリ実行結果を取得(再帰実行)
if (NextToken) {
const nextPage = await getPage(queryExecutionId, NextToken);
if (Array.isArray(nextPage)) {
return [...ResultSet.Rows, ...nextPage];
} else {
return ResultSet.Rows;
}
}
return ResultSet;
}
const getResults = async (query: string): Promise<any> => {
const queryExecutionId = await executeQuery(query);
// クエリ実行完了待機
let queryExecutionState = QueryExecutionState.RUNNING;
while (queryExecutionState === QueryExecutionState.RUNNING) {
await new Promise((resolve) => setTimeout(resolve, 1000));
const { QueryExecution } = await athena.send(
new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
})
);
if (
QueryExecution == undefined ||
QueryExecution.Status == undefined ||
QueryExecution.Status.State == undefined
)
throw new Error('QueryExecution is undefined.');
queryExecutionState = QueryExecution.Status.State as QueryExecutionState;
}
if (queryExecutionState !== QueryExecutionState.SUCCEEDED) {
throw new Error(
`Query ${queryExecutionId} failed with status ${queryExecutionState}`
);
}
return getPage(queryExecutionId);
};
getResults(QUERY_STATEMENT);
実行してみると、ページネーション毎にデータが取得できていることが確認できました。
$ npx ts-node script.ts "SELECT * FROM cloudtrail_logs_cm_members_XXXXXXXXXXXX LIMIT 100"
10
10
10
10
10
10
10
10
10
10
1
GetQueryResultsCommandのMaxResultsで10
を指定し、クエリステートメントでLIMIT 100
を指定しているため、11回に分けて100件のデータが取得できています。(最初の結果セットにはヘッダー行が含まれるため、100件目のデータは11回目の試行で取得されています。)
Athena-Queryという選択肢もある
ここまでAWS SDK for JavaScript v3のネイティブなメソッドのみを使ってAthenaクエリ実行の結果取得を実装してみましたが、実は上記の実行完了待機およびページネーション処理をラップしたAthena-Queryというライブラリがあります。
Athena-Queryは弊社クラスメソッド公式のリポジトリで管理されており、以前私も下記の使ってみたブログを投稿しました。選択肢としてご検討ください。
参考
以上