Cloud Functionsの重複実行について、Firestoreを使った対策を考えてみた

Cloud Functionsの前回の実行情報をFirestoreに記録しておきます。あくまで案ですのでご参考までに。
2024.01.07

クラスメソッド株式会社データアナリティクス事業本部所属のニューシロです。
今回はGoogle CloudのサービスであるCloud Functionsの重複実行について、同じくGoogle CloudのサービスであるFirestoreを使った対策を考えてみました。

前提

Cloud Functionsの重複実行とは

Cloud Functionsのイベントトリガーを使用すると、特定のイベントの発生に対してCloud Functionsを起動させることができます。

しかし、イベントトリガーCloud Functionsは1回のイベントに対し2回以上起動してしまうことがあります。
重複実行が発生しても問題がないように、Cloud Functionsではベストプラクティスとしてべき等関数を作成することが挙げられています。

詳しくは以下ブログをご参照ください。

Firestoreとは

FirestoreとはGoogle Cloudが提供するNoSQLドキュメントデータベースです。
詳しくは以下ブログをご参照ください。

今回はこのFirestoreCloud Functionsを組み合わせて、Cloud Functionsの重複実行の対策を考えていきたいと思います。

本題

今回想定するCloud Functionsの実行ケース

今回は、Cloud SchedulerでCloud Functionsを定期実行するケースを考えます。

Cloud Schedulerは、フルマネージドのcronジョブサービスです。

Cloud Scheduler では、作業単位のスケジュールを設定して、定義した回数または一定の間隔で実行できます。これらの作業単位は、一般的に cron ジョブと呼ばれています。代表的な使い方としては、レポートメールを毎日送信する、10 分間隔でキャッシュ データを更新する、1 時間に 1 回要約情報を更新する、などがあります。

Cloud SchedulerでPub/Subへcronジョブを送信したことをトリガーに、Pub/SubトリガーCloud Functionsが起動する想定です。

方針①イベントIDで重複を判別する

Cloud Functions重複実行の対策について、方針を2つ考えてみます。
重複実行のパターンとして、まずPub/Sub側で重複が発生する場合があります。

Pub/Sub側で重複が発生する場合

図のように、イベントトリガーで用いられているPub/Sub pushサブスクリプションの性質によりメッセージを複数回配信してしまうことがあります。 この場合、重複して実行されるイベントは同じイベントIDを持つと考えられます。

よって、FirestoreにイベントIDを記録しておき、Cloud Functions実行時はFirestoreに同一イベントIDがあるかどうかを確認し、同一イベントIDがある場合はCloud Functionsの実行をスキップするといった方法が考えられます。

このイベントIDで重複を確認する方針は、公式ドキュメントにも記載があります。

コードとは関係なく、トランザクション チェックをサービスの外側に置きます。たとえば、指定されたイベント ID がすでに処理されたことを記録している場所の状態を保持します。

良い方針ですが、今回想定するケースはCloud Schedulerがイベント発生元ですので、その性質も考慮してみます。 今回使用するCloud Schedulerは、少なくとも 1 回実行されるcronジョブですので、Pub/Sub同様こちらも1回の実行で2回以上ジョブが実行されてしまう可能性があります。

Cloud Scheduler は、「少なくとも 1 回」を基本に処理を行うよう設計されています。つまり、ジョブはスケジュールされた実行ごとに少なくとも 1 回実行されます。まれに、スケジュールの 1 つのインスタンスに関連してジョブが複数回実行される可能性があるため、コードで繰り返し実行されても有害な副作用が生じないようにする必要があります。

このようにCloud Scheduler側で重複が発生する場合も考えてみましょう。

Cloud Scheduler側で重複が発生する場合

この場合、Pub/Sub側がそれぞれ別のメッセージと解釈するため、異なるイベントIDが付与されると考えられます。 そうすると、イベントIDで重複を確認する方法では対応できません。

異なるイベントIDに対しても重複実行の対応ができるよう、今回は別の方針を考えてみます。

方針②(今回の方針)Pub/Subがメッセージを受信した時点のタイムスタンプで重複を判別する

イベントトリガーCloud Functionsでは、イベントの送受信にPub/Subを使用しています。
Pub/Subメッセージには、Pub/Subがメッセージを受信した時点のタイムスタンプが付与されているため、こちらを利用する方針を考えてみます。

Pub/Subメッセージサンプル
  {
      "message": {
          "attributes": {
              "key": "value"
          },
          "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
          "messageId": "2070443601311540",
          "message_id": "2070443601311540",
          "publishTime": "2021-02-26T19:13:55.749Z",
          "publish_time": "2021-02-26T19:13:55.749Z"
      },
    "subscription": "projects/myproject/subscriptions/mysubscription"
  }

このpublish_time(publishTime)を利用して、前回のpublish_timeと今回のpublish_timeを比較し、短時間に複数メッセージが送信されている場合は2回目以降の実行はスキップするといった対策が考えられます。

時間差をコード上で計算することにより、イベントIDが同一の場合でも異なる場合でも対応できそうです。 今回はこちらの方針で考えてみたいと思います。

Firestore実装

データベース作成

以下の設定でデータベースを作成します。

  • モード:ネイティブモード
  • データベースID:(default)
  • 本番環境ルール

Firestoreの設定方法など、詳しくは以下ブログをご参照ください。

Cloud Functions実装

設定

今回利用したCloud Functionsの設定です。

  • 第2世代
  • Pub/Subトリガー
  • Python3.11

Pub/Subトリガーを用いるため、あらかじめトリガー元のPub/Subトピックも作成しておきます。
起動元であるCloud Schedulerも作成します。

サービスアカウントの権限

Cloud Functionsで使用したサービスアカウントには、roles/datastore.user(Cloud Datastore ユーザー)を付与してあります。

コード

Cloud Functionsに実装したコードです。

main.py

import base64
import os
from datetime import datetime, timedelta

import functions_framework
from google.cloud import firestore

MY_PROJECT_ID = os.getenv("MY_PROJECT_ID")  # 環境変数からプロジェクト名を取得


def is_duplicate_execution(current_publish_time: str, last_publish_time: str) -> bool:
    # datetime型へ変換
    current_publish_dt = datetime.strptime(current_publish_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    last_publish_dt = datetime.strptime(last_publish_time, "%Y-%m-%dT%H:%M:%S.%fZ")

    # 時間差を計算し60秒以内ならTrueを返す
    time_diff = current_publish_dt - last_publish_dt
    print(f"time_diff: {time_diff}")
    max_diff_seconds = 60
    return time_diff <= timedelta(seconds=max_diff_seconds)


@functions_framework.cloud_event
def avoid_duplicate_execution(cloud_event) -> None:
    # Pub/Subメッセージ情報を取得
    pubsub_message = cloud_event.data["message"]
    publish_time = pubsub_message["publish_time"]

    # Firestoreから前回のPub/Subメッセージ情報を取得
    db = firestore.Client(project=MY_PROJECT_ID)
    doc_ref = db.collection("cloud-functions").document("avoid-duplicate-execution")
    doc_dict = doc_ref.get().to_dict()

    # 最初の実行なら重複確認はスキップ
    if not doc_dict:
        print("First execution.")
    # 重複実行であれば処理終了
    elif is_duplicate_execution(publish_time, doc_dict["publish_time"]):
        print("Duplicate execution.")
        return

    # Firestore更新用データ
    data = {
        "publish_time": publish_time,
        "message": base64.b64decode(pubsub_message["data"]).decode()
    }
    # Firestoreを更新
    doc_ref.set(data)

    # ========================================
    #
    # ここにCloud Functionsで実行したい処理を書く
    #
    # ========================================

    print("Function was executed.")

requirements.txt

functions-framework==3.*
google-cloud-firestore

前回のpublish_timeと今回のpublish_timeを比較し、時間差が60秒以内ならCloud Functionsの処理をスキップする仕様です。

期待通りの挙動をとるか検証してみます。

Cloud Scheduler実行

Cloud Schedulerを実行してみます。その後、コンソール上でCloud LoggingとFirestoreを確認していきます。

1回目の実行

Cloud Schedulerを実行します。1回目はFirestoreに過去の実行情報がありませんので、重複確認はしません。

今回の実行によりFirestoreへ書き込みが行われ、次回の実行から重複確認が行われるようになります。   Function was executed.がログに出力されていることから、Cloud Functionsで処理が実行されていることがわかります。

Firestoreに1回目の処理の情報が書き込まれていることもわかります。

前回の実行から約2分後に実行

前回の実行から約2分後にCloud Schedulerを再度実行します。
こちらは重複確認が行われ、前回の実行から60秒以上経過しているためCloud Functionsで処理が実行される想定です。

Function was executed.がログに出力されていることから、Cloud Functionsで処理が実行されていることがわかります。
こちらは重複確認も行われており、変数time_diffの出力結果から2分程度の時間差があることも確認できます。

Firestoreの情報も更新されています。

前回の実行から約30秒後に実行

約30秒後にCloud Schedulerを再度実行します。
前回の実行から60秒以内のため、Cloud Functionsは処理はスキップしてくれる想定です。

前回の実行との時間差が60秒以内のため、重複実行と判定しています。
ログにDuplicate execution.と出力されていること、またFunction was executed.と出力されていないことから、Cloud Functionsの処理がスキップされていることがわかります。

Cloud Schedulerを短時間に2回実行

実際の重複実行を想定したパターンとして、Cloud Schedulerを手動で短時間に2回実行してみます。

2回目の実行(オレンジ色の枠)の変数time_diffがとても小さい数値であることから前回の実行との時間差がほとんどないことがわかりますが、それでも重複実行と判定しCloud Functionsの処理をスキップしてくれています。

全て期待していた挙動です!以上で検証は終了です。

注意

今回、かなり短い時間差で2回Cloud Schedulerを起動した際も重複実行と判定してくれましたが、1回目と2回目の処理起動の時間差があまりに小さい場合は重複とみなされない可能性があると考えられます(Firestoreへほぼ同時にアクセスした場合)。
ただ、Firestoreが想定よりも早い処理時間でしたので、極端に小さい時間差でなければ問題はなさそうです。

また、今回はpublish_timeのみで重複実行かどうかを判別しているため、同時間帯にはCloud Functionsが複数回起動しません。
Cloud Schedulerのメッセージ内容でCloud Functionsの処理を分岐させたい場合などは、例えばメッセージ内容から抽出したパラメーターも重複かどうかの判断材料に加えることで対応できるかと思います。

最後に感想

このような設計をせずともべき等関数になっている方がもちろん良いと思いますが、どうしても難しい場合は本記事のような手法もありなのかなと思いました。
今回の実装については個人で考えたものですので、あくまでアイディアの一つとして読んでいただいた方々の参考になれば良いなと思います。

引用・参照まとめ