Amazon Athenaのクエリ実行と実行結果の取得をAWS SDKでやってみた(exponential-backoff)

2021.12.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、CX事業本部 IoT事業部の若槻です。

今回は、Amazon Athenaのクエリ実行と実行結果の取得をAWS SDKでやってみました。

Athenaのクエリ実行は非同期となる

Amazon AthenaのAWS SDKによるクエリ実行はstartQueryExecutionを使用します。

これを実行するとクエリが開始され、実行IDが取得できます。そしてこのgetQueryResultsで実行IDを指定するとクエリの実行結果が取得できます。

以上の処理を行う次のようなスクリプトをAWS SDK for JavaScriptで作成してみました。

import * as AWS from 'aws-sdk';
import { RowList, datumList } from 'aws-sdk/clients/athena';

const athena = new AWS.Athena({ region: 'ap-northeast-1' });

const glueDataBase = process.env.GLUE_DATA_BASE as string;
const glueViewTable = process.env.GLUE_VIEW_TABLE as string;
const athenaWorkGroup = process.env.ATHENA_WORK_GROUP as string;

//クエリ実行
const executionResult = await athena
  .startQueryExecution({
    QueryString: `
        SELECT *
        FROM "${glueDataBase}"."${glueViewTable}"
        LIMIT 1`,
    WorkGroup: athenaWorkGroup,
  })
  .promise();

const queryExecutionId = executionResult.QueryExecutionId;

//クエリ実行結果の取得
const queryResult = await athena
  .getQueryResults({
    QueryExecutionId: queryExecutionId as string,
  })
  .promise();

しかし上記を実行すると次のようなエラーとなります。クエリがまだ完了しておらずQUEUEDステータスであるとのことです。

InvalidRequestException: Query has not yet finished. Current state: QUEUED

原因としてはAthenaのクエリ実行は非同期となるためです。

クエリ実行のステータスはQUEUED | RUNNING | SUCCEEDED | FAILED | CANCELLEDのいずれかとなり、QUEUEDまたはRUNNINGのステータスで結果を取得しようとすると今回のようなエラーとなります。

The state of query execution. QUEUED indicates that the query has been submitted to the service, and Athena will execute the query as soon as resources are available. RUNNING indicates that the query is in execution phase. SUCCEEDED indicates that the query completed without errors. FAILED indicates that the query experienced an error and did not complete processing. CANCELLED indicates that a user input interrupted query execution.

ポーリングを設けたらうまく出来た

そこで次のようなwhilesetTimeoutでループ処理によるポーリングを設け、さらにtry/catch内でgetQueryResultsを実行することにようにしてみました。これによりクエリ実行が完了するまで待機できるようになっています。

import * as AWS from 'aws-sdk';
import { RowList, datumList } from 'aws-sdk/clients/athena';

const athena = new AWS.Athena({ region: 'ap-northeast-1' });

const glueDataBase = process.env.GLUE_DATA_BASE as string;
const glueViewTable = process.env.GLUE_VIEW_TABLE as string;
const athenaWorkGroup = process.env.ATHENA_WORK_GROUP as string;

test('テスト', async (): Promise<void> => {
  const executionResult = await athena
    .startQueryExecution({
      QueryString: `
          SELECT *
          FROM "${glueDataBase}"."${glueViewTable}"
          LIMIT 1`,
      WorkGroup: athenaWorkGroup,
    })
    .promise();

  const queryExecutionId = executionResult.QueryExecutionId;

  //getQueryResultsがエラーでなくなるまでポーリング
  let executionStatus = 'RUNNING';
  while (executionStatus === 'RUNNING') {
    await new Promise((r) => setTimeout(r, 1000));

    try {
      await athena
        .getQueryResults({
          QueryExecutionId: queryExecutionId as string,
        })
        .promise();

      executionStatus = 'DONE';
    } catch {}
  }

  //おまけ:クエリ結果のパース
  const queryResult = await athena
    .getQueryResults({
      QueryExecutionId: queryExecutionId as string,
    })
    .promise();

  const rowList = queryResult.ResultSet?.Rows as RowList;
  const attrList = rowList[0].Data as datumList;
  const valueList = rowList[1].Data as datumList;

  let data = {} as any;

  attrList.map((d, i) => {
    data[d.VarCharValue as string] = valueList[i].VarCharValue;
  });

  expect(data).toStrictEqual({
    eventname: expect.any(String),
    newDeviceId: expect.any(String),
    newDeviceName: expect.any(String),
    oldDeviceId: undefined,
    oldDeviceName: undefined,
    unixTimestamp: expect.any(String),
  });
});

テストコードとしたのでJestで実行するとエラー無くスクリプトが実行できました。

$  npx jest
 PASS  test/sampleHandler.test.ts (8.633 s)
  ✓ テスト (2508 ms)

Test Suites: 1 passed, 1 total
Tests:       1 passed, 1 total
Snapshots:   0 total
Time:        8.699 s, estimated 9 s
Ran all test suites.

しかしポーリング部分の記述が少し冗長な気がしますね。

exponential-backoffでもっとシンプルに

別解として、exponential-backoffを使うともっと処理をシンプルに記述できます。

exponential-backoffは、名前の通りエラーとなった実行を遅延してリトライさせられるユティリティです。今回のクエリ実行結果取得のようなエラーでなくなるまで試行し続けたい場合にうってつけです。

インストールはnpmで行えます。

$ npm i exponential-backoff

次のようにリトライさせたい処理の実行をbackOff()で指定します。

import * as AWS from 'aws-sdk';
import { backOff } from 'exponential-backoff';

const athena = new AWS.Athena({ region: 'ap-northeast-1' });

const glueDataBase = process.env.GLUE_DATA_BASE as string;
const glueViewTable = process.env.GLUE_VIEW_TABLE as string;
const athenaWorkGroup = process.env.ATHENA_WORK_GROUP as string;

test('テスト', async (): Promise<void> => {
  const executionResult = await athena
    .startQueryExecution({
      QueryString: `
          SELECT *
          FROM "${glueDataBase}"."${glueViewTable}"
          LIMIT 1`,
      WorkGroup: athenaWorkGroup,
    })
    .promise();

  const queryExecutionId = executionResult.QueryExecutionId;

  const queryResult = await backOff(() =>
    athena
      .getQueryResults({
        QueryExecutionId: queryExecutionId as string,
      })
      .promise()
  );

  //略
});

エラー無く実行できました。

$  npx jest
 PASS  test/sampleHandler.test.ts (8.362 s)
  ✓ テスト (2083 ms)

Test Suites: 1 passed, 1 total
Tests:       1 passed, 1 total
Snapshots:   0 total
Time:        8.432 s, estimated 9 s
Ran all test suites.

参考(環境作成)

以上