[AWS SDK for JavaScript v3] Amazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレート

2023.03.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 Delivery部の若槻です。

Amazon Athenaは、非常に大きなデータセットに対して実行されるクエリをサポートするため、結果セットが非常に大きい場合は、一度にすべてのデータを取得することは望ましくありません。その場合、ページネーションを使用することで、各ページで必要なデータのみを取得し、余分なデータを取得しなくて済むため、クエリの実行時間を短縮することができます。

今回は、AWS SDK for JavaScript v3でAmazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレートを作ってみました。

やってみた

クエリ対象のテーブル作成

クエリ対象の適当なデータとして、Amazon CloudTrailのログデータを使ってみます。

Amazon CloudTrailコンソールで[Create Athena table]をクリック。

CloudTrailログが格納されているS3 Bucketを選択し、クリックしてテーブル作成。

クエリ対象のテーブルが作成できました。

テンプレート

必要な依存関係をインスントールします。

npm install @aws-sdk/client-athena

下記はAmazon Athenaのクエリ実行および結果をページネーションして取得するTypeScriptテンプレートです。

script.ts

import {
  AthenaClient,
  GetQueryExecutionCommand,
  GetQueryResultsCommand,
  StartQueryExecutionCommand,
  QueryExecutionState,
} from '@aws-sdk/client-athena';

const REGION = 'ap-northeast-1';
const DATABASE = 'default';
const MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND =
  Number(process.env.MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND) || 10;

const QUERY_STATEMENT = process.argv[2];

const athena = new AthenaClient({ region: REGION });

const executeQuery = async (query: string): Promise<string> => {
  const { QueryExecutionId } = await athena.send(
    new StartQueryExecutionCommand({
      QueryString: query,
      QueryExecutionContext: { Database: DATABASE },
    })
  );

  if (QueryExecutionId === undefined)
    throw new Error('QueryExecutionId is undefined.');

  return QueryExecutionId;
};

const getPage = async (
  queryExecutionId: string,
  pageToken?: string
): Promise<any> {
  // クエリ実行結果として、結果セットとネクストトークンを取得
  const { ResultSet, NextToken } = await athena.send(
    new GetQueryResultsCommand({
      QueryExecutionId: queryExecutionId,
      MaxResults: MAX_RESULTS_OF_GET_QUERY_RESULTS_COMMAND,
      NextToken: pageToken,
    })
  );

  if (ResultSet === undefined || ResultSet.Rows === undefined)
    throw new Error('ResultSet is undefined.');

  console.log(ResultSet.Rows.length);

  // ネクストトークンが含まれていた場合、次のページからクエリ実行結果を取得(再帰実行)
  if (NextToken) {
    const nextPage = await getPage(queryExecutionId, NextToken);
    if (Array.isArray(nextPage)) {
      return [...ResultSet.Rows, ...nextPage];
    } else {
      return ResultSet.Rows;
    }
  }

  return ResultSet;
}

const getResults = async (query: string): Promise<any> => {
  const queryExecutionId = await executeQuery(query);

  // クエリ実行完了待機
  let queryExecutionState = QueryExecutionState.RUNNING;
  while (queryExecutionState === QueryExecutionState.RUNNING) {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    const { QueryExecution } = await athena.send(
      new GetQueryExecutionCommand({
        QueryExecutionId: queryExecutionId,
      })
    );
    if (
      QueryExecution == undefined ||
      QueryExecution.Status == undefined ||
      QueryExecution.Status.State == undefined
    )
      throw new Error('QueryExecution is undefined.');
    queryExecutionState = QueryExecution.Status.State as QueryExecutionState;
  }

  if (queryExecutionState !== QueryExecutionState.SUCCEEDED) {
    throw new Error(
      `Query ${queryExecutionId} failed with status ${queryExecutionState}`
    );
  }

  return getPage(queryExecutionId);
};

getResults(QUERY_STATEMENT);

実行してみると、ページネーション毎にデータが取得できていることが確認できました。

$ npx ts-node script.ts "SELECT * FROM cloudtrail_logs_cm_members_XXXXXXXXXXXX LIMIT 100"
10
10
10
10
10
10
10
10
10
10
1

GetQueryResultsCommandのMaxResultsで10を指定し、クエリステートメントでLIMIT 100を指定しているため、11回に分けて100件のデータが取得できています。(最初の結果セットにはヘッダー行が含まれるため、100件目のデータは11回目の試行で取得されています。)

Athena-Queryという選択肢もある

ここまでAWS SDK for JavaScript v3のネイティブなメソッドのみを使ってAthenaクエリ実行の結果取得を実装してみましたが、実は上記の実行完了待機およびページネーション処理をラップしたAthena-Queryというライブラリがあります。

Athena-Queryは弊社クラスメソッド公式のリポジトリで管理されており、以前私も下記の使ってみたブログを投稿しました。選択肢としてご検討ください。

参考

以上