Cloud Composer で GCS から SFTP サーバ にファイル転送してみた 〜パスワード認証編〜
本記事はGCP(Google Cloud Platform) Advent Calendar 2022 11日目の記事です。
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
今回は、Cloud Storage にあるファイルをSFTPサーバに転送するというワークロードを Cloud Composer で試してみたのでご紹介します。
下記エントリでは、同様のワークロードを Cloud Functions を使って実現した事例が紹介されていますので、こちらも是非ご参照ください!
SFTPサーバをたてる
ファイル転送先のSFTPサーバをたてます。
Google Cloud コンソールで[VM インスタンス]ページに移動し、インスタンスを作成します。
test-sftp-server
という名前で、東京リージョン、最小のマシン構成を選択し、他はデフォルトのままで作成しました。
インスタンス起動後、[SSH]リンクから接続します。
インスタンスに接続できました!
そのまま、SSH接続するためのセットアップを行います。
ユーザー追加
sftp_user
ユーザーを追加し、パスワードを設定します。
sudo adduser sftp_user sudo passwd sftp_user
グループ作成&ユーザー追加
restricted
グループを作成し、sftp_user
ユーザーを追加します。
sudo groupadd restricted sudo usermod -g restricted sftp_user
フォルダ作成
ファイルをアップロードするための書き込みアクセス権を持つupload
フォルダを作成します。
sudo mkdir -p /home/sftp/upload sudo chmod -R 777 /home/sftp/upload
SSH デーモン構成ファイル編集
SSH デーモン構成ファイルを下記のように編集します。
SSH デーモン構成ファイルを開く
sudo vi /etc/ssh/sshd_config
Subsystem sftp の値を変更
# Subsystem sftp /usr/lib/openssh/sftp-server Subsystem sftp internal-sftp
ファイルの末尾に追加
Match group restricted ChrootDirectory /home/sftp_user/ ForceCommand internal-sftp AllowTcpForwarding no X11Forwarding no PasswordAuthentication yes PermitRootLogin no
作成したupload
フォルダにはまだ何もありません。
これから作成するDAGでこのフォルダにファイル転送を試みます。
転送ファイルをアップロード
SFTPサーバに転送するファイルを Cloud Storage にアップロードします。
今回はtest-gcs-sftp
バケットを作成し、file.txt
をアップロードしました。
Cloud Composer 環境を作成
DAGを動かす Cloud Composer 環境を作成します。
Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。
test-composer
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。
20分ほどで Cloud Composer 環境が作成されました。
[Airflow]リンクから Airflow ウェブインターフェースにアクセスします。
Airflow接続 を作成
Airflow接続を使用すると、Cloud Composer 環境から Google Cloud プロジェクトのリソースにアクセスできます。
今回は Compute Engine インスタンスにアクセスするので、そのための Airflow接続 を作成します。
[Admin] - [Connections]をクリックします。
Airflow接続の管理画面が表示されますので、[+]ボタンをクリックします。
Airflow接続の編集画面が表示されますので、以下の値を入力します。
- Connection Id
gcs_sftp_conn
- Connection Type
SSH
- Host
test-sftp-server
※Compute Engine のインスタンス名 - Username
sftp_user
※作成したユーザー - Password
(設定したパスワード)
- Port
22
[save]ボタンをクリックして、Airflow接続を保存します。
DAG を作成
Cloud Composer 環境で実行する DAGを作成します。
今回は Cloud Storage にあるファイルを SFTPサーバに転送したいので、お誂え向きのgcs_to_sftpオペレータを使用しました。
転送元にはtest-gcs-sftp
バケットとアップロードしたfile.txt
ファイル、転送先にはtest-sftp-server
インスタンスに作成したupload
フォルダを指定しました。
sftp_conn_id
には作成したAirflow接続IDgcs_sftp_conn
を指定しました。
from airflow import DAG from datetime import datetime from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator with DAG( dag_id="example_gcs_to_sftp", start_date=datetime.now(), schedule_interval=None, ) as dag: copy_file_from_gcs_to_sftp = GCSToSFTPOperator( task_id="file-copy-gsc-to-sftp", sftp_conn_id="gcs_sftp_conn", source_bucket="test-gcs-sftp", source_object="file.txt", destination_path="/sftp/upload", )
test-composer
の[DAG]リンクからDAGフォルダに移動します。
DAGファイルをアップロードしました。
これで、DAGがデプロイされるはずです。
SFTP パッケージをインストール
DAGファイルをアップロードしてからしばらくすると、コンソールにDAG の破損というエラーメッセージが表示されました。
どうやらgcs_to_sftp
オペレータのインポートに失敗しているようです。
SFTPのパッケージが必要とのことなので、PyPIパッケージ画面から追加してみましょう。
[PyPI パッケージ]セクションのパッケージ名にapache-airflow-providers-sftp
を入力します。
[保存]ボタンをクリックするとパッケージのインストールが始まります。
インストールが完了しました。
Airflow ウェブインターフェースから[DAGs]ページを確認すると、example_gcs_to_sftp
が表示されていました。
エラーが解消され、無事にDAGのデプロイができたようです。
DAG を実行
デプロイしたDAGを実行してみましょう。
今回は手動でDAGをトリガーします。
[DAGs]ページのexample_gcs_to_sftp
をクリックしてDAGの詳細画面に移動します。
画面右上の[再生マーク]ボタンからTrigger DAGをクリックします。
DAGが実行され、file-copy-gsc-to-sftp
タスクが緑色のsuccess
でマークされました。
正常終了したようです。
転送ファイルを確認
SFTPサーバにファイルが転送されているか確認してみましょう。
test-sftp-server
インスタンスに接続して、upload
フォルダ配下をチェックします。
file.txt
ファイルが居てました。
ファイル転送成功です!
まとめ
以上、Cloud Composer で GCS から SFTP サーバ にファイル転送してみました。
gcs_to_sftp
オペレータを使用するだけというシンプルなDAGで実現することができました。
また、Airflow接続を使用することで、Cloud Composer 環境から インスタンス名で Compute Engine にアクセスできちゃいました。
Cloud Composer が他の Google Cloud プロダクトとの統合されていることによる強みだと感じました。
Google Cloud オペレータも充実していそうなので、他も試してみたいと思います!
参考
- airflow.providers.google.cloud.transfers.gcs_to_sftp — apache-airflow-providers-google Documentation
- How to Setup a sFTP Server in Google Cloud Platform and Restrict Access | by Rubens Zimbres | Medium
- VM インスタンスの作成と開始 | Compute Engine ドキュメント | Google Cloud
- Cloud Composer の概要 | Google Cloud
- 環境の作成 | Cloud Composer | Google Cloud
- Airflow 接続の管理 | Cloud Composer | Google Cloud
- Python 依存関係をインストールする | Cloud Composer | Google Cloud
- DAG(ワークフロー)の作成 | Cloud Composer | Google Cloud
- DAG をトリガーする | Cloud Composer | Google Cloud