はじめに
AWS GlueのジョブスクリプトをPythonで書いていて、動作確認にする際に毎回デプロイし、AWS コンソールで動作確認するサイクルがあまり効率的ではありませんでした。
理由としては以下があげられます。
- Glueのリソースのプロビジョニングによる待ち時間
- Pythonが動的型付けかつ静的解析に限界があるため、実行前の検証手段が乏しい
- 構文ミスがあった場合でも、1の後にスクリプトは実行されるため、ケアレスミスでもプロビジョニングの待ち時間は発生する
以上より、AWSが掲載しているDeveloping and testing AWS Glue job scripts locallyを参考に、Glueジョブスクリプトを開発する上で必要な(CIやローカルでテストできる)テンプレート作ってみました。
構成
ソースコード
GitHub上にソースコードを公開しています。
GitHub Actionsで継続的にCIを回しているので、他の環境で動作しないということは起こりにくと思います。ローカルでの利用ツールは、Requirementsを参考にしてください。
プロジェクトのディレクトリ構成
npmとRyeのワークスペース機能を使っています。これはモノレポの中に、CDKやNodeバックエンド、フロントエンドが入ることが私が関わるプロジェクトで多いためです。ジョブランナーは、npmに寄せています。少しキメラっぽい構成ですいません。。必要な要素だけ抜き取って、読んで頂けたら幸いです。
$ exa -L 2 -T --git-ignore
├── cspell.json
├── package-lock.json
├── package.json
├── packages
│ ├── config
│ ├── iac
│ ├── py-tool
│ └── src-glue
├── pyproject.toml
├── README.md
├── requirements-dev.lock
├── requirements.lock
└── setup-rye.sh
ワークスペースでの役割は以下の通りです。今回は、@glue-template/src-glue
について解説します。他の設定が気になる場合はリポジトリを参考にしてください。
ワークスペース名 | 役割 |
---|---|
@glue-template/config | tsconfig,ruff(Pythonの静的解析,フォーマッター),biome(TSの静的解析,フォーマッター)の設定 |
@glue-template/iac | CDKのコード |
@glue-template/py-tool | 任意のPythonプロジェクト(ワークスペース機能動作確認用) |
@glue-template/src-glue | Glueジョブスクリプトとテスト |
CI実装に関して
はじめに
GlueがS3からデータを取得して、同じS3に変換したデータを出力しています。Glueのジョブは、以下のような変換をします。
Glue変換前
{"code": 2}
{"code": 5}
{"code": 3}
{"code": 4}
{"code": 1}
Glue変換後
{"code":2,"name":"orange"}
{"code":5,"name":"banana"}
{"code":3,"name":"grape"}
{"code":4,"name":"pear"}
{"code":1,"name":"apple"}
対応した番号に該当する果物の名前を、nameフィールドで追記するという変換内容です。
コンテナの構成
サービス | 役割 |
---|---|
glue.dev.s3.local | locastackのコンテナイメージを利用し、S3をエミュレートする |
glue.dev.summary | ・amazon/aws-glue-libs:glue_libs_4.0.0_image_01 のコンテナイメージを利用し、Glueをエミュレートする・pytestの実行元 |
packages/src-glue/compose.yml
version: '3.5'
services:
glue.dev.s3.local:
container_name: s3.dev
image: localstack/localstack:3.0.2
environment:
- SERVICES=s3
- AWS_DEFAULT_REGION=ap-northeast-1
- AWS_DEFAULT_OUTPUT=json
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test
ports:
- 4566:4566
networks:
- glue.dev.network
glue.dev.summary:
container_name: glue.dev
image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01
# 私のローカルマシンから実行した際に、処理できるファイル上限を超えたため、ulimits設定を追加
ulimits:
nofile:
soft: 12288
hard: 12288
volumes:
- ./:/home/glue_user/workspace/jupyter_workspace
- ./spark.conf:/home/glue_user/spark/conf/spark-defaults.conf
environment:
- DISABLE_SSL=true
- AWS_REGION=ap-northeast-1
- AWS_OUTPUT=json
- AWS_ACCESS_KEY_ID=test
- AWS_SECRET_ACCESS_KEY=test
ports:
# jupyterlab
- 8888:8888
# spark ui
- 4040:4040
command: /home/glue_user/jupyter/jupyter_start.sh
networks:
- glue.dev.network
networks:
glue.dev.network:
name: glue.dev.network
glue.dev.summary
のコンテナに、ローカルのpytestのソースコードマウントして、コンテナ側のpytestを実行しています(なので、packages/src-glue/pyproject.toml
にパッケージの記載はありません)。- glue側から、localstack側へはdockerのnetworkでコンテナ名「s3.dev」で名前解決しています。
Glueジョブスクリプト
コメントにてポイントを補足します。
packages/src-glue/src/convert_fruits_job.py
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def run_job(
glue_context,
s3_bucket,
target_year,
target_month,
target_day,
target_hour,
):
dyf = glue_context.create_dynamic_frame.from_options(
format_options={"multiline": False},
connection_type="s3",
format="json",
connection_options={
"paths": [
f"s3://{s3_bucket}/input/year={target_year}/month={target_month}/day={target_day}/hour={target_hour}/"
],
"recurse": True,
},
transformation_ctx="dyf",
)
df = dyf.toDF()
if df.count() <= 0:
raise Exception("No Data.")
code_to_fruit_udf = udf(code_to_fruit, StringType())
df = df.withColumn("name", code_to_fruit_udf(df["code"]))
df.coalesce(1).write.option("compression", "gzip").mode("overwrite").json(
f"s3://{s3_bucket}/output/{target_year}/{target_month}/{target_day}/{target_hour}"
)
def code_to_fruit(code):
fruits = {
1: "apple",
2: "orange",
3: "grape",
4: "pear",
5: "banana",
6: "cherry",
7: "strawberry",
8: "kiwi",
9: "peach",
10: "melon",
}
return fruits.get(code, "unknown")
def main():
args = getResolvedOptions(sys.argv, ["JOB_NAME", "IO_S3_BUCKET"])
job_name = args["JOB_NAME"]
s3_bucket = args["IO_S3_BUCKET"]
[target_year, target_month, target_day, target_hour] = ["2023", "01", "05", "07"]
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(job_name, args)
run_job(
glue_context,
s3_bucket,
target_year,
target_month,
target_day,
target_hour,
)
job.commit()
if __name__ == "__main__":
main()
pytestの構成
sparkの設定を初期化しています。前述したlocalstackのエンドポイント設定もこちらで実施しています。
packages/src-glue/tests/conftest.py
import pytest
from awsglue.context import GlueContext
from awsglue.job import Job
from helpers.config import S3_ENDPOINT_URL
from pyspark.sql import SparkSession
@pytest.fixture()
def fixture_setup_glue():
sc = SparkSession.builder.getOrCreate()
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "S3SignerType")
sc._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None")
sc._jsc.hadoopConfiguration().set(
"fs.s3a.change.detection.version.required", "false"
)
context = GlueContext(sc)
job = Job(context)
yield (context)
job.commit()
sc.stop()
こちらがテストコードです。以下の順序で実行されます。
- fixture_setup_glue yieldまで
- setup_sample_data yieldまで
- test_run_job
- setup_sample_data yield以降
- fixture_setup_glue yield以降
packages/src-glue/tests/test_convert_fruits_job.py
import gzip
import os
import pytest
from src import convert_fruits_job
from tests.helpers import s3
from tests.helpers.config import S3_BUCKET_NAME
@pytest.fixture()
def setup_sample_data():
s3_file_name = "input/year=2023/month=01/day=05/hour=07/input-data-7-2024-01-21-23-42-22-ac74facb-bc41-3931-a76e-66e445872d60.gz"
s3.create_bucket(S3_BUCKET_NAME)
s3.upload_file_to_s3(
S3_BUCKET_NAME, "tests/testdata/input-data.jsonl", s3_file_name
)
yield
s3.delete_file_from_s3(S3_BUCKET_NAME, s3_file_name)
s3.delete_bucket(S3_BUCKET_NAME)
class TestConvertFruitsJob:
def test_run_job(self, fixture_setup_glue, setup_sample_data):
convert_fruits_job.run_job(
fixture_setup_glue,
S3_BUCKET_NAME,
"2023",
"01",
"05",
"07",
)
file_prefix = "output/2023/01/05/07"
s3_object_paths = s3.list_objects(S3_BUCKET_NAME, file_prefix)
assert len(s3_object_paths) == 1
s3_object_path = s3_object_paths[0]
s3_object_name = os.path.basename(s3_object_path)
local_path = f"/tmp/{s3_object_name}"
s3.download_object(S3_BUCKET_NAME, f"{s3_object_path}", local_path)
expected_output_lines = [
b'{"code":2,"name":"orange"}\n',
b'{"code":5,"name":"banana"}\n',
b'{"code":3,"name":"grape"}\n',
b'{"code":4,"name":"pear"}\n',
b'{"code":1,"name":"apple"}\n',
]
with gzip.open(local_path) as f:
content = f.read()
assert content == b"".join(expected_output_lines)
GitHub Actionsの構成
まず全体を示します。
.github/workflows/ci-glue.yml
name: CI src-glue
on:
push:
paths:
- 'packages/src-glue/**'
- '.github/workflows/ci-glue.yml'
env:
GLUE_LIB_CACHE_PATH: glue-libs-image
GLUE_LIB_VERSION: 4.0.0
jobs:
ci:
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Cache a GlueLib Docker image
id: cache-python-packages
uses: actions/cache@v4
with:
path: ${{ env.GLUE_LIB_CACHE_PATH }}
key: ${{ runner.os }}-glue-libs-${{ env.GLUE_LIB_VERSION }}
- name: Pull and save a GlueLib Docker image
if: steps.cache-python-packages.outputs.cache-hit != 'true'
run: |
docker pull amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01
docker save amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 -o ${GLUE_LIB_CACHE_PATH}
- name: Load GlueLib Docker Image
run: |
docker load -i ${GLUE_LIB_CACHE_PATH}
- name: lunch container
run: |
cd packages/src-glue/
docker-compose up -d
- name: Install Rye
run: |
./setup-rye.sh
- name: Restore python packages
uses: actions/cache@v4
id: cache_dependency_python
env:
cache-name: cache-dependency-python
with:
path: '.venv'
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('requirements-dev.lock') }}
- name: Install python module
if: ${{ steps.cache_dependency_python.outputs.cache-hit != 'true' }}
run: |
source "$HOME/.rye/env"
rye sync
- uses: actions/setup-node@v4
with:
node-version-file: ./.node-version
- name: Restore node modules
uses: actions/cache@v4
id: cache_dependency_node
env:
cache-name: cache-dependency-node
with:
path: '**/node_modules'
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('package-lock.json') }}
- name: Install node modules
if: ${{ steps.cache_dependency_node.outputs.cache-hit != 'true' }}
run: npm ci --no-audit --progress=false --silent
- name: CI
shell: bash
run: |
source "$HOME/.rye/env"
npm run check \
-w @glue-template/src-glue
npm run test \
-w @glue-template/src-glue
ソースを切り出して、説明します。
Glueのイメージは3G程度あり、push時に実行するとGitHub Actionsのクレジットを無駄に消費してしまうため、パスでフィルタしています。これにより、ワークスペース毎にCIを設定する構成にしています。
on:
push:
paths:
- 'packages/src-glue/**'
- '.github/workflows/ci-glue.yml'
コンテナの容量が重いため、コンテナイメージをキャッシュしています。キャッシュが見つかっても、キャッシュからpullして展開する時間がかかるので、結構ばらつきはありますが、9分程度が3分程度になったケースもあるので、入れる価値はあります。
- name: Cache a GlueLib Docker image
id: cache-python-packages
uses: actions/cache@v4
with:
path: ${{ env.GLUE_LIB_CACHE_PATH }}
key: ${{ runner.os }}-glue-libs-${{ env.GLUE_LIB_VERSION }}
- name: Pull and save a GlueLib Docker image
if: steps.cache-python-packages.outputs.cache-hit != 'true'
run: |
docker pull amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01
docker save amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 -o ${GLUE_LIB_CACHE_PATH}
- name: Load GlueLib Docker Image
run: |
docker load -i ${GLUE_LIB_CACHE_PATH}
- name: lunch container
run: |
cd packages/src-glue/
docker-compose up -d
Ryeをインストールしています。GitHub Actionsがあればそちらを使ってもいいと思います。Ryeの0.24.0でuvが実験的に導入され、本シェルで有効化しています。24秒かかっていたパッケージのインストール処理が4秒程度で終わるようになりました。パッケージ系は、GitHub Actions上ではキャッシュされるのでローカルでインストールする機会が多いほど恩恵が大きいと思います。
- name: Install Rye
run: |
./setup-rye.sh
#/bin/bash
# Ryeのバージョン固定
RYE_VERSION=0.24.0
curl -sSf https://rye-up.com/get | RYE_INSTALL_OPTION="--yes" bash
source "$HOME/.rye/env"
# uvの有効化
rye config --set-bool behavior.use-uv=true
テストの実行
モジュールをインストールします。
# nodeのパッケージインストール
npm ci
# pythonのパッケージインストール
npm run deps
テストを実行し、通っていることが確認できました!コンソールから実行すると、90秒くらいかかるので13秒は早くて便利ですね!
# テスト実行
$ cd packages/src-glue/
$ finch compose up -d
$ npm run test
> test
> ./run_test.sh
WARN[0000] treating lima version "4ea0a83" from "/Applications/Finch/lima/data/finch/lima-version" as very latest release
============================= test session starts ==============================
platform linux -- Python 3.10.2, pytest-7.2.1, pluggy-1.3.0
rootdir: /home/glue_user/workspace/jupyter_workspace, configfile: pyproject.toml, testpaths: tests
plugins: anyio-4.0.0
collected 1 item
tests/test_convert_fruits_job.py . [100%]
=============================== warnings summary ===============================
tests/test_convert_fruits_job.py::TestConvertFruitsJob::test_run_job
/home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
warnings.warn(
tests/test_convert_fruits_job.py::TestConvertFruitsJob::test_run_job
/home/glue_user/spark/python/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
warnings.warn("DataFrame constructor is internal. Do not directly use it.")
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
======================== 1 passed, 2 warnings in 13.53s ========================
さいごに
GlueのCIについて解説しました。今取り組んでいるプロジェクトではこの構成をとっています。動作確認のサイクルが高速になったことや何よりチームでの開発するGlueジョブスクリプトが被っても、動作確認がスケールするようになったので、絶大な効果を感じています。また本テンプレートでは、biome
やRye
をフル活用しているので、そちらも参考にして頂き、もっと良い静的解析ルールや、手法があればissueや@shuntaka_jpまで頂ければありがたいです!
参考
付属しているサンプルコードを含めて参考にさせて頂きました!コンテナのバージョンなどはできるだけ最新にして今回記事にしております!