AWS GlueのジョブスクリプトをCIする構成(Rye 0.24.0利用)

AWS GlueのジョブスクリプトをCIする構成(Rye 0.24.0利用)

Clock Icon2024.02.19 06:53

はじめに

AWS GlueのジョブスクリプトをPythonで書いていて、動作確認にする際に毎回デプロイし、AWS コンソールで動作確認するサイクルがあまり効率的ではありませんでした。

理由としては以下があげられます。

  1. Glueのリソースのプロビジョニングによる待ち時間
  2. Pythonが動的型付けかつ静的解析に限界があるため、実行前の検証手段が乏しい
  3. 構文ミスがあった場合でも、1の後にスクリプトは実行されるため、ケアレスミスでもプロビジョニングの待ち時間は発生する

以上より、AWSが掲載しているDeveloping and testing AWS Glue job scripts locallyを参考に、Glueジョブスクリプトを開発する上で必要な(CIやローカルでテストできる)テンプレート作ってみました。

構成

ソースコード

GitHub上にソースコードを公開しています。

GitHub Actionsで継続的にCIを回しているので、他の環境で動作しないということは起こりにくと思います。ローカルでの利用ツールは、Requirementsを参考にしてください。

プロジェクトのディレクトリ構成

npmとRyeのワークスペース機能を使っています。これはモノレポの中に、CDKやNodeバックエンド、フロントエンドが入ることが私が関わるプロジェクトで多いためです。ジョブランナーは、npmに寄せています。少しキメラっぽい構成ですいません。。必要な要素だけ抜き取って、読んで頂けたら幸いです。

$ exa -L 2 -T --git-ignore
├── cspell.json
├── package-lock.json
├── package.json
├── packages
│  ├── config
│  ├── iac
│  ├── py-tool
│  └── src-glue
├── pyproject.toml
├── README.md
├── requirements-dev.lock
├── requirements.lock
└── setup-rye.sh

ワークスペースでの役割は以下の通りです。今回は、@glue-template/src-glueについて解説します。他の設定が気になる場合はリポジトリを参考にしてください。

ワークスペース名 役割
@glue-template/config tsconfig,ruff(Pythonの静的解析,フォーマッター),biome(TSの静的解析,フォーマッター)の設定
@glue-template/iac CDKのコード
@glue-template/py-tool 任意のPythonプロジェクト(ワークスペース機能動作確認用)
@glue-template/src-glue Glueジョブスクリプトとテスト

CI実装に関して

はじめに

GlueがS3からデータを取得して、同じS3に変換したデータを出力しています。Glueのジョブは、以下のような変換をします。

{"code": 2}
{"code": 5}
{"code": 3}
{"code": 4}
{"code": 1}
{"code":2,"name":"orange"}
{"code":5,"name":"banana"}
{"code":3,"name":"grape"}
{"code":4,"name":"pear"}
{"code":1,"name":"apple"}

対応した番号に該当する果物の名前を、nameフィールドで追記するという変換内容です。

コンテナの構成

サービス 役割
glue.dev.s3.local locastackのコンテナイメージを利用し、S3をエミュレートする
glue.dev.summary amazon/aws-glue-libs:glue_libs_4.0.0_image_01のコンテナイメージを利用し、Glueをエミュレートする
・pytestの実行元
version: '3.5'
services:
  glue.dev.s3.local:
    container_name: s3.dev
    image: localstack/localstack:3.0.2
    environment:
      - SERVICES=s3
      - AWS_DEFAULT_REGION=ap-northeast-1
      - AWS_DEFAULT_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    ports:
      - 4566:4566
    networks:
      - glue.dev.network
  glue.dev.summary:
    container_name: glue.dev
    image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01
    # 私のローカルマシンから実行した際に、処理できるファイル上限を超えたため、ulimits設定を追加
    ulimits:
     nofile:
      soft: 12288
      hard: 12288
    volumes:
      - ./:/home/glue_user/workspace/jupyter_workspace
      - ./spark.conf:/home/glue_user/spark/conf/spark-defaults.conf
    environment:
      - DISABLE_SSL=true
      - AWS_REGION=ap-northeast-1
      - AWS_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    ports:
      # jupyterlab
      - 8888:8888
      # spark ui
      - 4040:4040
    command: /home/glue_user/jupyter/jupyter_start.sh
    networks:
      - glue.dev.network
networks:
 glue.dev.network:
   name: glue.dev.network

Glueジョブスクリプト

コメントにてポイントを補足します。

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def run_job(
    glue_context,
    s3_bucket,
    target_year,
    target_month,
    target_day,
    target_hour,
):
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={"multiline": False},
        connection_type="s3",
        format="json",
        connection_options={
            "paths": [
                f"s3://{s3_bucket}/input/year={target_year}/month={target_month}/day={target_day}/hour={target_hour}/"
            ],
            "recurse": True,
        },
        transformation_ctx="dyf",
    )
    df = dyf.toDF()

    if df.count() <= 0:
        raise Exception("No Data.")

    code_to_fruit_udf = udf(code_to_fruit, StringType())
    df = df.withColumn("name", code_to_fruit_udf(df["code"]))

    df.coalesce(1).write.option("compression", "gzip").mode("overwrite").json(
        f"s3://{s3_bucket}/output/{target_year}/{target_month}/{target_day}/{target_hour}"
    )


def code_to_fruit(code):
    fruits = {
        1: "apple",
        2: "orange",
        3: "grape",
        4: "pear",
        5: "banana",
        6: "cherry",
        7: "strawberry",
        8: "kiwi",
        9: "peach",
        10: "melon",
    }

    return fruits.get(code, "unknown")


def main():
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "IO_S3_BUCKET"])
    job_name = args["JOB_NAME"]
    s3_bucket = args["IO_S3_BUCKET"]

    [target_year, target_month, target_day, target_hour] = ["2023", "01", "05", "07"]

    sc = SparkContext()
    glue_context = GlueContext(sc)
    job = Job(glue_context)

    job.init(job_name, args)
    run_job(
        glue_context,
        s3_bucket,
        target_year,
        target_month,
        target_day,
        target_hour,
    )
    job.commit()


if __name__ == "__main__":
    main()

pytestの構成

sparkの設定を初期化しています。前述したlocalstackのエンドポイント設定もこちらで実施しています。

import pytest
from awsglue.context import GlueContext
from awsglue.job import Job
from helpers.config import S3_ENDPOINT_URL
from pyspark.sql import SparkSession


@pytest.fixture()
def fixture_setup_glue():
    sc = SparkSession.builder.getOrCreate()
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
    sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    sc._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "S3SignerType")
    sc._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None")
    sc._jsc.hadoopConfiguration().set(
        "fs.s3a.change.detection.version.required", "false"
    )
    context = GlueContext(sc)
    job = Job(context)

    yield (context)

    job.commit()
    sc.stop()

こちらがテストコードです。以下の順序で実行されます。

  1. fixture_setup_glue yieldまで
  2. setup_sample_data yieldまで
  3. test_run_job
  4. setup_sample_data yield以降
  5. fixture_setup_glue yield以降
import gzip
import os

import pytest

from src import convert_fruits_job
from tests.helpers import s3
from tests.helpers.config import S3_BUCKET_NAME


@pytest.fixture()
def setup_sample_data():
    s3_file_name = "input/year=2023/month=01/day=05/hour=07/input-data-7-2024-01-21-23-42-22-ac74facb-bc41-3931-a76e-66e445872d60.gz"
    s3.create_bucket(S3_BUCKET_NAME)
    s3.upload_file_to_s3(
        S3_BUCKET_NAME, "tests/testdata/input-data.jsonl", s3_file_name
    )
    yield
    s3.delete_file_from_s3(S3_BUCKET_NAME, s3_file_name)
    s3.delete_bucket(S3_BUCKET_NAME)


class TestConvertFruitsJob:
    def test_run_job(self, fixture_setup_glue, setup_sample_data):
        convert_fruits_job.run_job(
            fixture_setup_glue,
            S3_BUCKET_NAME,
            "2023",
            "01",
            "05",
            "07",
        )

        file_prefix = "output/2023/01/05/07"
        s3_object_paths = s3.list_objects(S3_BUCKET_NAME, file_prefix)
        assert len(s3_object_paths) == 1

        s3_object_path = s3_object_paths[0]
        s3_object_name = os.path.basename(s3_object_path)
        local_path = f"/tmp/{s3_object_name}"
        s3.download_object(S3_BUCKET_NAME, f"{s3_object_path}", local_path)

        expected_output_lines = [
            b'{"code":2,"name":"orange"}\n',
            b'{"code":5,"name":"banana"}\n',
            b'{"code":3,"name":"grape"}\n',
            b'{"code":4,"name":"pear"}\n',
            b'{"code":1,"name":"apple"}\n',
        ]

        with gzip.open(local_path) as f:
            content = f.read()
            assert content == b"".join(expected_output_lines)

GitHub Actionsの構成

まず全体を示します。

name: CI src-glue

on:
  push:
    paths:
      - 'packages/src-glue/**'
      - '.github/workflows/ci-glue.yml'

env:
  GLUE_LIB_CACHE_PATH: glue-libs-image
  GLUE_LIB_VERSION: 4.0.0

jobs:
  ci:
    runs-on: ubuntu-latest
    timeout-minutes: 30
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Cache a GlueLib Docker image
        id: cache-python-packages
        uses: actions/cache@v4
        with:
          path: ${{ env.GLUE_LIB_CACHE_PATH }}
          key: ${{ runner.os }}-glue-libs-${{ env.GLUE_LIB_VERSION }}

      - name: Pull and save a GlueLib Docker image
        if: steps.cache-python-packages.outputs.cache-hit != 'true'
        run: |
          docker pull amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01
          docker save amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 -o ${GLUE_LIB_CACHE_PATH}

      - name: Load GlueLib Docker Image
        run: |
          docker load -i ${GLUE_LIB_CACHE_PATH}

      - name: lunch container
        run: |
          cd packages/src-glue/
          docker-compose up -d

      - name: Install Rye
        run: |
          ./setup-rye.sh

      - name: Restore python packages
        uses: actions/cache@v4
        id: cache_dependency_python
        env:
          cache-name: cache-dependency-python
        with:
          path: '.venv'
          key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('requirements-dev.lock') }}

      - name: Install python module
        if: ${{ steps.cache_dependency_python.outputs.cache-hit != 'true' }}
        run: |
          source "$HOME/.rye/env"
          rye sync

      - uses: actions/setup-node@v4
        with:
          node-version-file: ./.node-version

      - name: Restore node modules
        uses: actions/cache@v4
        id: cache_dependency_node
        env:
          cache-name: cache-dependency-node
        with:
          path: '**/node_modules'
          key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('package-lock.json') }}

      - name: Install node modules
        if: ${{ steps.cache_dependency_node.outputs.cache-hit != 'true' }}
        run: npm ci --no-audit --progress=false --silent

      - name: CI
        shell: bash
        run: |
          source "$HOME/.rye/env"
          npm run check \
            -w @glue-template/src-glue
          npm run test \
            -w @glue-template/src-glue

ソースを切り出して、説明します。

Glueのイメージは3G程度あり、push時に実行するとGitHub Actionsのクレジットを無駄に消費してしまうため、パスでフィルタしています。これにより、ワークスペース毎にCIを設定する構成にしています。

on:
  push:
    paths:
      - 'packages/src-glue/**'
      - '.github/workflows/ci-glue.yml'

コンテナの容量が重いため、コンテナイメージをキャッシュしています。キャッシュが見つかっても、キャッシュからpullして展開する時間がかかるので、結構ばらつきはありますが、9分程度が3分程度になったケースもあるので、入れる価値はあります。

      - name: Cache a GlueLib Docker image
        id: cache-python-packages
        uses: actions/cache@v4
        with:
          path: ${{ env.GLUE_LIB_CACHE_PATH }}
          key: ${{ runner.os }}-glue-libs-${{ env.GLUE_LIB_VERSION }}

      - name: Pull and save a GlueLib Docker image
        if: steps.cache-python-packages.outputs.cache-hit != 'true'
        run: |
          docker pull amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01
          docker save amazon/aws-glue-libs:glue_libs_${GLUE_LIB_VERSION}_image_01 -o ${GLUE_LIB_CACHE_PATH}

      - name: Load GlueLib Docker Image
        run: |
          docker load -i ${GLUE_LIB_CACHE_PATH}

      - name: lunch container
        run: |
          cd packages/src-glue/
          docker-compose up -d

Ryeをインストールしています。GitHub Actionsがあればそちらを使ってもいいと思います。Ryeの0.24.0でuv実験的に導入され、本シェルで有効化しています。24秒かかっていたパッケージのインストール処理が4秒程度で終わるようになりました。パッケージ系は、GitHub Actions上ではキャッシュされるのでローカルでインストールする機会が多いほど恩恵が大きいと思います。

      - name: Install Rye
        run: |
          ./setup-rye.sh
#/bin/bash
# Ryeのバージョン固定
RYE_VERSION=0.24.0
curl -sSf https://rye-up.com/get | RYE_INSTALL_OPTION="--yes" bash
source "$HOME/.rye/env"
# uvの有効化
rye config --set-bool behavior.use-uv=true

テストの実行

モジュールをインストールします。

# nodeのパッケージインストール
npm ci
# pythonのパッケージインストール
npm run deps

テストを実行し、通っていることが確認できました!コンソールから実行すると、90秒くらいかかるので13秒は早くて便利ですね!

# テスト実行
$ cd packages/src-glue/
$ finch compose up -d
$ npm run test

> test
> ./run_test.sh

WARN[0000] treating lima version "4ea0a83" from "/Applications/Finch/lima/data/finch/lima-version" as very latest release
============================= test session starts ==============================
platform linux -- Python 3.10.2, pytest-7.2.1, pluggy-1.3.0
rootdir: /home/glue_user/workspace/jupyter_workspace, configfile: pyproject.toml, testpaths: tests
plugins: anyio-4.0.0
collected 1 item

tests/test_convert_fruits_job.py .                                       [100%]

=============================== warnings summary ===============================
tests/test_convert_fruits_job.py::TestConvertFruitsJob::test_run_job
  /home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
    warnings.warn(

tests/test_convert_fruits_job.py::TestConvertFruitsJob::test_run_job
  /home/glue_user/spark/python/pyspark/sql/dataframe.py:127: UserWarning: DataFrame constructor is internal. Do not directly use it.
    warnings.warn("DataFrame constructor is internal. Do not directly use it.")

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
======================== 1 passed, 2 warnings in 13.53s ========================

さいごに

GlueのCIについて解説しました。今取り組んでいるプロジェクトではこの構成をとっています。動作確認のサイクルが高速になったことや何よりチームでの開発するGlueジョブスクリプトが被っても、動作確認がスケールするようになったので、絶大な効果を感じています。また本テンプレートでは、biomeRyeをフル活用しているので、そちらも参考にして頂き、もっと良い静的解析ルールや、手法があればissue@shuntaka_jpまで頂ければありがたいです!

参考

付属しているサンプルコードを含めて参考にさせて頂きました!コンテナのバージョンなどはできるだけ最新にして今回記事にしております!

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.