Google Cloud の Batch を触ってみました(ローカル SSD 利用編)

2023.06.09

こんにちは、川田です。以前、Google Cloud の Batch を利用してみた記事を投稿しました。以下のものです。

DevelopersIO | Google Cloud の Batch を触ってみました

今回は、ローカル SSD をアタッチさせた環境にて Batch を利用してみます。

実施してみたこと

以前の記事では、以下のようなことを実施しました。

Cloud Storage 上に置かれたファイルをダウンロードして、zip 圧縮し、Cloud Storage 上に置き直す、という単純な Batch ジョブを作成しています。作業時、以下の設定/実行内容について確認しています。

  • 利用する Compute Engine をコンフィグファイルにて管理する
  • Compute Engine には外部 IP アドレスを付与せず、限定公開の Google アクセスを利用させる
  • ジョブで実行されるコードは、コンテナにて定義する
  • コンテナ環境変数を利用する
  • 実行されるコンテナ上プロセスに引数を渡す
  • ジョブの実行を gcloud コマンドにて行う

Cloud Storage よりダウンロードしたファイルは、コンテナ上の /tmp 配下に置いていました。/tmp の領域は tmpfs ですから、思い返すと、これは少し片手落ちだったと考えています。そこで、ファイルをダウンロードするパスを、コンテナに bind mount したローカル SSD の領域として、必要な作業を確認してみようと思います。

なお今回はジョブの実行を、gcloud コマンドではなく、Python のクライアントライブラリを利用しています(理由は後述)。

環境

  • google-cloud-batch 0.11.0

事前の準備

必要となる諸々の準備を行います。

VPC を作成

Compute Engine を起動させる VPC を作成します。

$ gcloud compute networks create batch-vpc --subnet-mode=custom

gcloud compute networks create

サブネットを作成します。前述の通り、作成するサブネットは限定公開の google アクセスを有効にしておきます。

$ gcloud compute networks subnets create batch-subnet \
--region=us-central1 \
--network=batch-vpc \
--range=10.10.0.0/24 \
--enable-private-ip-google-access

gcloud compute networks subnets create

コンテナイメージを作成

ジョブとして実行するコンテナイメージを作成します。

以下がディレクトリ構成です。

.
├── app
│   └── main.py
├── Dockerfile
└── requirements.txt

app/main.py

前述の通り、「Cloud Storage 上に置かれたファイルをダウンロードして、zip 圧縮し、Cloud Storage 上に置き直す」という Python のコードです。アクセスする Cloud Storage のバケット名をコンテナ環境変数より取得します。ダウンロードするファイル名を、実行引数より受け取るようにしています。ファイルをダウロードするパスを /mnt/batch/sample としています。ここがローカル SSD がマウントされたパスとなります。

import argparse
import os
import zipfile

from google.cloud import storage

TMPDIR = "/mnt/batch/sample"
BUCKET_NAME = os.environ["BUCKET_NAME"]

gcs_client = storage.Client()

def main(args: argparse.Namespace):
    os.makedirs(TMPDIR)
    bucket = gcs_client.bucket(BUCKET_NAME)

    dl_blob = bucket.blob(f"batch/dl/{args.file_name}")
    dl_blob.download_to_filename(f"{TMPDIR}/{args.file_name}")

    with zipfile.ZipFile(f"{TMPDIR}/{args.file_name}.zip", "w", compression=zipfile.ZIP_DEFLATED, compresslevel=3) as zf:
        zf.write(f"{TMPDIR}/{args.file_name}", arcname=args.file_name)

    ul_blob = bucket.blob(f"batch/ul/{args.file_name}.zip")
    ul_blob.upload_from_filename(f"{TMPDIR}/{args.file_name}.zip")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_name", type=str, required=False)
    args = parser.parse_args()

    main(args)

Dockerfile

FROM python:3.11.3-slim
WORKDIR /
COPY . .
RUN pip install --no-cache-dir --requirement requirements.txt
ENTRYPOINT ["python", "-m", "app.main"]

requirements.txt

google-cloud-storage==2.9.0

Artifact Repository に新規リポジトリを作成して、

$ gcloud artifacts repositories create batch \
--repository-format=docker \
--location=us-central1 \
--description="sample"

gcloud artifacts repositories create

Cloud Build にてビルドします。

$ gcloud builds submit --tag us-central1-docker.pkg.dev/PROJECT_ID/batch/app .

gcloud builds submit

サービスアカウントを作成

Compute Engine に付与するサービスアカウントを作成します。

$ gcloud iam service-accounts create sa-batch --display-name="Batch"

gcloud iam service-accounts create

作成したサービスアカウントに、必要となる事前定義ロールを付与します。

$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/logging.logWriter"
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader"
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/batch.agentReporter"
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"

gcloud projects add-iam-policy-binding

以下の事前定義ロールを利用しています。

ロール名 付与理由
roles/logging.logWriter Cloud Logging 出力用
roles/artifactregistry.reader コンテナイメージの pull 用
roles/batch.agentReporter Compute Engine が Batch のサービスエージェントをやり取りする際に必要となる
roles/storage.objectAdmin コンテナ内のコードで Cloud Storage にアクセスするため

コンテナジョブを利用する場合、上位3つのロールは必須になるとものと考えています。

Batch ジョブを実行

Batch のジョブを実行するコードを、クライアントライブリを利用して Python で記述します。

import argparse
import datetime
import time

from zoneinfo import ZoneInfo

from google.cloud import batch_v1

PROJECT_ID = "PROJECT_ID"
REGION = "us-central1"
BUCKET_NAME = "BUCKET_NAME"

def submit_batch(args: argparse.Namespace):
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.services.batch_service
    client = batch_v1.BatchServiceClient()

    # runnable (実行するコンテナ情報)を定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Runnable
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = f"us-central1-docker.pkg.dev/{PROJECT_ID}/batch/app:latest"
    runnable.container.entrypoint = "python"
    runnable.container.commands = ["-m", "app.main", "--file_name", args.file_name]
    runnable.container.volumes = ["/mnt/disks/batch:/mnt/batch"]

    # タスク毎に必要となるリソースサイズを定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.ComputeResource
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 4000
    resources.memory_mib = 16000

    # GCE にマウントするディスクを定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Volume
    volume = batch_v1.Volume()
    volume.device_name = "disk-123"
    volume.mount_path = "/mnt/disks/batch"
    volume.mount_options = ["rw", "async"]

    # タスクを定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.TaskSpec
    task = batch_v1.TaskSpec()
    task.runnables = [runnable]
    task.compute_resource = resources
    task.max_run_duration = "1800s"
    task.max_retry_count = 1
    task.volumes = [volume]

    # コンテナで利用する環境変数を定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Environment
    environment = batch_v1.Environment()
    environment.variables = {"BUCKET_NAME": BUCKET_NAME}

    # タスクグループを定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.TaskGroup
    group = batch_v1.TaskGroup()
    group.task_spec = task
    group.task_count = 1
    group.parallelism = 1
    group.task_environments = [environment]

    # インスタンスポリシー(利用する GCE インスタンス)を定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.InstancePolicy
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "c2-standard-4"
    policy.provisioning_model = batch_v1.AllocationPolicy.ProvisioningModel.STANDARD
    policy.boot_disk = batch_v1.AllocationPolicy.Disk(image="batch-cos", type_="pd-standard", size_gb=30)
    policy.disks = [batch_v1.AllocationPolicy.AttachedDisk(
        new_disk=batch_v1.AllocationPolicy.Disk(type_="local-ssd", size_gb=375, disk_interface="NVMe"),
        device_name="disk-123"
    )]

    # インスタンスポリシーの利用を宣言
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.InstancePolicyOrTemplate
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy

    # 利用するネットワーク情報を設定
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.NetworkPolicy
    network = batch_v1.AllocationPolicy.NetworkPolicy()
    network.network_interfaces = [batch_v1.AllocationPolicy.NetworkInterface(
        network=f"projects/{PROJECT_ID}/global/networks/batch-vpc",
        subnetwork=f"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/batch-subnet",
        no_external_ip_address=True
    )]

    # アロケーション・ポリシーを定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.location = batch_v1.AllocationPolicy.LocationPolicy(allowed_locations=["zones/us-central1-c"])
    allocation_policy.instances = [instances]
    allocation_policy.service_account = batch_v1.ServiceAccount(email=f"sa-batch@{PROJECT_ID}.iam.gserviceaccount.com")
    allocation_policy.network = network

    # 作成するジョブを定義
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Job
    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    # ジョブを実行
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.services.batch_service.BatchServiceClient#google_cloud_batch_v1_services_batch_service_BatchServiceClient_create_job
    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = f"sample-job-{get_yyyymmddhhmm()}"
    create_request.parent = f"projects/{PROJECT_ID}/locations/{REGION}"
    created_job = client.create_job(create_request)

    # ジョブの実行結果を確認
    # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.services.batch_service.BatchServiceClient#google_cloud_batch_v1_services_batch_service_BatchServiceClient_get_job
    get_request = batch_v1.GetJobRequest()
    get_request.name = created_job.name
    while True:
        print(".", end="", flush=True)
        time.sleep(60)
        job = client.get_job(get_request)
        if job.status.state.name in ["SUCCEEDED", "FAILED"]:
            break

    print(f"job id:{created_job.name} status: {job.status.state.name}")


def get_yyyymmddhhmm() -> str:
    now = datetime.datetime.now(tz=ZoneInfo("Asia/Tokyo"))
    return now.strftime("%Y%m%d%H%M")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--file_name", type=str, required=False)
    args = parser.parse_args()

    submit_batch(args)

公式ドキュメントの下記あたりを参考に記述しています。

Google Cloud | 基本コンテナジョブを作成する

GCE インスタンスに、boot disk のほかローカル SSD を /mnt/disks/batch パスにアタッチし、コンテナ上の /mnt/batch パスに bind mount しています。ローカル SSD へのフォーマットは不要で、google 側にて自動的に行ってくれるほか、ジョブ終了時には自動的に削除もしてくれます。

クライアントライブラリを利用して Batch ジョブを作成した理由

前回の投稿では、ジョブが利用する GCE インスタンスを定義した情報(上のコード内では allocation_policy にあたる部分)を記述した JSON のコンフィグファイルを用意し、実行するコンテナ情報(イメージや ENTRYPOINT、CMD の設定)は、gcloud beta batch jobs submit コマンド実行時のオプションとして指定し、ジョブを作成していました。今回も同じ方法が取れないか最初は考えたのですが、結局クライアントライブラリを採用しています。

その理由について記述します。

前回の方法では、GCE インスタンスに関する情報を JSON コンフィグファイルで管理し、実行するコンテナ情報を gcloud コマンドのオプションとして指定することで、実行したいコンテナの処理をなるべく柔軟に管理できるようする狙いがありました。gcloud コマンド側で、--container-image-uri--container-entrypoint といったオプションが用意されているためできた事ですが、今回の構成ではコンテナにローカル SSD を bind mount する情報も渡す必要があるものの、それに該当する gcloud コマンドのオプションが用意されていないため、同じ方法を取ることができませんでした。

JSON のコンフィグファイルに、ジョブに関する情報全てを記述して用意しておくのではなく、もう少し柔軟に Batch を利用できるようにしたい思いがあったため、クライアントライブラリを利用してジョブを作成する方針にしています。

なお、gcloud コマンドでの実行時に JSON のコンフィグファイルのパスを指定するほか、HereDoc を利用して JSON を渡すこともできます。

--config=CONFIG

The file path of the JSON job config JSON. It also supports direct input from stdin with '-' or HereDoc (in shells with HereDoc support like Bash) with '- <<DELIMITER'.

gcloud beta batch jobs submit

そのため HereDoc を用いる方法を利用すれば、gcloud コマンドにて実行する方法においても、ジョブの情報を柔軟に管理できると思っています。

HerDoc ではなく、クライアントライブラリで試してみたのは、僕のただの好みとなります。