Cloud Composer で GCS から SFTP サーバ にファイル転送してみた 〜パスワード認証編〜

2022.12.11

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

本記事はGCP(Google Cloud Platform) Advent Calendar 2022 11日目の記事です。

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

今回は、Cloud Storage にあるファイルをSFTPサーバに転送するというワークロードを Cloud Composer で試してみたのでご紹介します。

下記エントリでは、同様のワークロードを Cloud Functions を使って実現した事例が紹介されていますので、こちらも是非ご参照ください!

SFTPサーバをたてる

ファイル転送先のSFTPサーバをたてます。

Google Cloud コンソールで[VM インスタンス]ページに移動し、インスタンスを作成します。
test-sftp-serverという名前で、東京リージョン、最小のマシン構成を選択し、他はデフォルトのままで作成しました。

インスタンス起動後、[SSH]リンクから接続します。

インスタンスに接続できました!

そのまま、SSH接続するためのセットアップを行います。

ユーザー追加

sftp_userユーザーを追加し、パスワードを設定します。  

sudo adduser sftp_user
sudo passwd sftp_user

グループ作成&ユーザー追加

restrictedグループを作成し、sftp_userユーザーを追加します。

sudo groupadd restricted
sudo usermod -g restricted sftp_user

フォルダ作成

ファイルをアップロードするための書き込みアクセス権を持つuploadフォルダを作成します。

sudo mkdir -p /home/sftp/upload
sudo chmod -R 777 /home/sftp/upload

SSH デーモン構成ファイル編集

SSH デーモン構成ファイルを下記のように編集します。

SSH デーモン構成ファイルを開く
sudo vi /etc/ssh/sshd_config
Subsystem sftp の値を変更
# Subsystem sftp /usr/lib/openssh/sftp-server
Subsystem sftp internal-sftp
ファイルの末尾に追加
Match group restricted
ChrootDirectory /home/sftp_user/
ForceCommand internal-sftp
AllowTcpForwarding no
X11Forwarding no

PasswordAuthentication yes
PermitRootLogin no

作成したuploadフォルダにはまだ何もありません。
これから作成するDAGでこのフォルダにファイル転送を試みます。

転送ファイルをアップロード

SFTPサーバに転送するファイルを Cloud Storage にアップロードします。

今回はtest-gcs-sftpバケットを作成し、file.txtをアップロードしました。

Cloud Composer 環境を作成

DAGを動かす Cloud Composer 環境を作成します。

Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。

test-composerという名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。

20分ほどで Cloud Composer 環境が作成されました。

[Airflow]リンクから Airflow ウェブインターフェースにアクセスします。

Airflow接続 を作成

Airflow接続を使用すると、Cloud Composer 環境から Google Cloud プロジェクトのリソースにアクセスできます。

今回は Compute Engine インスタンスにアクセスするので、そのための Airflow接続 を作成します。

[Admin] - [Connections]をクリックします。

Airflow接続の管理画面が表示されますので、[+]ボタンをクリックします。

Airflow接続の編集画面が表示されますので、以下の値を入力します。

  • Connection Id
    gcs_sftp_conn
  • Connection Type
    SSH
  • Host
    test-sftp-server ※Compute Engine のインスタンス名
  • Username
    sftp_user ※作成したユーザー
  • Password
    (設定したパスワード)
  • Port
    22

[save]ボタンをクリックして、Airflow接続を保存します。

DAG を作成

Cloud Composer 環境で実行する DAGを作成します。

今回は Cloud Storage にあるファイルを SFTPサーバに転送したいので、お誂え向きのgcs_to_sftpオペレータを使用しました。

転送元にはtest-gcs-sftpバケットとアップロードしたfile.txtファイル、転送先にはtest-sftp-serverインスタンスに作成したuploadフォルダを指定しました。

sftp_conn_idには作成したAirflow接続IDgcs_sftp_connを指定しました。

copy_file_from_gcs_to_sftp.py

from airflow import DAG
from datetime import datetime
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator

with DAG(
    dag_id="example_gcs_to_sftp",
    start_date=datetime.now(),
    schedule_interval=None,
) as dag:

    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        sftp_conn_id="gcs_sftp_conn",
        source_bucket="test-gcs-sftp",
        source_object="file.txt",
        destination_path="/sftp/upload",
    )

test-composer[DAG]リンクからDAGフォルダに移動します。

DAGファイルをアップロードしました。

これで、DAGがデプロイされるはずです。

SFTP パッケージをインストール

DAGファイルをアップロードしてからしばらくすると、コンソールにDAG の破損というエラーメッセージが表示されました。

どうやらgcs_to_sftpオペレータのインポートに失敗しているようです。
SFTPのパッケージが必要とのことなので、PyPIパッケージ画面から追加してみましょう。

[PyPI パッケージ]セクションのパッケージ名にapache-airflow-providers-sftpを入力します。

[保存]ボタンをクリックするとパッケージのインストールが始まります。

インストールが完了しました。

Airflow ウェブインターフェースから[DAGs]ページを確認すると、example_gcs_to_sftpが表示されていました。
エラーが解消され、無事にDAGのデプロイができたようです。

DAG を実行

デプロイしたDAGを実行してみましょう。

今回は手動でDAGをトリガーします。
[DAGs]ページのexample_gcs_to_sftpをクリックしてDAGの詳細画面に移動します。

画面右上の[再生マーク]ボタンからTrigger DAGをクリックします。

DAGが実行され、file-copy-gsc-to-sftpタスクが緑色のsuccessでマークされました。
正常終了したようです。

転送ファイルを確認

SFTPサーバにファイルが転送されているか確認してみましょう。

test-sftp-serverインスタンスに接続して、uploadフォルダ配下をチェックします。

file.txtファイルが居てました。
ファイル転送成功です!

まとめ

以上、Cloud Composer で GCS から SFTP サーバ にファイル転送してみました。

gcs_to_sftpオペレータを使用するだけというシンプルなDAGで実現することができました。
また、Airflow接続を使用することで、Cloud Composer 環境から インスタンス名で Compute Engine にアクセスできちゃいました。
Cloud Composer が他の Google Cloud プロダクトとの統合されていることによる強みだと感じました。

Google Cloud オペレータも充実していそうなので、他も試してみたいと思います!

参考