Cloud Storageにオブジェクトが置かれたらAmazon S3に転送するCloud Run FunctionsをPythonで書いた [Google Cloud --> AWS]

Cloud Storageにオブジェクトが置かれたらAmazon S3に転送するCloud Run FunctionsをPythonで書いた [Google Cloud --> AWS]

Google Cloud の Cloud Storage にオブジェクトが作られたことを検知して、それをトリガにAWSのAmazon S3に転送(push)する... ということをやってくれるCloud Run Functionsをつくりました
Clock Icon2024.09.09

Google Cloud の Cloud Storage につくったオブジェクトが自動的に AWS の Amazon S3 におかれて欲しい

そう思ったことはないですか? ぼくはあります(ここまで挨拶

そんなわけで、以下の動作をやってくれる Cloud Run Functions を作りました。

  • Cloud Storage にオブジェクトが作成されたことをトリガに起動
  • 該当のオブジェクトの内容を読み込み、予め指定されていた Amazon S3 上にオブジェクトを作成
  • AWS へのアクセスする際は、アクセスキー/シークレットキーによる認証を行う

ここで、

「このセキュリティが叫ばれる御時世、アクセスキーとシークレットキーとか」

という声も聞こえてきそうですが、それはそれで話が複雑になるので別途改めて解説することにして、今回はシンプルにこれで行きたいと思います。

「それ、マネージドサービスの〇〇でできるよ?」

という声も聞こえてきそうですが、とある事情でその方法は断念しました。後述します。

なお、ここに記載したコードは Claude 3.5 Sonnet 先生と Gemini 1.5 Pro 先生に聞きまくった内容を組み合わせて修正したものとなります。

構成

概ねこんな感じの構成になると思います。[1]

動作は単純で、Google Cloud 上の特定の Cloud Storage バケットにオブジェクトが作成されたら、それをトリガーに Cloud Run Functions を起動。
Cloud Run Functions は作成されたオブジェクトを読み込み、予め用意された IAM ユーザーのアクセスキー/シークレットキーを使って所定の Amazon S3 バケットにオブジェクトを書き込む、というものです。

他に必要なものとしては、Cloud Run Functions が動作するときの権限を規定するサービスアカウントと、払い出されたアクセスキー/シークレットキーを安全に保管する為の Secret Manager になります。

以下、順に見ていきます。

[AWS] Amazon S3 バケット (送信先)

まずは AWS 側から。最終的にオブジェクトが格納される S3 バケットを用意します。
既にある(そのバケットにオブジェクトを転送することがそもそもの目的である)場合が多い気がするので詳細は割愛しますが、試してみる場合は何か適当に作って下さい。

ここでは仮に destination_s3_bucket という名前(S3 バケット名)だと仮定します。

一応、これはお約束ですが、作る場合はパブリックアクセスにならないよう十分お気をつけください。

[AWS] 書き込み用 IAM ユーザー

destination_s3_bucket にオブジェクトを作成できるだけの権限をもつ IAM ユーザーを作ります。
アクセスキー/シークレットキーを払い出すので、最小権限でいきたいですね。今回の場合は s3:PutObject だけできれば最低限大丈夫です。

設定する IAM ポリシーはこんな感じになるかと思います:

{
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "VisualEditor0",
        "Effect": "Allow",
        "Action": "s3:PutObject",
        "Resource": "arn:aws:s3:::destination_s3_bucket/*"}]}

作成できたら、アクセスキーを作成しておいてください。

[Google Cloud] Secret Manager

ここからは Google Cloud 側の作業になります。

いま作ったアクセスキーとシークレットキーをそのままにしておきたくないので、さっさと格納しましょう。
名前はここでは aws_access_key_id / aws_secret_access_key としてます。

[Google Cloud] Cloud Storage バケット (送信元)

トリガーとなる Cloud Storage バケットも作ります。
もちろん既存のものでも問題ありませんが、基本的にはこのバケットにオブジェクトが作られたら全て後述の Cloud Functions が起動されると思ってください。

ここでは source_gcp_bucket という名前のバケットを作成したとします。

こちらも公開範囲などアクセス権限にはご注意ください(お約束)。

[Google Cloud] サービスアカウント

Cloud Run Functions ランタイム用のサービスアカウントを作ります。こちらに以下の権限をつけます。

  • Eventarc イベント受信者 roles/eventarc.eventReceiver
  • Secret Manager のシークレット アクセサー roles/secretmanager.secretAccessor
  • Storage オブジェクト閲覧者 roles/storage.objectViewer

ここでは sa-send-object という名前をつけました。
プリンシパルは E-mail アドレス形式の sa-send-object@<プロジェクト名>.iam.gserviceaccount.com となるかと思います。

[Google Cloud] Cloud Run Functions

お待たせしました。実際のコードは以下のようになります。

import functions_framework
import os
from google.cloud import storage
import boto3


@functions_framework.cloud_event
def main(cloud_event):

    # 送信先の設定
    destination_aws_bucket = 'destination_s3_bucket'

    # Secret Manager から AWS のクレデンシャルを取得
    destination_aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
    destination_aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')

    # 対象の Cloud Storage オブジェクト名を CloudEvent から取得
    event_data = cloud_event.data
    source_gcp_bucket = event_data["bucket"]
    object_name = event_data["name"]

    # オブジェクトの中身を Cloud Storage から取得
    gcs = storage.Client()
    source_bucket = gcs.bucket(source_gcp_bucket)
    source_blob = source_bucket.blob(object_name)
    object_content = source_blob.download_as_bytes()

    # 同じ内容・オブジェクト名で Amazon S3 上に put
    s3 = boto3.client(
        's3',
        aws_access_key_id=destination_aws_access_key_id,
        aws_secret_access_key=destination_aws_secret_access_key,
        region_name='ap-northeast-1')
    s3.put_object(
        Body=object_content,
        Bucket=destination_aws_bucket,
        Key=object_name)

    return ("Success.")


if __name__ == '__main__':
    main()
requirements.txt
functions-framework==3.*
cloudevents
google-cloud-storage
boto3

この内容で Cloud Run Functions を作成してください。要点は以下になります:

  • エントリ ポイント : main
  • トリガーについて
    • トリガータイプ : Cloud Storage
    • イベントタイプ : google.cloud.storage.object.v1.finalized
    • バケット : source_gcp_bucket
  • ランタイムサービスアカウント : sa-send-object@<プロジェクト名>.iam.gserviceaccount.com
  • シークレットの参照
    • シークレット : aws_access_key_id / aws_secret_access_key
    • 参照方法 : 環境変数として公開
    • 環境変数名 : AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY

あとは基本的にデフォルトで動くと思いますが、気になるところは変えてみてください。
対象とするオブジェクトのサイズによっては、割り当てメモリがデフォルトだと足りなくなるかも知れませんのでそこは注意事項かと思います。

ちなみにデプロイ用・ Eventarc 用のサービスアカウントは、今回作成した sa-send-object では権限が足りませんので、別途用意するかデフォルトのサービスアカウントを利用する感じになります。

動作確認

Cloud Storage バケット source_gcp_bucket に置いたオブジェクトが、同じ名前で Amazon S3 の destination_aws_bucket に作成されたら成功です!
実際の実行には、オブジェクトのサイズにもよりますけど数秒〜数十秒程度かと思います。

まとめと今後の展望

最初は「こんなものテンプレ構成やろ」と思ってたんですが、実際に探してみると意外と「これ!」というものが見つからず、だったらということでブログにしました。

実際に常用するにあたって、改変ポイントは以下のような感じでしょうか:

  • ハードコードしている送信先バケット名の扱い(環境変数で指定?)
  • 送信すべきオブジェクトのフィルタリングとバリデーション
  • 送信時の上書き判定など異常系処理の追加
  • IaC 化
  • アクセスキー/シークレットキーを使わない認証(Workload Identity Federation)

特に最後の項目は、本番運用に載せるにあたって気になるところと思いますので、 別途解説します こちらをご参照下さい。

https://dev.classmethod.jp/articles/202409-google-cloud-aws-workload-identity-and-assumerole-python/

余録:同じ事をやってくれるマネージドサービスについて

今回わざわざ Cloud Run Functions を書いたわけですが、実は Cloud Storage のオブジェクトを Amazon S3 に同期してくれる AWS のマネージドサービスがあります。


https://aws.amazon.com/jp/datasync/

こいつ(AWS DataSync)自体は、S3 を含む AWS 上のデータストアにデータを集めるための汎用的な機能で、その収集元(ソース)として Cloud Storage も使える、というものです。公式がブログを書いていたりもします。

やりたいことだけで言えばこれで出来るのですが、今回は採用を見送りました。その主な理由は以下です:

  1. やりたいこと(月に数回、せいぜい数十 kB × 数オブジェクトの転送)と比較して大きすぎる
  2. Pull(AWS 上の仕組みで Google Cloud から持ってくる)ではなく、Push(Google Cloud 上の仕組みで AWS に送信)したい

特に今回のケースは特に 2 番目がクリティカルで、AWS 上にはあまり手を加えられないという事情もあってこうなりました。

ちなみに Google Cloud も、似たようなマネージドサービスをもってたりします。


https://cloud.google.com/storage-transfer-service?hl=ja

しかし今回の用途としては致命的なことに、Amazon S3 は転送先として選択できません。

いろいろと事情は推測できますが、ともあれ今回の用途程度であれば、いくつかのクラウドリソースと数十行規模のコードを書くだけで実現できました。何かの参考になれば幸いです。

脚注
  1. 実際は Cloud Storage と Cloud Run 関数の間にトリガ情報を伝える Eventarc が存在しますが、説明上の都合でここでは割愛してます ↩︎

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.