「Composer ローカル開発 CLI ツール」を試してみました

2023.01.20

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは。川田です。

今回は、 Google Cloud のサービスである Cloud Composer の DAG 開発を支援する、Composer ローカル開発 CLI ツール を試してみたので紹介いたします。

「Composer ローカル開発 CLI ツール」とは

Composer ローカル開発 CLI ツール とは、Cloud Composer 2 の Apache Airflow コンテナイメージを、ローカル上で起動してくれるツールです。

Composer ローカル開発 CLI ツールは、Airflow 環境をローカルで実行することにより、Cloud Composer 2 向けの Apache Airflow DAG 開発を合理化します。このローカルの Airflow 環境では、特定の Cloud Composer バージョンのイメージが使用されます。

Google Cloud - Composer ローカル開発 CLI ツールについて

ツールは、GitHub で公開されています。

GitHub - GoogleCloudPlatform/composer-local-dev

環境

  • composer-local-dev version 0.9.0

試したこと

  • Composer ローカル開発 CLI ツール による Airflow コンテナの起動
  • 簡単な DAG の実行

手順

事前準備(1)Docker プロセスの起動を確認

前述の通りコンテナイメージが起動することになるため、ローカル上で Docker のプロセスが起動していることを確認ください。

事前準備(2)gcloud CLI に認証情報を取得

Composer ローカル開発 CLI ツール 内の処理や、コンテナ上 Airflow 内 DAG からの Google API アクセスは、gcloud CLI に登録されている認証情報を利用するとのことです。

Composer ローカル開発 CLI ツールと DAG によって行われる API 呼び出しはすべて、gcloud CLI で使用するアカウントから実行されます。たとえば、ローカルの Airflow 環境の DAG が Cloud Storage バケットのコンテンツを読み取る場合、このアカウントにはバケットにアクセスするための権限が必要です。この点は、環境のサービス アカウントが呼び出しを行う Cloud Composer 環境とは異なります。

Google Cloud - 認証情報の構成

そのため、認証情報をまだ取得していない場合には取得しておきます。

$ gcloud auth application-default login
$ gcloud auth login

なお、現在のツールのバージョン(0.9.0)ですと、コンテナ上のプロセスがローカル上認証情報のファイル ~/.config/gcloud/application_default_credentials.json をパーミッションの都合で読み取れず、Airflow 上で Google Cloud へのアクセスが permission denied となります。下記 Issue でコメントされているものです。

GitHub - When running on Linux, can't read application_default_credentials.json to access gcloud resources

Issue 内に記載してくれている Workaround の通り、すべてのユーザーにファイル読み取り権限を与えることで解消します。

$ chmod +r ~/.config/gcloud/application_default_credentials.json
$ ls -l ~/.config/gcloud/application_default_credentials.json
-rw-r--r-- 1 zunda zunda 335 Jan 24 14:56 /home/zunda/.config/gcloud/application_default_credentials.json

事前準備(3)作業用の環境を作成

作業用のディレクトリを作成した後、ローカル開発環境を汚さないため、pyenv 経由にて Python をインストールし、venv にて Python 仮想環境を作成します。

$ # 作業用のディレクトリ(今回は cloudcomposer という名前)を作成して移動
$ mkdir ./cloudcomposer
$ cd ./cloudcomposer

$ # pyenv経由でインストール
$ pyenv install 3.8.12
$ pyenv local 3.8.12

$ # Python 仮想環境の作成
$ python -m venv .venv
$ source .venv/bin/activate

今回インストールしている Python のバージョン 3.8.12 は、今回使う Airflow コンテナイメージで利用している Python バージョンになります。各イメージで利用されている Python バージョンは以下ページにて確認でき、今回使う Airflow コンテナイメージは、バージョン composer-2.1.3-airflow-2.3.4 となります。

Google Cloud - Cloud Composer version list ※最新情報は英語ページにあるため注意

「Composer ローカル開発 CLI ツール」をインストール

作業用ディレクトリにて、Composer ローカル開発 CLI ツール を git clone した後、ダウンロードされたディレクトリに移動、ツールで必要となる Python パッケージをインストールします。

(.venv) $ git clone https://github.com/GoogleCloudPlatform/composer-local-dev.git
(.venv) $ cd ./composer-local-dev
(.venv) $ pip install .

Compoer イメージバージョンを指定して Airflow 環境の作成

作成できる環境のバージョンは、以下の composer-dev list-available-versions コマンドにて確認できます。今回は、現時点で最も新しい composer-2.1.3-airflow-2.3.4 を利用します。

(.venv) $ composer-dev list-available-versions --include-past-releases --limit 10

Image version │ Release Date
╶───────────────────────────────┼──────────────╴
composer-2.1.3-airflow-2.3.4 │ 16/01/2023
composer-2.1.3-airflow-2.2.5 │ 16/01/2023
composer-2.1.2-airflow-2.3.4 │ 19/12/2022
composer-2.1.2-airflow-2.2.5 │ 19/12/2022
composer-2.1.1-airflow-2.3.4 │ 12/12/2022
composer-2.1.1-airflow-2.2.5 │ 12/12/2022
composer-2.1.0-airflow-2.3.4 │ 07/12/2022
composer-2.1.0-airflow-2.2.5 │ 07/12/2022
composer-2.0.32-airflow-2.3.4 │ 21/11/2022
composer-2.0.32-airflow-2.2.5 │ 21/11/2022

ディレクトリを 1 階層戻り、環境を作成するコマンド composer-dev create を実行します。コマンドを実行したディレクトリで、Airflow の DAG 配置ディレクトリを含めたディレクトリ・ファイルが作成されます。

(.venv) $ # 作業ディレクトリのルートに戻ってから実行する
(.venv) $ cd ../

(.venv) $ # 利用するイメージのバージョンと作成する環境名を指定する
(.venv) $ composer-dev create --from-image-version composer-2.1.3-airflow-2.3.4 [作成する環境名を設定]

(.venv) $ # 下記のように、ディレクトリ・ファイルが作成されている
(.venv) $ tree ./composer
./composer
└── [環境名]
    ├── airflow.db
    ├── config.json
    ├── dags
    ├── data
    ├── plugins
    ├── requirements.txt
    └── variables.env

Airflow コンテナの起動

以下の composer-dev start コマンドを実行すると、該当コンテナイメージが pull され、コンテナが起動します。

(.venv) $ composer-dev start [環境名]
Starting sample composer environment...
2023-01-19T05:21:38.006061957Z + sudo chown airflow:airflow airflow

...

2023-01-19T05:21:50.971678551Z DAG_PROCESSOR_MANAGER_LOG:[2023-01-19 05:21:50,971] {manager.py:482} INFO - Checking for new files in /home/airflow/airflow/dags every 10 seconds
2023-01-19T05:21:50.971817481Z DAG_PROCESSOR_MANAGER_LOG:[2023-01-19 05:21:50,971] {manager.py:709} INFO - Searching for files in /home/airflow/airflow/dags

Started environ_name environment.

1. You can put your DAGs in /home/user_name/cloudcomposer/composer/environ_name/dags
2. Access Airflow at http://localhost:8080

Airflow UI にアクセス

http://localhost:8080 の URL にて、Airflow UI にアクセスできます。ログイン認証を必要とされず、Admin のロールにてログインされます。

以上にて、Cloud Composer 2 の Apache Airflow 環境を作成できました。実行したコマンドの詳細、その他コマンドに関しては GitHub のページが詳しいです。

GitHub - GoogleCloudPlatform/composer-local-dev

登録する DAG ファイルを配置

作成した Airflow に、簡単な DAG を登録してみます。以下の、インストール済み pandas バージョンをプリントする DAG ファイルを、上記で作成された ./composer/[環境名]/dags ディレクトリに配置します。

import datetime

from airflow import models
from airflow.decorators import task

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'sample',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'print_pandas_version',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    @task(task_id="print_pandas_version")
    def print_pandas_version():
        import pandas
        print(f"pandas version: {pandas.__version__}")

    print_pandas_version()

Cloud Composer のイメージには、Apache Airflow の他に様々なパッケージが事前にインストールされています。今回利用した composer-2.1.3-airflow-2.3.4 のイメージであれば、バージョン 1.4.3 の pandas がインストールされています。そこで、実行された DAG のログを確認し、想定通りのバージョンが出力されているか確認してみたいと思います。

事前にインストールされているパッケージも、Cloud Composer version list のページにて確認できます。

Google Cloud - Cloud Composer version list ※最新情報は英語ページにあるため注意

登録された DAG の実行

DAG ファイルを配置した後、Airflow UI の画面を更新すれば、DAG が登録されているはずです。Unpause を行い、DAG を実行させます。

実行された DAG のログを確認

DAG 実行結果のログ画面を確認します。pandas のバージョンは、1.4.3 と出力されています。想定の通りですね。

その他補足

Composer ローカル開発 CLI ツール で利用する Python パッケージ内には、Airflow は含まれていませんでした。上記作業内pip install . コマンドを実行した後、ローカル Python 仮想環境で requirements.txt を取ってみたところ、インストールされていたものは下記でした。

cachetools==4.2.4
certifi==2022.12.7
charset-normalizer==3.0.1
click==8.1.3
commonmark==0.9.1
composer-dev @ file:///home/user_name/cloudcomposer/composer-local-dev
docker==6.0.1
google-api-core==1.34.0
google-auth==1.30.2
google-cloud-artifact-registry==1.6.0
google-cloud-orchestration-airflow==1.6.0
googleapis-common-protos==1.58.0
grpc-google-iam-v1==0.12.6
grpcio==1.51.1
grpcio-status==1.48.2
idna==3.4
packaging==23.0
proto-plus==1.22.2
protobuf==3.20.3
pyasn1==0.4.8
pyasn1-modules==0.2.8
Pygments==2.14.0
requests==2.28.2
rich==13.1.0
rich-click==1.4
rsa==4.9
six==1.16.0
typing_extensions==4.4.0
urllib3==1.26.14
websocket-client==1.4.2

ローカルの Python 仮想環境には Airflow がいないため、IDE で DAG ファイルを編集しようとすると、警告メッセージを貰います。以下は、僕の Visual Studio Code の画面です。

そこで、Python 仮想環境に composer-2.1.3-airflow-2.3.4 イメージで利用されている Airflow のバージョンを入れてしまいます。併せて pandas もインストールします。

(.venv) $ # Airflowのインストール
(.venv) $ AIRFLOW_VERSION=2.3.4
(.venv) $ PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
(.venv) $ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
(.venv) $ pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

(.venv) $ # 併せてpandasもインストール
(.venv) $ pip install "pandas==1.4.3"

Apache Airflow - Installation from PyPI

警告が消えてくれました。


以上となります。より便利な使い方を見つけることできたら、また紹介したいと思います。