こんにちは、川田です。以前、Google Cloud の Batch を利用してみた記事を投稿しました。以下のものです。
DevelopersIO | Google Cloud の Batch を触ってみました
今回は、ローカル SSD をアタッチさせた環境にて Batch を利用してみます。
実施してみたこと
以前の記事では、以下のようなことを実施しました。
Cloud Storage 上に置かれたファイルをダウンロードして、zip 圧縮し、Cloud Storage 上に置き直す、という単純な Batch ジョブを作成しています。作業時、以下の設定/実行内容について確認しています。
- 利用する Compute Engine をコンフィグファイルにて管理する
- Compute Engine には外部 IP アドレスを付与せず、限定公開の Google アクセスを利用させる
- ジョブで実行されるコードは、コンテナにて定義する
- コンテナ環境変数を利用する
- 実行されるコンテナ上プロセスに引数を渡す
- ジョブの実行を gcloud コマンドにて行う
Cloud Storage よりダウンロードしたファイルは、コンテナ上の /tmp 配下に置いていました。/tmp の領域は tmpfs ですから、思い返すと、これは少し片手落ちだったと考えています。そこで、ファイルをダウンロードするパスを、コンテナに bind mount したローカル SSD の領域として、必要な作業を確認してみようと思います。
なお今回はジョブの実行を、gcloud コマンドではなく、Python のクライアントライブラリを利用しています(理由は後述)。
環境
- google-cloud-batch 0.11.0
事前の準備
必要となる諸々の準備を行います。
VPC を作成
Compute Engine を起動させる VPC を作成します。
$ gcloud compute networks create batch-vpc --subnet-mode=custom
gcloud compute networks create
サブネットを作成します。前述の通り、作成するサブネットは限定公開の google アクセスを有効にしておきます。
$ gcloud compute networks subnets create batch-subnet \
--region=us-central1 \
--network=batch-vpc \
--range=10.10.0.0/24 \
--enable-private-ip-google-access
gcloud compute networks subnets create
コンテナイメージを作成
ジョブとして実行するコンテナイメージを作成します。
以下がディレクトリ構成です。
.
├── app
│ └── main.py
├── Dockerfile
└── requirements.txt
app/main.py
前述の通り、「Cloud Storage 上に置かれたファイルをダウンロードして、zip 圧縮し、Cloud Storage 上に置き直す」という Python のコードです。アクセスする Cloud Storage のバケット名をコンテナ環境変数より取得します。ダウンロードするファイル名を、実行引数より受け取るようにしています。ファイルをダウロードするパスを /mnt/batch/sample としています。ここがローカル SSD がマウントされたパスとなります。
import argparse
import os
import zipfile
from google.cloud import storage
TMPDIR = "/mnt/batch/sample"
BUCKET_NAME = os.environ["BUCKET_NAME"]
gcs_client = storage.Client()
def main(args: argparse.Namespace):
os.makedirs(TMPDIR)
bucket = gcs_client.bucket(BUCKET_NAME)
dl_blob = bucket.blob(f"batch/dl/{args.file_name}")
dl_blob.download_to_filename(f"{TMPDIR}/{args.file_name}")
with zipfile.ZipFile(f"{TMPDIR}/{args.file_name}.zip", "w", compression=zipfile.ZIP_DEFLATED, compresslevel=3) as zf:
zf.write(f"{TMPDIR}/{args.file_name}", arcname=args.file_name)
ul_blob = bucket.blob(f"batch/ul/{args.file_name}.zip")
ul_blob.upload_from_filename(f"{TMPDIR}/{args.file_name}.zip")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--file_name", type=str, required=False)
args = parser.parse_args()
main(args)
Dockerfile
FROM python:3.11.3-slim
WORKDIR /
COPY . .
RUN pip install --no-cache-dir --requirement requirements.txt
ENTRYPOINT ["python", "-m", "app.main"]
requirements.txt
google-cloud-storage==2.9.0
Artifact Repository に新規リポジトリを作成して、
$ gcloud artifacts repositories create batch \
--repository-format=docker \
--location=us-central1 \
--description="sample"
gcloud artifacts repositories create
Cloud Build にてビルドします。
$ gcloud builds submit --tag us-central1-docker.pkg.dev/PROJECT_ID/batch/app .
サービスアカウントを作成
Compute Engine に付与するサービスアカウントを作成します。
$ gcloud iam service-accounts create sa-batch --display-name="Batch"
gcloud iam service-accounts create
作成したサービスアカウントに、必要となる事前定義ロールを付与します。
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/logging.logWriter"
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader"
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/batch.agentReporter"
$ gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:sa-batch@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"
gcloud projects add-iam-policy-binding
以下の事前定義ロールを利用しています。
ロール名 | 付与理由 |
---|---|
roles/logging.logWriter | Cloud Logging 出力用 |
roles/artifactregistry.reader | コンテナイメージの pull 用 |
roles/batch.agentReporter | Compute Engine が Batch のサービスエージェントをやり取りする際に必要となる |
roles/storage.objectAdmin | コンテナ内のコードで Cloud Storage にアクセスするため |
コンテナジョブを利用する場合、上位3つのロールは必須になるとものと考えています。
Batch ジョブを実行
Batch のジョブを実行するコードを、クライアントライブリを利用して Python で記述します。
import argparse
import datetime
import time
from zoneinfo import ZoneInfo
from google.cloud import batch_v1
PROJECT_ID = "PROJECT_ID"
REGION = "us-central1"
BUCKET_NAME = "BUCKET_NAME"
def submit_batch(args: argparse.Namespace):
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.services.batch_service
client = batch_v1.BatchServiceClient()
# runnable (実行するコンテナ情報)を定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Runnable
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = f"us-central1-docker.pkg.dev/{PROJECT_ID}/batch/app:latest"
runnable.container.entrypoint = "python"
runnable.container.commands = ["-m", "app.main", "--file_name", args.file_name]
runnable.container.volumes = ["/mnt/disks/batch:/mnt/batch"]
# タスク毎に必要となるリソースサイズを定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.ComputeResource
resources = batch_v1.ComputeResource()
resources.cpu_milli = 4000
resources.memory_mib = 16000
# GCE にマウントするディスクを定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Volume
volume = batch_v1.Volume()
volume.device_name = "disk-123"
volume.mount_path = "/mnt/disks/batch"
volume.mount_options = ["rw", "async"]
# タスクを定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.TaskSpec
task = batch_v1.TaskSpec()
task.runnables = [runnable]
task.compute_resource = resources
task.max_run_duration = "1800s"
task.max_retry_count = 1
task.volumes = [volume]
# コンテナで利用する環境変数を定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Environment
environment = batch_v1.Environment()
environment.variables = {"BUCKET_NAME": BUCKET_NAME}
# タスクグループを定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.TaskGroup
group = batch_v1.TaskGroup()
group.task_spec = task
group.task_count = 1
group.parallelism = 1
group.task_environments = [environment]
# インスタンスポリシー(利用する GCE インスタンス)を定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.InstancePolicy
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "c2-standard-4"
policy.provisioning_model = batch_v1.AllocationPolicy.ProvisioningModel.STANDARD
policy.boot_disk = batch_v1.AllocationPolicy.Disk(image="batch-cos", type_="pd-standard", size_gb=30)
policy.disks = [batch_v1.AllocationPolicy.AttachedDisk(
new_disk=batch_v1.AllocationPolicy.Disk(type_="local-ssd", size_gb=375, disk_interface="NVMe"),
device_name="disk-123"
)]
# インスタンスポリシーの利用を宣言
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.InstancePolicyOrTemplate
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
# 利用するネットワーク情報を設定
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.NetworkPolicy
network = batch_v1.AllocationPolicy.NetworkPolicy()
network.network_interfaces = [batch_v1.AllocationPolicy.NetworkInterface(
network=f"projects/{PROJECT_ID}/global/networks/batch-vpc",
subnetwork=f"projects/{PROJECT_ID}/regions/{REGION}/subnetworks/batch-subnet",
no_external_ip_address=True
)]
# アロケーション・ポリシーを定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.location = batch_v1.AllocationPolicy.LocationPolicy(allowed_locations=["zones/us-central1-c"])
allocation_policy.instances = [instances]
allocation_policy.service_account = batch_v1.ServiceAccount(email=f"sa-batch@{PROJECT_ID}.iam.gserviceaccount.com")
allocation_policy.network = network
# 作成するジョブを定義
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.Job
job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
# ジョブを実行
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.services.batch_service.BatchServiceClient#google_cloud_batch_v1_services_batch_service_BatchServiceClient_create_job
create_request = batch_v1.CreateJobRequest()
create_request.job = job
create_request.job_id = f"sample-job-{get_yyyymmddhhmm()}"
create_request.parent = f"projects/{PROJECT_ID}/locations/{REGION}"
created_job = client.create_job(create_request)
# ジョブの実行結果を確認
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.services.batch_service.BatchServiceClient#google_cloud_batch_v1_services_batch_service_BatchServiceClient_get_job
get_request = batch_v1.GetJobRequest()
get_request.name = created_job.name
while True:
print(".", end="", flush=True)
time.sleep(60)
job = client.get_job(get_request)
if job.status.state.name in ["SUCCEEDED", "FAILED"]:
break
print(f"job id:{created_job.name} status: {job.status.state.name}")
def get_yyyymmddhhmm() -> str:
now = datetime.datetime.now(tz=ZoneInfo("Asia/Tokyo"))
return now.strftime("%Y%m%d%H%M")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--file_name", type=str, required=False)
args = parser.parse_args()
submit_batch(args)
公式ドキュメントの下記あたりを参考に記述しています。
GCE インスタンスに、boot disk のほかローカル SSD を /mnt/disks/batch パスにアタッチし、コンテナ上の /mnt/batch パスに bind mount しています。ローカル SSD へのフォーマットは不要で、google 側にて自動的に行ってくれるほか、ジョブ終了時には自動的に削除もしてくれます。
クライアントライブラリを利用して Batch ジョブを作成した理由
前回の投稿では、ジョブが利用する GCE インスタンスを定義した情報(上のコード内では allocation_policy にあたる部分)を記述した JSON のコンフィグファイルを用意し、実行するコンテナ情報(イメージや ENTRYPOINT、CMD の設定)は、gcloud beta batch jobs submit
コマンド実行時のオプションとして指定し、ジョブを作成していました。今回も同じ方法が取れないか最初は考えたのですが、結局クライアントライブラリを採用しています。
その理由について記述します。
前回の方法では、GCE インスタンスに関する情報を JSON コンフィグファイルで管理し、実行するコンテナ情報を gcloud コマンドのオプションとして指定することで、実行したいコンテナの処理をなるべく柔軟に管理できるようする狙いがありました。gcloud コマンド側で、--container-image-uri
や --container-entrypoint
といったオプションが用意されているためできた事ですが、今回の構成ではコンテナにローカル SSD を bind mount する情報も渡す必要があるものの、それに該当する gcloud コマンドのオプションが用意されていないため、同じ方法を取ることができませんでした。
JSON のコンフィグファイルに、ジョブに関する情報全てを記述して用意しておくのではなく、もう少し柔軟に Batch を利用できるようにしたい思いがあったため、クライアントライブラリを利用してジョブを作成する方針にしています。
なお、gcloud コマンドでの実行時に JSON のコンフィグファイルのパスを指定するほか、HereDoc を利用して JSON を渡すこともできます。
--config=CONFIG
The file path of the JSON job config JSON. It also supports direct input from stdin with '-' or HereDoc (in shells with HereDoc support like Bash) with '- <<DELIMITER'.
そのため HereDoc を用いる方法を利用すれば、gcloud コマンドにて実行する方法においても、ジョブの情報を柔軟に管理できると思っています。
HerDoc ではなく、クライアントライブラリで試してみたのは、僕のただの好みとなります。