Cloud Functions から Cloud Scheduler ジョブを作成/更新して、1つ目の関数実行から一定時間経過後に次の関数を実行してみた。

Cloud Functions から Cloud Scheduler ジョブを作成/更新して、1つ目の関数実行から一定時間経過後に次の関数を実行してみた。

Clock Icon2023.01.28

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

10年にいちどの寒波とも言われている今週は、さすがに沖縄も寒かったです。。(これが終われば春が来るはず!?

やりたいこと

  • 1つ目の Cloud Functions 実行から一定時間経過後に、2つ目の Cloud Functions を実行したい
  • 1つ目の Cloud Functions で Cloud Scheduler ジョブを作成/更新したい

イベント駆動で実行したいけど、次の処理実行までにある程度時間を空けたい。

数分の待機時間であれば、簡単に思いつくのは元の処理で sleep して一定時間待機する方法ですが、Cloud Functions を使う場合実行時間制限があるし、せっかくのサーバレスフレームワーク、sleep でリソース使い続けるのももったいない。。

イベント駆動で実行する1つ目の Cloud Functions で Cloud Scheduler ジョブを作成/更新して、2つ目の Cloud Functions を実行してみます。 1つ目の Cloud Functions は前回のブログ で使用した、BigQuery へのデータ INSERT をトリガに実行する関数です。 BigQuery へのデータ INSERT が行われてから一定時間後に、次の Cloud Functions を実行します。

前提

Google Cloud SDK(gcloud コマンド)の実行環境は準備済みであるものとします。 本エントリでは、Cloud Shell を使用しました。

以下のコマンドで、Cloud Scheduler の Python ライブラリをインストール済みです。

pip install google-cloud-scheduler

Eventarc API や、本エントリで利用している BigQuery, Cloud Functions, Cloud Build などの API は有効化済みで、操作するアカウントに各設定に必要な権限は付与済みです。 動作確認では、両方のプロジェクトでプロジェクトオーナーロールを付与したアカウントを使用しています。。

また、Cloud Functions デフォルトサービスアカウント([PROJECT_NUMBER]-compute@developer.gserviceaccount.com)に、Eventarc、Pub/Sub および Cloud Scheduler の操作権限を付与済みです。

Pub/Sub トピックを作成

Cloud Scheduler がメッセージを Publish する Pub/Sub トピックを作成します。

Cloud Shell から以下のコマンドを実行しました。

gcloud pubsub topics create sample_scheduler_topic

1つ目の Cloud Functions に Cloud Scheduler ジョブ作成/更新処理を追加

前回 デプロイした、BigQuery のデータ INSERT で実行される Cloud Functions に、Cloud Scheduler ジョブ作成/更新処理を追加しました。

import base64
import json
import functions_framework

from google.cloud import scheduler_v1
from google.protobuf import field_mask_pb2
import datetime
from pytz import timezone

@functions_framework.cloud_event
def fetch_insert_log_sync(cloud_event):
    message = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
    print(message)
    protoPayload = message.get('protoPayload', None)
    metadata = protoPayload.get('metadata', None)
    tableDataChange = metadata.get('tableDataChange', None)
    if tableDataChange is None:
        print('no tableDataChange...')
        return
    insertedRowsCount = tableDataChange.get('insertedRowsCount', None)
    if insertedRowsCount is None:
        print('no INSERT data...')

    resourceName = protoPayload.get('resourceName', None)
    project = resourceName.split('/')[1]
    dataset = resourceName.split('/')[3]
    table = resourceName.split('/')[5]
    print(f'**** insert data: {project}.{dataset}.{table}({insertedRowsCount} rows) ****')

    job_name = f'projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/{dataset}_{table}'
    now = datetime.datetime.now(timezone('Asia/Tokyo'))
    schedule = now + datetime.timedelta(minutes=5)
    hour = schedule.hour
    minute = schedule.minute
    schedule = f'{str(minute).zfill(2)} {str(hour).zfill(2)} * * *'
    client = scheduler_v1.CloudSchedulerClient()
    try:
        request = scheduler_v1.GetJobRequest(
            name=job_name
        )
        job = client.get_job(request=request)
        job.schedule = schedule
        param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "updated"}}}}'
        job.pubsub_target = scheduler_v1.types.PubsubTarget(
            topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic',
            data=bytes(param, 'utf-8')
        )
        update_mask = field_mask_pb2.FieldMask(paths=['schedule', 'pubsub_target'])
        request = scheduler_v1.UpdateJobRequest(
            job=job,
            update_mask=update_mask
        )
        response = client.update_job(request=request)
        print(response)
    except:
        print('No Job.')
        job = scheduler_v1.Job()
        job.name = job_name
        job.description = 'test_schedule'
        param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "create"}}}}'
        job.pubsub_target = scheduler_v1.types.PubsubTarget(
            topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic',
            data=bytes(param, 'utf-8')
        )
        job.schedule = schedule
        job.time_zone = 'Asia/Tokyo'
        request = scheduler_v1.CreateJobRequest(
            parent='projects/cm-da-mikami-yuki-258308/locations/asia-northeast1',
            job=job
        )
        response = client.create_job(request=request)
        print(response)

データ INSERT イベントで通知された [データセット名]_[テーブル名] という名前の Cloud Scheduler ジョブが既にあるかどうか確認し

    job_name = f'projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/{dataset}_{table}}'
(省略)
    client = scheduler_v1.CloudSchedulerClient()
    try:
        request = scheduler_v1.GetJobRequest(
            name=job_name
        )
        job = client.get_job(request=request)
(省略)
    except:
(省略)

同名の Scheduler ジョブが既にある場合は、実行スケジュールを今から5分後に、Scheduler 実行パラメータの status 値を updated に更新します。

(省略)
    now = datetime.datetime.now(timezone('Asia/Tokyo'))
    schedule = now + datetime.timedelta(minutes=5)
    hour = schedule.hour
    minute = schedule.minute
    schedule = f'{str(minute).zfill(2)} {str(hour).zfill(2)} * * *'
    client = scheduler_v1.CloudSchedulerClient()
    try:
(省略)
        job.schedule = schedule
        param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "updated"}}}}'
        job.pubsub_target = scheduler_v1.types.PubsubTarget(
            topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic',
            data=bytes(param, 'utf-8')
        )
        update_mask = field_mask_pb2.FieldMask(paths=['schedule', 'pubsub_target'])
        request = scheduler_v1.UpdateJobRequest(
            job=job,
            update_mask=update_mask
        )
        response = client.update_job(request=request)
        print(response)
(省略)

同名の Scheduler ジョブが無い場合には、今から5分後に Scheduler パラメータ statuscreated で実行するジョブを新規作成します。

(省略)
    now = datetime.datetime.now(timezone('Asia/Tokyo'))
    schedule = now + datetime.timedelta(minutes=5)
    hour = schedule.hour
    minute = schedule.minute
    schedule = f'{str(minute).zfill(2)} {str(hour).zfill(2)} * * *'
    client = scheduler_v1.CloudSchedulerClient()
    try:
(省略)
    except:
        print('No Job.')
        job = scheduler_v1.Job()
        job.name = job_name
        job.description = 'test_schedule'
        param = f'{{"params" : {{"dataset": "{dataset}", "table": "{table}", "status": "created"}}}}'
        job.pubsub_target = scheduler_v1.types.PubsubTarget(
            topic_name='projects/cm-da-mikami-yuki-258308/topics/sample_scheduler_topic',
            data=bytes(param, 'utf-8')
        )
        job.schedule = schedule
        job.time_zone = 'Asia/Tokyo'
        request = scheduler_v1.CreateJobRequest(
            parent='projects/cm-da-mikami-yuki-258308/locations/asia-northeast1',
            job=job
        )
        response = client.create_job(request=request)
        print(response)

合わせて、以下の requirements.txt も追加します。

google-cloud-scheduler>=2.9.0
pytz>=2022.1

以下のコマンドで修正コードをデプロイしました。

gcloud functions deploy fetch-insert-log-sync \
    --gen2 \
    --region asia-northeast1 \
    --runtime python310 \
    --entry-point=fetch_insert_log_sync \
    --trigger-topic="audit_log_sync" \
    --trigger-location=asia-northeast1

2つ目の Cloud Functions を追加

1つ目の Cloud Functions が作成/更新した Cloud Scheduler ジョブから実行される、2つ目の Cloud Functions をデプロイします。

処理内容は、Scheduler から渡されたパラメータの内容をログ出力するだの簡易なものです。

以下の python コードを、main.py というファイル名で保存しました。

import base64
import json
import functions_framework

@functions_framework.cloud_event
def print_pubsub_message(cloud_event):
   message = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
   param = message.get("params")
   print(param.get("dataset"))
   print(param.get("table"))
   print(param.get("status"))

以下のコマンドでデプロイしました。

gcloud functions deploy sample-scheduler-function \
    --gen2 \
    --region asia-northeast1 \
    --runtime python310 \
    --entry-point=print_pubsub_message \
    --trigger-topic="sample_scheduler_topic" \
    --trigger-location=asia-northeast1

動作確認

Scheduler ジョブ新規作成

前回 同様、BigQuery INSERT イベントを発行して、1つ目の Cloud Functions を実行しました。

ログを確認すると

正常に Cloud Scheduler ジョブ作成できたようです。

管理コンソールからも、Cloud Scheduler ジョブが作成されていることが確認できました。

Scheduler で指定した時間になるのを待って、2つ目の Cloud Functions が実行されたか確認してみます。

期待通り、1つ目の Cloud Functions で作成した Cloud Scheduler ジョブで指定した時間に、2つ目の Cloud Functions が実行されたことが確認できました。

Scheduler ジョブ更新

続いて、既に同名の Cloud Scheduler ジョブがある状態で、実行時間とパラメータが更新できるか確認してみます。

先ほど同様、BigQuery INSERT イベントを発行して、1つ目の Cloud Functions を実行しました。

ログから Cloud Scheduler ジョブが正常に更新されたことが確認できました。

管理コンソールからも、Scheduler ジョブの実行時間が更新されたことが確認できます。

更新した Scheduler 実行時間まで待って、2つ目の Cloud Functions のログも確認してみます。

期待通り、更新時間に、更新パラメータで実行されたことが確認できました。

まとめ(所感)

これまで、Cloud Scheduler ジョブはあらかじめ作成しておいて、Cloud Functions や Cloud Workflows のスケジュール実行トリガに使用していましたが、Scheduler ジョブをプログラムで作成/更新できれば、より実装の幅が広がるのではないかと思います。

例えば「前回の処理の正常終了から一定期間後」に次の処理を実行したい場合、スケジュール実行にしてしまうと、厳密には前回の処理実行時間分次の処理開始時間が早まってしまいますし、前回の処理が異常終了した場合でも次の処理が動いてしまいます。

イベント駆動とスケジュール実行を組み合わせることにより、より柔軟に対応できてうれしいと思いました。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.