Athenaの結果確認をDigdagのretryでやってみた

2022.07.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

boto3を使ってAthenaで検索を実行した際に、実行結果は処理終了を待って、実行IDで確認する必要があります。そのため、DigdagのようなワークフローエンジンでPythonを使ってAthenaに処理を投げる際には、Python側で処理が終わるまでループしてステータスチェックすることがあるかと思います。

ループして確認する処理は、Pythonではなくワークフロー側の仕組みで実現してみたい思いがあり、Digdagだとどのような実装ができそうか試してみました。

やりたいこと

DigdagからPythonを呼び出せるので、Pythonでboto3を使い、Athenaで処理を開始した後、レスポンスの実行IDを使って処理が終わるまで一定回数ステータスを確認しに行くような実装が多いと思います。

例えば以下のような感じです。

sample.dig

+execute_query:
  # この関数の中で、Athenaへの検索の実行から結果の確認までを行う。
  py>: python.athena.execute_sql_file

この実装だと、ワークフロー側はPythonに処理をほとんどお任せしているので、ワークフローの定義から何をしているのか汲み取りにくく、Pythonのコードを読む必要があります。ワークフロー側で繰返しの処理を実装できれば、ワークフロー定義から内容が分かりやすくなるので、どんな実装ができそうか試してみました。

Digdagだとloopはあるもののbreakのような機能が見つからなかったため、loopでAthenaで処理が終了したらループを早めに切り上げるような実装はできなさそうでした。

代わりにretryがあるので、この機能を試してみました。

構成

以下のような構成で検証しました。3層構成のデータ分析基盤があるとして、データレイク層からデータマート層に分析用のデータを作成するような想定です。

より具体的には、データレイク層に日毎CSVファイルが配置され、Athenaを使ってデータマート層にparquet形式に変換して出力するような実装を検証します。本当はマートにする際になにかしらの処理が行われると思いますが、今回は簡単のため、parquetに変換する以外はそのままとします。

検証した構成

吹き出しで必要なIAMロールとエンドポイントを記載しています。これはDigdagで実行する処理に必要なものだけを記載している点にご注意ください。あらかじめワークフローがスケジュールされていて、スケジュール実行で自動的に動く場合はこれだけで動作します。

上記構成には明記していませんが、実際にはDigdagのUIを確認したりファイルを検証用のファイルをアップロードしたりするために、AWS Systems Manager Session Manager越しにEC2にSSHアクセスして、ポートフォワードしました。

ポートフォワードの方法としては例えば以下の記事をご参考ください。

実装

エンドポイントなどのCloudFormationテンプレートと、Digdagに設定するdigファイルなどを以下のGithubレポジトリに格納しました。

適当なEC2がVPCのプライベートサブネットにあり、Digdagがインストールされている前提です。

digファイルは以下のようにしています。

create_mart.dig

timezone: Asia/Tokyo
_export:
  # 使用するPythonの指定
  py:
    python: /usr/bin/python3
  # 変数設定
  sql_file_path: sql/sample_mart.sql
  yyyymmdd: ${moment().format("YYYYMMDD")}
  workgroup: Athenaのワークグループ名
  database_name: テーブル定義を作成するデータベース名

  # s3://マート用バケット名/プレフィクス、のような形式を想定
  mart_key_prefix: s3://データレイク層用のバケット名/wine_mart
  # s3://Athena結果出力用バケット名/プレフィクス、のような形式を想定
  athena_result_key_prefix: s3://Athena検索結果出力用のバケット名/result

# 簡単のため、毎回洗い替えすることとする
+clear_partition:
  sh>: >-
    aws s3 rm ${mart_key_prefix}/yyyymmdd=${yyyymmdd}/ --recursive;

+execute_query:
  py>: python.athena.execute_sql_file

+check_query_status:
  _retry:
    limit: 3
    interval: 5
    interval_type: constant
  py>: python.athena.check_query_status

特に、check_query_statusのタスクで、リトライ機能を使ったAthenaの実行結果の確認をしています。interval_typeconstantなので、5秒おきに3回までステータスを確認します。exponentialにすることで、間隔をinterval x 2^(retry_count-1)のように増やすことも可能です。

呼び出しているcheck_query_status関数は以下のようにしています。Athenaの処理が終わっていない場合は例外を送出します。

athena.pyから抜粋

def check_query_status():
    """ クエリの実行が終わっているか確認する。終わっていない場合は例外を送出してdigdag側でリトライする。
    """
    param = digdag.env.params
    execution_id = param['execution_id']

    athena_client = boto3.client('athena', region_name='ap-northeast-1')
    status = athena_client.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]
    if status["State"] not in ["SUCCEEDED", "FAILED", "CANCELLED"]:
        raise Exception("Query is running now!")
    else:
        print(f"QueryExecutionId: {execution_id}")
        print(f"Status: {status['State']}")
        is_query_success = (status['State']=="SUCCEEDED")
        digdag.env.store({"is_query_success": is_query_success})

また、リトライはステータスチェックの粒度のタスクに設定するようにしています。リトライする条件は細かくは設定できなかったので、あまり大きい粒度でリトライ設定すると、リトライ対象の範囲を初めから再実行するので、意図しない処理が再実行される可能性があり、注意が必要です。

動かしてみた

前提

以下のバージョンで検証しました。

  • Digdag:0.10.0
  • Python:3.7.9
  • boto3:1.17.82

事前準備

データレイク層用バケットの当日の日付のパーティションに、作成したCSVデータをアップロードしておきました。

データレイク層のファイル

このデータは、scikit-learnのwine datasetをデータフレームに加工し、6MB程度にコピーを繰り返して増幅させておいたものです。リトライがされているか確認するために、処理時間がかかるよう、少し大きめのサイズにしています。

Digdagサーバーを実行しておきます。EC2にログインし、digファイルなどをコピーしておきます。例えば以下のコマンドでサーバーを起動しておきます。

# -o でDBを保存するディレクトリを指定
# -n でポートを指定
# --logおよび--task-logでログの出力先を設定
digdag server -o ./digdag_sample -n 8081 --log ./digdag_sample --task-log ./digdag_sample

プロジェクトは事前にpushしておきます。

# creta_data_martディレクトリにdigファイルなどが入っているとする
cd create_data_mart
digdag push create_data_mart -e localhost:8081

ワークフローからの動作確認

DigdagのUIからcreate_data_martワークフローを実行します。

ワークフローを実行する

しばらくすると、ワークフローが正常に完了しました。Tasksの欄を見ると、リトライがされていることが分かります。

ワークフロー実行結果

データマート層にも期待通りparquetファイルが出力されていました。

データマート層のファイル

Athenaの処理が終わっていない場合、Python側で例外を送出しているので当たり前ですが、ログはエラーが起きたような感じになるのが少し気にはなります。ログの中身を見て機械的に何かしている場合は注意が必要そうです。

リトライ対象のエラー

リトライ時には正常に終了していました。

リトライされたタスク

最後に

今回はDigdagからboto3を使い、Athenaで処理を実行する際に、Digdagのリトライ機能を使って実行ステータスを確認する方法を試してみました。

細かなリトライ制御はできないので、一長一短ではありますが、Python側でループを書かなくてよくなったので、個人的にはシンプルでよいなと思いました。

参考になりましたら幸いです。