Cloud FunctionsでSFTPサーバーからGCSへファイル転送する際のメモリ使用量を調べてみた

2022.12.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

データアナリティクス事業本部のkobayashiです。

Cloud FunctionsでSFTPサーバーからGoogle Cloud Storage(移行GCS)へファイル転送を行う際にそれなりに大きなサイズのファイルを転送する事があり、その際のCloud Functionsのメモリ使用量が気になったので調べてみました。

Cloud FunctionsでSFTPサーバーからGCSへファイル転送する

Cloud FunctionsにてSFTPサーバーからGCSへファイル転送を行うには以下の弊社ブログエントリーに従って準備を行っておきます。ファイル転送を行うPythonコードに関しては2パターンで比較を行いたかったため多少変更してあります。こちらは後述します。

Cloud Functionsのメモリ使用量を調べてみる

では上記の環境を元にCloud Functionsでもメモリ使用量を調べてみます。方法は非常に単純で、50MB/100MB/250MB/500MB/1GBのファイルをSFTPサーバー上に用意してそれらをCloud FunctionsでGCSへ転送します。その際に使用したメモリはCloud MonitoringのCLOUD FUNCTIONS - MEMORY USAGEメトリクスから読み取るといった単純な方法になります。

Pythonスクリプトに関してはParamikoモジュールを使いSFTPサーバーからファイルを取得します。その際に、転送方法に関しては以下の2パターンを試します。

  • paramikoのgetofoを使ってファイルオブジェクトとしてソースファイルをメモリ上に展開してからGCSへ転送する(オンメモリでの転送)
  • paramikoのgetを使って一度Cloud Functionsのファイルシステムに書き込んだ後にGCSへ転送する(ファイル保存での転送)

この意図としてはCloud Functionsではファイルシステムに書き込む際に割り当てられたメモリを使う(Cloud Functions メモリとファイル システム より)のですが、それを含めてどの程度メモリを使うのかを確認したかったためです。

環境

  • Cloud Functions
    • gen2
    • trigger-http
    • memory=2GiB
    • runtime=python310
  • Python 3.10
    • pramiko 2.12.0

(参考)

Cloud Functionsを実行してみる

今回使うコードは以下になります。ベースは先にお伝えした通り「Cloud Functions で SFTP サーバから GCS にファイル転送してみた。 | DevelopersIO 」になります。

from google.cloud import secretmanager
from google.cloud import storage
import os
import paramiko
import io
import tempfile


def get_secret_version(project_id, secret_id, version_id="latest"):
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}"
    response = client.access_secret_version(request={"name": name})
    payload = response.payload.data.decode("UTF-8")
    return payload


def put_blob(bucket_name, file_path, stream, content_type="text/plain"):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(file_path)
    blob.upload_from_string(stream, content_type=content_type)


def put_blob_file(bucket_name, file_path, filename):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(file_path)
    blob.upload_from_filename(filename)


# entry-point
def handler(request):
    try:
        GCP_PROJECT = os.getenv("GCP_PROJECT", "None")
        SECRET_ID_SFTP = os.getenv("SECRET_ID_SFTP", "None")
        SFTP_HOST = os.getenv("SFTP_HOST", "None")
        SFTP_PORT = os.getenv("SFTP_PORT", "None")
        SFTP_USER = os.getenv("SFTP_USER", "None")
        DST_BUCKET = os.getenv("DST_BUCKET", "None")
        if not GCP_PROJECT or not SECRET_ID_SFTP or not SFTP_HOST or not SFTP_USER:
            raise Exception("Missing value in env.")

        params = request.get_json()
        source_path = params.get("source_path")
        target_path = params.get("target_path")
        process_type = params.get("process_type")
        print(params)

        key_stream = get_secret_version(GCP_PROJECT, SECRET_ID_SFTP)
        private_key = paramiko.RSAKey.from_private_key(
            io.StringIO(key_stream.rstrip("\n"))
        )

        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.WarningPolicy())
        client.connect(
            SFTP_HOST, port=int(SFTP_PORT), username=SFTP_USER, pkey=private_key
        )
        sftp_connection = client.open_sftp()

        src_file = source_path.split("/")[-1]
        if process_type == "stream":
            obj = io.BytesIO()
            with obj as fp:
                sftp_connection.getfo(source_path, fp)
                stream = fp.getvalue().decode("UTF-8")
                put_blob(
                    DST_BUCKET,
                    target_path + src_file,
                    stream,
                    content_type="text/plain",
                )
        else:
            with tempfile.TemporaryDirectory(dir="/tmp/") as tmpdir:
                sftp_connection.get(source_path, f"{tmpdir}/{src_file}")
                put_blob_file(DST_BUCKET, target_path + src_file, f"{tmpdir}/{src_file}")
        return "OK"
    except Exception as e:
        raise

主な変更点としては、参照元のソースがイベントトリガーだったのをhttpトリガーにして、リクエスト時にソースパス、ターゲットパス、処理方法を与えられるようにしました。

処理方法はオンメモリで転送する(63-72行目)か一度ファイルとして保存してから転送する(74-76行目)で分けています。オンメモリで転送する方法はほぼ参照元のソースを使わせていただきました。

ではこのPythonスクリプトをCloud Functionsに展開してCloud Monitoringでメモリ使用量を調べてみます。

メモリ使用量と実行時間

メモリ使用量は以下のような条件で調べました。

  • 50MB-1GBまでのファイルをSFTPサーバーからGCSへCloud Functionsで転送する
  • Cloud MonitoringでCLOUD FUNCTIONS - MEMORY USAGEメトリクスの転送時における最大値を記録する
  • 参考としてCloud LoggingからCloud Functionsの実行時間を記録する
  • この計測を時間帯をずらして5回行いその平均値を計測値とする

SFTPサーバーのリソースやNW環境に左右されると思いますが、今回の計測結果は以下になりました。

  • メモリ使用量
オンメモリ ファイル保存
50MB 283MB 233MB
100MB 433MB 327MB
250MB 1056MB 588MB
500MB 1806MB 827MB
1GB Memory limit of 2048M exceeded with 2561M 1284MB
  • 処理時間
オンメモリ ファイル保存
50MB 9sec 9sec
100MB 20sec 21sec
250MB 43sec 45sec
500MB 98sec 106sec
1GB - 231sec

処理時間としてはオンメモリでの処理でもファイル保存からの転送でもほぼ差はありませんでしたが、メモリ使用量は大きな差が出ました。

オンメモリでの転送はCloud Functionsで使用されるメモリがファイルサイズの4倍程度使用されるのに対して、/tmp/へ一度ファイル保存してからGCSへの転送ではファイルサイズ+200〜300MB程度のメモリ使用量となりました。 詳しい原因は調べきれていませんが、オンメモリにファイルを展開しオンメモリからGCSへ書き込むためファイル保存に比べメモリ使用量が増大したのではないかと推測されます。これはまた時間があったら詳しく調査したいと思います。

まとめ

Cloud FunctionsでSFTPサーバーからGCSへファイル転送を行う際にCloud Functionsのメモリ使用量がどの程度か調べてみました。Cloud Functionsではファイルシステムとして割り当てられたメモリを使用しますが、メモリ少量を少なく抑えるためにはメモリに展開してからGCSに転送するのではなく/tmp/等に一度保存してから転送したほうが効率的かと思われます。

最後まで読んで頂いてありがとうございました。