こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
下記エントリでは、Cloud Composer で Cloud Storage にあるファイルを鍵認証でSFTPサーバに転送するワークロードを紹介しました。
ワークロードの内容としては、秘密鍵を Secret Manager のシークレットに保存しておき、DAGファイル内でシークレットにアクセスして取得した秘密鍵で SFTPサーバへの接続ID 組み立てる、というものでした。
前回は Secret Manager API を使用してシークレットにアクセスしていましたが、今回は Secret Manager をバックエンドに構成して変数としてシークレットにアクセスする方法を試してみたのでご紹介します。
なお、[SFTPサーバをたてる]、[鍵認証の設定]、[Cloud Composer 環境を作成]の箇所は前回と同様となりますので、詳細については上記エントリを参照ください。
SFTPサーバをたてる
ファイル転送先のSFTPサーバをたてます。
前回と同様、test-sftp-server
という名前で、東京リージョン、最小のマシン構成を選択し、他はデフォルトのままで作成しました。
[SSH]リンクからインスタンスに接続し、sftp_user
ユーザーの追加とupload
フォルダの作成もしておきます。
転送ファイルをアップロード
SFTPサーバに転送するファイルを Cloud Storage にアップロードします。
今回はtest-gcs-sftp
バケットにfile_key_var.txt
をアップロードしました。
鍵認証の設定
SFTPサーバで鍵認証するための設定を行います。
前回と同様、SSH鍵ペアを生成して公開鍵をtest-sftp-server
インスタンスに登録します。
Secret Manager に 秘密鍵を保存
Cloud Composer から SFTP サーバにアクセスする際に使用する秘密鍵を Secret Manager に保存します。
前回と同様、秘密鍵を保管するシークレットとバージョンを作成し、Cloud Composer で使用するサービスアカウント(Compute Engine のデフォルトのサービスアカウント)に Secret Manager のシークレット アクセサーのロールを付与しておきます。
ただし、今回はシークレット名をairflow-variables-sftp-user-keyfile
にしています。
理由は、この後の[Secret Manager バックエンドを構成]の箇所で出てきます。
Cloud Composer 環境を作成
DAGを動かす Cloud Composer 環境を作成します。
前回と同様、test-composer
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。
今回もgcs_to_sftpオペレータを使用しますので、apache-airflow-providers-sftp
パッケージもインストールしておきまます。
Secret Manager バックエンドを構成
今回のポイントとなるオペレーションです。
下記ドキュメントを参考に Secret Manager バックエンドを構成します。
Cloud Composer 環境のAIRFLOW 構成のオーバーライド画面に移動し、[編集]ボタンをクリックします。
以下2つの構成オプションを追加し、[保存]ボタンをクリックします。
セクション | キー | 値 |
---|---|---|
secrets | backend | airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend |
secrets | backend_kwargs | {"variables_prefix": "airflow-variables"} |
Airflow 構成オプションがオーバーライドされました。
これで、Secret Manager バックエンドが有効になり、変数を取得するために読み取るシークレット名の接頭辞がairflow-variables
で構成されました。
先ほど作成したシークレットの名前をairflow-variables-sftp-user-keyfile
にしていたのは、接頭辞をairflow-variables
としていたからでした。
構成の確認
バックエンドが正しく構成できているか確認してみましょう。
変数がバックエンド シークレットから正しく読み取られていることを確認するには、以下のコマンドを実行します。
gcloud composer environments run test-composer \
--location asia-northeast1 \
variables -- get sftp-user-keyfile
読み取り対象のシークレット名はairflow-variables-sftp-user-keyfile
なのですが、airflow-variables
は接頭辞なので、それを除いた変数sftp-user-keyfile
で値を取得しています。
バックエンドが正しく構成できていれば、シークレットの内容(秘密鍵)が取得できるはずです。
DAG を作成
Cloud Composer 環境で実行する DAGを作成します。
前回のDAGとほぼ同じですが、Secret Manager から秘密鍵を取得する箇所を Secret Manager API ではなく変数から取得する形に変えています。
copy_file_from_gcs_to_sftp_key_var.py
import json
import os
from datetime import datetime
from airflow import DAG
from airflow.models import Connection
from airflow.models.variable import Variable
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
# Secret Managerから秘密鍵を取得
private_key = Variable.get('sftp-user-keyfile')
# Airflow接続を作成
extra_secret = {
"private_key": private_key,
}
conn = Connection(
conn_id="gcs_sftp",
conn_type="sftp",
host="test-sftp-server",
login="sftp_user",
port=22,
extra=json.dumps(extra_secret),
)
os.environ["AIRFLOW_CONN_GCS_SFTP"] = conn.get_uri()
with DAG(
dag_id="example_gcs_to_sftp_key_var",
start_date=datetime.now(),
schedule_interval=None,
) as dag:
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="file-copy-gsc-to-sftp_key_var",
sftp_conn_id="gcs_sftp",
source_bucket="test-gcs-sftp",
source_object="file_key_var.txt",
destination_path="/home/sftp/upload/",
)
DAGフォルダにファイルをアップロードしてDAGをデプロイします。
DAG を実行
デプロイしたDAGを実行してみましょう。
前回と同様、DAG詳細画面右上の[再生マーク]ボタンからTrigger DAGをクリックして、手動でDAGをトリガーします。
DAGが実行され、file-copy-gsc-to-sftp_key_var
タスクが緑色のsuccess
でマークされました。
転送ファイルを確認
SFTPサーバにファイルが転送されているか確認してみましょう。
test-sftp-server
インスタンスに接続して、upload
フォルダ配下をチェックします。
file_key_var.txt
ファイルが居てました。
ファイル転送成功です!
まとめ
以上、Cloud Composer のバックエンドに Secret Manager を構成してシークレットにアクセスする方法をご紹介しました。
バックエンドを構成することで変数としてシークレットにアクセスできるようになり、秘密鍵の取得処理がだいぶスッキリしました!
【前回】Secret Manager API でアクセス
# Secret Managerから秘密鍵を取得
client = secretmanager_v1beta1.SecretManagerServiceClient()
name = client.secret_version_path("{PROJECT_ID}", "sftp_user-key", "latest")
response = client.access_secret_version(name)
private_key = response.payload.data.decode("UTF-8")
※プロジェクトIDの箇所は{PROJECT_ID}
に置き換えています。
【今回】変数でアクセス
# Secret Managerから秘密鍵を取得
private_key = Variable.get('sftp-user-keyfile')
参考
- 環境に Secret Manager を構成する | Cloud Composer | Google Cloud
- Airflow 構成オプションをオーバーライドする | Cloud Composer | Google Cloud
- Airflow コマンドライン インターフェースにアクセスする | Cloud Composer | Google Cloud
- Google Cloud Secret Manager Backend — apache-airflow-providers-google Documentation
- airflow.providers.google.cloud.transfers.gcs_to_sftp — apache-airflow-providers-google Documentation
- How to Setup a sFTP Server in Google Cloud Platform and Restrict Access | by Rubens Zimbres | Medium
- シークレットの作成とアクセス | Secret Manager のドキュメント | Google Cloud
- IAM を使用したアクセス制御 | Secret Manager のドキュメント | Google Cloud
- Managing Connections — Airflow Documentation