AWS GlueのジョブスクリプトをCIする構成(Rye 0.24.0利用)
はじめに
AWS GlueのジョブスクリプトをPythonで書いていて、動作確認にする際に毎回デプロイし、AWS コンソールで動作確認するサイクルがあまり効率的ではありませんでした。
理由としては以下があげられます。
- Glueのリソースのプロビジョニングによる待ち時間
- Pythonが動的型付けかつ静的解析に限界があるため、実行前の検証手段が乏しい
- 構文ミスがあった場合でも、1の後にスクリプトは実行されるため、ケアレスミスでもプロビジョニングの待ち時間は発生する
以上より、AWSが掲載しているDeveloping and testing AWS Glue job scripts locallyを参考に、Glueジョブスクリプトを開発する上で必要な(CIやローカルでテストできる)テンプレート作ってみました。
構成
ソースコード
GitHub上にソースコードを公開しています。
GitHub Actionsで継続的にCIを回しているので、他の環境で動作しないということは起こりにくと思います。ローカルでの利用ツールは、Requirementsを参考にしてください。
プロジェクトのディレクトリ構成
npmとRyeのワークスペース機能を使っています。これはモノレポの中に、CDKやNodeバックエンド、フロントエンドが入ることが私が関わるプロジェクトで多いためです。ジョブランナーは、npmに寄せています。少しキメラっぽい構成ですいません。。必要な要素だけ抜き取って、読んで頂けたら幸いです。
$ exa -L 2 -T --git-ignore ├── cspell.json ├── package-lock.json ├── package.json ├── packages │ ├── config │ ├── iac │ ├── py-tool │ └── src-glue ├── pyproject.toml ├── README.md ├── requirements-dev.lock ├── requirements.lock └── setup-rye.sh
ワークスペースでの役割は以下の通りです。今回は、@glue-template/src-glue
について解説します。他の設定が気になる場合はリポジトリを参考にしてください。
ワークスペース名 | 役割 |
---|---|
@glue-template/config | tsconfig,ruff(Pythonの静的解析,フォーマッター),biome(TSの静的解析,フォーマッター)の設定 |
@glue-template/iac | CDKのコード |
@glue-template/py-tool | 任意のPythonプロジェクト(ワークスペース機能動作確認用) |
@glue-template/src-glue | Glueジョブスクリプトとテスト |
CI実装に関して
はじめに
GlueがS3からデータを取得して、同じS3に変換したデータを出力しています。Glueのジョブは、以下のような変換をします。
{"code": 2} {"code": 5} {"code": 3} {"code": 4} {"code": 1}
{"code":2,"name":"orange"} {"code":5,"name":"banana"} {"code":3,"name":"grape"} {"code":4,"name":"pear"} {"code":1,"name":"apple"}
対応した番号に該当する果物の名前を、nameフィールドで追記するという変換内容です。
コンテナの構成
サービス | 役割 |
---|---|
glue.dev.s3.local | locastackのコンテナイメージを利用し、S3をエミュレートする |
glue.dev.summary | ・amazon/aws-glue-libs:glue_libs_4.0.0_image_01 のコンテナイメージを利用し、Glueをエミュレートする・pytestの実行元 |
version: '3.5' services: glue.dev.s3.local: container_name: s3.dev image: localstack/localstack:3.0.2 environment: - SERVICES=s3 - AWS_DEFAULT_REGION=ap-northeast-1 - AWS_DEFAULT_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test ports: - 4566:4566 networks: - glue.dev.network glue.dev.summary: container_name: glue.dev image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01 # 私のローカルマシンから実行した際に、処理できるファイル上限を超えたため、ulimits設定を追加 ulimits: nofile: soft: 12288 hard: 12288 volumes: - ./:/home/glue_user/workspace/jupyter_workspace - ./spark.conf:/home/glue_user/spark/conf/spark-defaults.conf environment: - DISABLE_SSL=true - AWS_REGION=ap-northeast-1 - AWS_OUTPUT=json - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test ports: # jupyterlab - 8888:8888 # spark ui - 4040:4040 command: /home/glue_user/jupyter/jupyter_start.sh networks: - glue.dev.network networks: glue.dev.network: name: glue.dev.network
glue.dev.summary
のコンテナに、ローカルのpytestのソースコードマウントして、コンテナ側のpytestを実行しています(なので、packages/src-glue/pyproject.toml
にパッケージの記載はありません)。- glue側から、localstack側へはdockerのnetworkでコンテナ名「s3.dev」で名前解決しています。
Glueジョブスクリプト
コメントにてポイントを補足します。
import sys from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from pyspark.sql.functions import udf from pyspark.sql.types import StringType def run_job( glue_context, s3_bucket, target_year, target_month, target_day, target_hour, ): dyf = glue_context.create_dynamic_frame.from_options( format_options={"multiline": False}, connection_type="s3", format="json", connection_options={ "paths": [ f"s3://{s3_bucket}/input/year={target_year}/month={target_month}/day={target_day}/hour={target_hour}/" ], "recurse": True, }, transformation_ctx="dyf", ) df = dyf.toDF() if df.count() <= 0: raise Exception("No Data.") code_to_fruit_udf = udf(code_to_fruit, StringType()) df = df.withColumn("name", code_to_fruit_udf(df["code"])) df.coalesce(1).write.option("compression", "gzip").mode("overwrite").json( f"s3://{s3_bucket}/output/{target_year}/{target_month}/{target_day}/{target_hour}" ) def code_to_fruit(code): fruits = { 1: "apple", 2: "orange", 3: "grape", 4: "pear", 5: "banana", 6: "cherry", 7: "strawberry", 8: "kiwi", 9: "peach", 10: "melon", } return fruits.get(code, "unknown") def main(): args = getResolvedOptions(sys.argv, ["JOB_NAME", "IO_S3_BUCKET"]) job_name = args["JOB_NAME"] s3_bucket = args["IO_S3_BUCKET"] [target_year, target_month, target_day, target_hour] = ["2023", "01", "05", "07"] sc = SparkContext() glue_context = GlueContext(sc) job = Job(glue_context) job.init(job_name, args) run_job( glue_context, s3_bucket, target_year, target_month, target_day, target_hour, ) job.commit() if __name__ == "__main__": main()
pytestの構成
sparkの設定を初期化しています。前述したlocalstackのエンドポイント設定もこちらで実施しています。
import pytest from awsglue.context import GlueContext from awsglue.job import Job from helpers.config import S3_ENDPOINT_URL from pyspark.sql import SparkSession @pytest.fixture() def fixture_setup_glue(): sc = SparkSession.builder.getOrCreate() sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL) sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "S3SignerType") sc._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None") sc._jsc.hadoopConfiguration().set( "fs.s3a.change.detection.version.required", "false" ) context = GlueContext(sc) job = Job(context) yield (context) job.commit() sc.stop()
こちらがテストコードです。以下の順序で実行されます。
- fixture_setup_glue yieldまで
- setup_sample_data yieldまで
- test_run_job
- setup_sample_data yield以降
- fixture_setup_glue yield以降
import gzip import os import pytest from src import convert_fruits_job from tests.helpers import s3 from tests.helpers.config import S3_BUCKET_NAME @pytest.fixture() def setup_sample_data(): s3_file_name = "input/year=2023/month=01/day=05/hour=07/input-data-7-2024-01-21-23-42-22-ac74facb-bc41-3931-a76e-66e445872d60.gz" s3.create_bucket(S3_BUCKET_NAME) s3.upload_file_to_s3( S3_BUCKET_NAME, "tests/testdata/input-data.jsonl", s3_file_name ) yield s3.delete_file_from_s3(S3_BUCKET_NAME, s3_file_name) s3.delete_bucket(S3_BUCKET_NAME) class TestConvertFruitsJob: def test_run_job(self, fixture_setup_glue, setup_sample_data): convert_fruits_job.run_job( fixture_setup_glue, S3_BUCKET_NAME, "2023", "01", "05", "07", ) file_prefix = "output/2023/01/05/07" s3_object_paths = s3.list_objects(S3_BUCKET_NAME, file_prefix) assert len(s3_object_paths) == 1 s3_object_path = s3_object_paths[0] s3_object_name = os.path.basename(s3_object_path) local_path = f"/tmp/{s3_object_name}" s3.download_object(S3_BUCKET_NAME, f"{s3_object_path}", local_path) expected_output_lines = [ b'{"code":2,"name":"orange"}\n', b'{"code":5,"name":"banana"}\n', b'{"code":3,"name":"grape"}\n', b'{"code":4,"name":"pear"}\n', b'{"code":1,"name":"apple"}\n', ] with gzip.open(local_path) as f: content = f.read() assert content == b"".join(expected_output_lines)
GitHub Actionsの構成
まず全体を示します。
name: CI src-glue on: push: paths: - 'packages/src-glue/**' - '.github/workflows/ci-glue.yml' env: GLUE_LIB_CACHE_PATH: glue-libs-image GLUE_LIB_VERSION: 4.0.0 jobs: ci: runs-on: ubuntu-latest timeout-minutes: 30 steps: - name: Checkout uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: '3.12' - name: Cache a GlueLib Docker image id: cache-python-packages uses: actions/cache@v4 with: path: ${{ env.GLUE_LIB_CACHE_PATH }} key: ${{ runner.os }}-glue-libs-${{ env.GLUE_LIB_VERSION }} - name: Pull and save a GlueLib Docker image if: steps.cache-python-packages.outputs.cache-hit != 'true' run: | docker pull amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 docker save amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 -o ${GLUE_LIB_CACHE_PATH} - name: Load GlueLib Docker Image run: | docker load -i ${GLUE_LIB_CACHE_PATH} - name: lunch container run: | cd packages/src-glue/ docker-compose up -d - name: Install Rye run: | ./setup-rye.sh - name: Restore python packages uses: actions/cache@v4 id: cache_dependency_python env: cache-name: cache-dependency-python with: path: '.venv' key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('requirements-dev.lock') }} - name: Install python module if: ${{ steps.cache_dependency_python.outputs.cache-hit != 'true' }} run: | source "$HOME/.rye/env" rye sync - uses: actions/setup-node@v4 with: node-version-file: ./.node-version - name: Restore node modules uses: actions/cache@v4 id: cache_dependency_node env: cache-name: cache-dependency-node with: path: '**/node_modules' key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('package-lock.json') }} - name: Install node modules if: ${{ steps.cache_dependency_node.outputs.cache-hit != 'true' }} run: npm ci --no-audit --progress=false --silent - name: CI shell: bash run: | source "$HOME/.rye/env" npm run check \ -w @glue-template/src-glue npm run test \ -w @glue-template/src-glue
ソースを切り出して、説明します。
Glueのイメージは3G程度あり、push時に実行するとGitHub Actionsのクレジットを無駄に消費してしまうため、パスでフィルタしています。これにより、ワークスペース毎にCIを設定する構成にしています。
on: push: paths: - 'packages/src-glue/**' - '.github/workflows/ci-glue.yml'
コンテナの容量が重いため、コンテナイメージをキャッシュしています。キャッシュが見つかっても、キャッシュからpullして展開する時間がかかるので、結構ばらつきはありますが、9分程度が3分程度になったケースもあるので、入れる価値はあります。
- name: Cache a GlueLib Docker image id: cache-python-packages uses: actions/cache@v4 with: path: ${{ env.GLUE_LIB_CACHE_PATH }} key: ${{ runner.os }}-glue-libs-${{ env.GLUE_LIB_VERSION }} - name: Pull and save a GlueLib Docker image if: steps.cache-python-packages.outputs.cache-hit != 'true' run: | docker pull amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 docker save amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 -o ${GLUE_LIB_CACHE_PATH} - name: Load GlueLib Docker Image run: | docker load -i ${GLUE_LIB_CACHE_PATH} - name: lunch container run: | cd packages/src-glue/ docker-compose up -d
Ryeをインストールしています。GitHub Actionsがあればそちらを使ってもいいと思います。Ryeの0.24.0でuvが実験的に導入され、本シェルで有効化しています。24秒かかっていたパッケージのインストール処理が4秒程度で終わるようになりました。パッケージ系は、GitHub Actions上ではキャッシュされるのでローカルでインストールする機会が多いほど恩恵が大きいと思います。
- name: Install Rye run: | ./setup-rye.sh
#/bin/bash # Ryeのバージョン固定 RYE_VERSION=0.24.0 curl -sSf https://rye-up.com/get | RYE_INSTALL_OPTION="--yes" bash source "$HOME/.rye/env" # uvの有効化 rye config --set-bool behavior.use-uv=true
テストの実行
モジュールをインストールします。
# nodeのパッケージインストール npm ci # pythonのパッケージインストール npm run deps
テストを実行し、通っていることが確認できました!コンソールから実行すると、90秒くらいかかるので13秒は早くて便利ですね!
# テスト実行 $ cd packages/src-glue/ $ finch compose up -d $ npm run test > test > ./run_test.sh WARN[0000] treating lima version "4ea0a83" from "/Applications/Finch/lima/data/finch/lima-version" as very latest release ============================= test session starts ============================== platform linux -- Python 3.10.2, pytest-7.2.1, pluggy-1.3.0 rootdir: /home/glue_user/workspace/jupyter_workspace, configfile: pyproject.toml, testpaths: tests plugins: anyio-4.0.0 collected 1 item tests/test_convert_fruits_job.py . [100%] =============================== warnings summary =============================== tests/test_convert_fruits_job.py::TestConvertFruitsJob::test_run_job /home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead. warnings.warn( tests/test_convert_fruits_job.py::TestConvertFruitsJob::test_run_job /home/glue_user/spark/python/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it. warnings.warn("DataFrame constructor is internal. Do not directly use it.") -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html ======================== 1 passed, 2 warnings in 13.53s ========================
さいごに
GlueのCIについて解説しました。今取り組んでいるプロジェクトではこの構成をとっています。動作確認のサイクルが高速になったことや何よりチームでの開発するGlueジョブスクリプトが被っても、動作確認がスケールするようになったので、絶大な効果を感じています。また本テンプレートでは、biome
やRye
をフル活用しているので、そちらも参考にして頂き、もっと良い静的解析ルールや、手法があればissueや@shuntaka_jpまで頂ければありがたいです!
参考
付属しているサンプルコードを含めて参考にさせて頂きました!コンテナのバージョンなどはできるだけ最新にして今回記事にしております!