Cloud Composer で Dataplex のデータリネージ統合を試してみた

Cloud Composer で Dataplex のデータリネージ統合を試してみた

Clock Icon2024.09.16

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

試したこと

今回は、Cloud Composer で Dataplex のデータリネージ統合を試してみました。
下記のドキュメントに沿って実施してみましたので、その内容をご紹介します。

Data Lineage API を有効にする

まずは Dataplex のデータリネージ機能を利用するために、Data Lineage API を有効にします。
APIの有効化は Google Cloud コンソールの 「APIとサービス」 メニューから行います。

cloud-composer-dataplex_04

Data Lineage API を有効にすると、Dataplex は BigQuery、Cloud Data Fusion、Dataproc のデータの取り込みを自動的に開始します。

環境作成

次に、Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、test-composerという名前の環境を、東京リージョンで作成しました。
サービスアカウントなどその他の設定はデフォルトのままで作成しました。

cloud-composer-dataplex_01

cloud-composer-dataplex_02

Cloud Composer 環境でデータリネージ統合を有効にする

Cloud Composer 環境でデータリネージ統合を有効にするには、Composer の環境設定を開き、必要なオプションを設定します。

環境構成の 「Dataplex データリネージ統合」 を編集します。

cloud-composer-dataplex_03

「Dataplex データリネージとの統合を有効にする」 を選択して保存します。

cloud-composer-dataplex_05

環境の更新が行われ、しばらくすると環境構成の 「Dataplex データリネージ統合」 が有効になりました。

DAG を作成する

Cloud Composer 環境でデータリネージ統合を有効にして、サポートされているオペレーター を利用する DAG を実行すると、Cloud Composer が Data Lineage API にリネージ情報を報告します。

2024年9月時点でサポートされているオペレーターは下記のとおりです。

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

今後も追加されていくと思われますので最新のサポート状況は下記ドキュメントを参照ください。

今回はGCSToBigQueryOperatorを使用してみました。
以下は、Google Cloud Storage (GCS) にあるファイルを BigQuery に取り込むための Airflow DAG の例です。

gcs_to_bigquery_lineage_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='gcs_to_bigquery_lineage_dag',
    default_args=default_args,
    schedule_interval=None,  # 手動で実行する
    catchup=False
) as dag:

    # GCS から BigQuery へのデータ取り込み
    gcs_to_bq = GCSToBigQueryOperator(
        task_id='load_gcs_to_bq',
        bucket=BUCKET_NAME,  # GCS バケット名
        source_objects=['sales.csv'],  # GCS 内のファイルパス
        destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}',  # BigQuery のデータセットとテーブル
        source_format='CSV',  # ソースファイル形式
        write_disposition='WRITE_TRUNCATE',  # 既存データの上書き
        skip_leading_rows=1  # CSV のヘッダー行をスキップ
    )

    gcs_to_bq

この DAG では、Google Cloud Storage に保存された CSV ファイル (sales.csv) を BigQuery のテーブル(sales_data)にインポートしています。
GCSToBigQueryOperatorによるリネージの追跡は自動的に行われるため、特に追加の操作は必要ありません。

DAG を実行する

Cloud Composer の DAG 実行画面から先ほど作成した DAG を手動実行します。

cloud-composer-dataplex_06

DAG の実行が完了し、load_gcs_tobqタスクが成功したことを確認しました。
これでデータが GCS から BigQuery に取り込まれました。

cloud-composer-dataplex_07

Dataplex の UI 上でリネージグラフを確認してみましょう。

リネージグラフ を確認する

Dataplex の UI からデータ取り込み先である BigQuery テーブル sales_data を検索します。

cloud-composer-dataplex_08

sales_data を選択して 「リネージ」 を表示します。
リネージグラフは以下のように表示され、GCS(sales.csv) から BigQuery(sales_data) へのデータの流れが視覚的に確認できました。

cloud-composer-dataplex_09

GCS から BigQuery へのフロー上にある Cloud Composer のアイコンをクリックすると、環境名や DAG ID や Task ID などの情報を確認することができます。

cloud-composer-dataplex_10

また、「COMPOSER で環境を開く」「COMPSER で DAG を開く」 から各画面を開くこともできます。

まとめ

以上、Cloud Composer で Dataplex を使ってデータリネージの統合を試してみました。

Dataplex のデータリネージ統合を有効にすることにより、データフローの可視化や追跡が非常に簡単にできました!

特に、サポートされているオペレーターを利用すれば、自動的にリネージ情報がキャプチャされるため、複雑なデータパイプラインの全体像を把握するのに有力な機能ではないでしょうか。

今回は自動リネージレポートでサポートされているオペレーターGCSToBigQueryOperatorを使用しました。
サポートされていないオペレーターでも カスタム リネージ イベント を送信することでリネージを報告することが可能のようですので、次回試してみたいと思います。

カスタム リネージ イベントを送信する

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.