Cloud Storageトリガー で Cloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送してみた

Cloud Storageトリガー で Cloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送してみた

Clock Icon2023.02.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!エノカワです。

Google Cloud Functions は、クラウド サービスの構築と接続に使用するサーバーレスのランタイム環境です。
Cloud Functions には、第1世代 と 第2世代 の2種類のプロダクトがありますが、第2世代は Cloud Run と Eventarc をベースにした新しいバージョンで機能が強化されています。

今回はCloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送を試してみたのでご紹介します。

鍵認証のSFTPサーバをたてる

Cloud Functions 関数を作成する前に ファイル転送先のSFTPサーバをたてます。

下記エントリで、Cloud Composer で Cloud Storage にあるファイルをSFTPサーバに転送するワークロードを紹介しました。

今回も同様の手順で鍵認証のSFTPサーバをたてます。
詳細は上記エントリの[SFTPサーバをたてる][鍵認証の設定][Secret Manager に 秘密鍵を保存]の箇所を参照ください。

VMインスタンス作成

  • test-sftp-serverという名前で、東京リージョン、最小のマシン構成を選択し、他はデフォルトのままで作成

フォルダ作成

  • [SSH]リンクからインスタンスに接続し、sftp_userユーザーの追加とuploadフォルダを作成

鍵認証の設定

  • Cloud Shell からssh-keygenコマンドを使用して、SSH鍵ペアを生成
  • test-sftp-serverインスタンスの編集画面に入り、[SSH 認証鍵]項目に公開鍵の内容を登録

Secret Manager に 秘密鍵を保存

  • 秘密鍵を保管するシークレットとバージョンを作成
  • Compute Engine のデフォルトのサービスアカウントに Secret Manager のシークレット アクセサー (roles/secretmanager.secretAccessor)のロールを付与

GCSバケットを作成

  • 転送ファイルをアップロードするGCSバケットtest-gcs-sftpを作成

Cloud Functions 関数を作成

準備が整ったので、Cloud Functions 関数を作成していきます。

Google Cloud コンソールで Cloud Functions の[関数の作成]ページに移動します。

環境第2世代を選択します。
トリガーCloud Storage トリガーを選択したいところですが、どのように設定するのでしょうか?

ドキュメントに 第2世代 でサポートされているトリガーについての記載がありました。

Cloud Functions(第 2 世代)のすべてのイベント ドリブン関数は、イベント配信に Eventarc トリガーを使用します。

Cloud Functions(第 2 世代)では、Pub/Sub トリガーと Cloud Storage トリガーは、Eventarc トリガーの一種として実装されます。

Cloud Functions(第 2 世代)でサポートされているトリガー

Cloud Storage トリガーは、Eventarc トリガーとして実装が必要とのことなので、[EVENTARC トリガーを追加]ボタンをクリックします。

Eventarc トリガーを追加

Eventarc トリガーの設定画面が表示されますので、以下の項目を選択します。

  • トリガーのタイプ
    Google のソース
  • イベント プロバイダ
    Cloud Storage
  • イベント
    google.cloud.storage.object.v1.finalized
  • バケット
    test-gcs-sftp

google.cloud.storage.object.v1.finalizedは、オブジェクトのファイナライズイベントです。
新しいオブジェクトが作成されるか、既存のオブジェクトが上書きされ、そのオブジェクトの新しい世代が作成されると送信されます。

Eventarc トリガーを追加するにあたって必要な権限が不足している場合は以下のようなメッセージが表示されます。

  • Cloud Pub/Sub でIDトークンを作成するには、Pub/Sub のサービスアカウントにroles/iam.serviceAccountTokenCreatorロールの付与が必要
  • Cloud Storage 経由でイベントを受け取るために、Cloud Storage のサービスアカウントにroles/pubsub.publisherロールの付与が必要

上記ロールが足りていないようなので、[付与]ボタンをクリックして必要なロールを付与します。

ロールが付与されたので、[トリガーを保存]ボタンをクリックします。

Eventarc トリガーが追加されました。

ランタイム環境変数を追加

関数内でSFTPサーバの接続情報を参照するため、ランタイム環境変数に追加しておきます。

[次へ]ボタンをクリックして次の画面に遷移します。

第2世代を使用するのに必要なAPIが有効化されていない場合、以下のような画面が表示されます。

[有効にする]ボタンをクリックして、有効化します。

関数をデプロイ

GCS から SFTPサーバ にファイル転送する関数を作成します。

今回は以下の内容でデプロイしました。

ランタイム

Python 3.10

エントリポイント

copy_file_from_gcs_to_sftp

ソースコード

import os
from io import StringIO

import functions_framework
import paramiko
from google.cloud import secretmanager, storage


def get_secret_version(project_id, secret_id, version_id="latest"):
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
    response = client.access_secret_version(request={"name": name})
    payload = response.payload.data.decode("UTF-8")
    return payload


def get_fileblob(bucket_name, file_name):
    gcs_client = storage.Client()
    bucket = gcs_client.get_bucket(bucket_name)
    return bucket.blob(file_name)


def write_sftp_file(host, port, user, dst_path, bucket_name, file_name, private_key):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, port=port, username=user, pkey=private_key, timeout=5.0)
    sftp_connection = client.open_sftp()
    file_path = f"{dst_path}{file_name}"
    with sftp_connection.file(file_path, "w") as target:
        fileblob = get_fileblob(bucket_name, file_name)
        fileblob.download_to_file(target)
    sftp_connection.close()
    client.close()


@functions_framework.cloud_event
def copy_file_from_gcs_to_sftp(cloud_event):
    PROJECT_ID = os.getenv("PROJECT_ID")
    SFTP_HOST = os.getenv("SFTP_HOST")
    SFTP_PORT = os.getenv("SFTP_PORT")
    SFTP_USER = os.getenv("SFTP_USER")
    SECRET_ID_SFTP = os.getenv("SECRET_ID_SFTP")
    DEST_PATH = os.getenv("DEST_PATH")

    data = cloud_event.data
    key_stream = get_secret_version(PROJECT_ID, SECRET_ID_SFTP)
    private_key = paramiko.RSAKey.from_private_key(StringIO(key_stream))

    write_sftp_file(SFTP_HOST, int(SFTP_PORT), SFTP_USER, DEST_PATH, data["bucket"], data["name"], private_key)
functions-framework==3.*
google-cloud-secret-manager>=2.12.0
google-cloud-storage>=2.4.0
paramiko>=2.8.0

[デプロイ]ボタンをクリックしてしばらくすると、デプロイされました。

Pub/Sub の[トピック]ページに移動すると、デプロイした関数のトピックが作成されていました。

転送ファイルをアップロード

では、Cloud Functions を動かしてみましょう!

SFTPサーバに転送するファイルを Cloud Storage にアップロードします。

今回はtest-gcs-sftpバケットにtrigger_file.txtをアップロードしました。

転送ファイルを確認

SFTPサーバにファイルが転送されているか確認してみましょう。

test-sftp-serverインスタンスに接続して、uploadフォルダ配下をチェックします。

trigger_file.txtファイルが居てました。
ファイル転送成功です!

まとめ

以上、Cloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送してみました。

Eventarc トリガーの追加や関数名に-が使用できないなど第1世代と異なる点はありましたが、デプロイに手間取ることはありませんでした。
今回はコンソール上で行ったのですが、ロールの追加やAPIの有効化など必要な操作をその場で確認しながら進めることができたのもその理由だと感じました。

第2世代では、Eventarc でサポートされているすべてのイベントタイプのトリガーをサポートしているそうなので、他も試してみたいと思います!

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.