Cloud Composer で Airflow 環境を立ち上げて、GCS から BigQuery へのデータロードを実行してみた

2020.11.27

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは、みかみです。

OSS の ワークフローエンジン Airflow は、特に Python 実装の環境では利用されることも多いのではないかと思いますが、Airflow の実行環境をゼロから作成しようとすると少し手間がかかります。

Google Cloud Composer を使うとマネージドな Airflow 環境がすぐに利用可能とのことなので、ためしてみました。

やりたいこと

  • Cloud Composer をさわってみたい
  • Cloud Composer は通常の Airflow と同じような使い勝手かどうか確認したい
  • Cloud Composer で、GCS から BigQuery にデータロードするジョブを実行したい

前提

課金が有効になったプロジェクトは作成済みです。

Airflow 環境を作成

GCP 管理コンソールナビゲーションメニューから「Composer」を選択し、Cloud Composer の管理画面に遷移します。

初回アクセス時のみ、API 設定画面が表示されるので、Cloud Composer API を有効にします。

「環境を作成」リンクをクリックし、任意の名前を入力して Airflow 環境を作成します。 リージョンは東京を選択し、Python バージョンは 3 を選択しました。

「作成」ボタンをクリックすると Airflow 環境の構築が始まり、20分ほどで環境ができあがります。

「Airflow サーバー」欄のリンクから、Airflow の管理画面が表示できます。

通常の Airflow と全く同じ管理画面が無事表示されました。

DAG を登録して実行

Airflow ではジョブの処理内容を DAG として定義するため、処理を記載した DAG ファイルを作成して Airflow に登録します。

BigQuery のデータセットを作成した後、GCS のファイルを BigQuery にロードする DAG ファイルを作成しました。

DAG ファイルの書き方は、通常の Airflow と同じです。

Airflow の BashOperatorbq コマンドを実行して、データセットの作成と GCS からのデータロードを実行します。

import airflow
from airflow import models
from airflow.operators import bash_operator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = models.DAG(
    dag_id='test_composer_gcs2bq_cli',
    default_args=args,
)

# create dataset
create_dataset = bash_operator.BashOperator(
    task_id='create_dataset',
    bash_command='bq mk test_composer_cli',
    dag=dag
)

# load csv data from gcs to bigquery
load_csv = bash_operator.BashOperator(
    task_id='load_csv',
    bash_command='bq load --autodetect --source_format=CSV test_composer_cli.load_csv gs://test-mikami/pref_code.csv',
    dag=dag
)

create_dataset >> load_csv

GCS の管理画面を開くと Cloud Composer が作成してくれたバケットがあるので、dags 配下に作成した DAG ファイルをアップロードします。

少し待ってから Airflow の管理画面を更新すると、アップロードした DAG が追加&実行されていることが確認できました。

BigQuery 管理画面からも、データセットとテーブルが新規作成され、データがロードされていることが確認できました。

同じ処理を別の Operator で実装

Airflow には各種 Operator があり、BigQuery や GCS に関する操作を実行するインターフェースも用意されています。

Cloud Composer クイックスタートのサンプル DAG でも BashOperator を使用していましたが、他の Operator も問題なく使用できるのか確認してみます。

先ほどと同じくデータセットを新規作成して GCS ファイルデータを BigQuery にロードする処理を、BigQueryCreateEmptyDatasetOperatorGoogleCloudStorageToBigQueryOperator を使用して実行してみます。

以下の DAG ファイルを新しく作成し、アップロードしました。

import airflow
from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import gcs_to_bq

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
dag = models.DAG(
    dag_id='test_composer_gcs2bq',
    default_args=args,
)

# create dataset
create_dataset = bigquery_operator.BigQueryCreateEmptyDatasetOperator(
    task_id='create_dataset',
    dataset_id='test_composer',
    dag=dag
)

# load csv data from gcs to bigquery
load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='load_csv',
    bucket='test-mikami',
    source_objects=['pref_code.csv'],
    destination_project_dataset_table='test_composer.load_csv',
    write_disposition='WRITE_TRUNCATE',
    autodetect=True,
    dag=dag
)

create_dataset >> load_csv

DAG の追加と実行が完了したことを確認して、

BigQuery のデータを確認してみます。

GoogleCloudStorageToBigQueryOperatorskip_leading_rows パラメータを指定していなかったためヘッダ行もテーブルにロードされてしまいましたが、BashOperator 以外の Operator も問題なく利用できることが確認できました。

まとめ(所感)

OSS なのに多機能な Airflow はバッチ処理の管理に便利ですが、ウェブサーバー、ワーカー、スケジューラー、ジョブキューなどから構成される Airflow の環境をスケーリング含めて自前で構築しようとすると少し手間がかかります。

その点、Cloud Composer を使えば、本当にボタン一つで Airflow 環境を自動で作成してくれるので非常に楽でした。 DAG ファイルも GCS にアップロードするだけで追加でき、使い心地は通常の Airfow と変わりません。

マネージドな分スケールアウト時などに自前の環境よりも多少コストがかかるケースがあるかもしれませんが、 環境のメンテナンスコストを考えれば、DAG の作成に注力できる Cloud Composer は有用なサービスだと思います。

Airflow を全く触ったことがない場合には DAG の書き方など少し調べる必要があると思いますが、 既に Airflow を使っていて環境の拡張を考えている場合や、Airflow がどんなものなのか試しに触ってみたいというような場合には、 Cloud Composer が非常に便利ではないかと思いました。

参考