Cloud Composer で Dataplex のデータリネージ統合を試してみた 〜カスタム リネージ イベント編〜
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
前回試したCloud Composer と Dataplex を使ったデータリネージの統合に引き続き、カスタム リネージ イベントを試してみました。
試したこと
Cloud Composer の標準オペレーターでは、自動でリネージが記録されますが、カスタム リネージ イベントを使うことで、標準オペレーター以外の操作(例:CLI コマンドや API 呼び出し)でもデータフローを追跡できます。
前回試した環境をベースにしていますので、詳細は下記のエントリもご参照ください。
※以降、下記エントリの内容は 前回 という文脈で引用します。
環境作成
今回は、前回と同じ設定で環境を構築しています。
具体的には、test-composer という名前の環境を東京リージョンで作成しています。
また、前回と同様に Dataplex データリネージ統合 も有効化されています。

DAG を作成する
前回は自動リネージ レポートでサポートされているオペレーターGCSToBigQueryOperatorを使用したので、自動でリネージが記録されました。
自動リネージ レポートでサポートされていないオペレーターのリネージを報告する場合は、カスタム リネージ イベントを送信します。
具体的には、下記ドキュメントに記載のとおり、BashOperatorまたはPythonOperatorのパラメータを変更します。
BashOperator、タスク定義のinletsパラメータまたはoutletsパラメータを変更します。PythonOperator。タスク定義のtask.inletsパラメータまたはtask.outletsパラメータを変更します。inletsパラメータにAUTOを使用すると、値はアップストリーム タスクのoutletsと同じに設定されます。
inletsパラメータに入力エンティティ、outletsパラメータに出力エンティティを定義するイメージです。
今回は、BashOperator を使用して GCS バケット間でファイルをコピーするタスクを追加しました。
このタスクには、カスタム リネージ イベントを設定しており、データフロー全体が Dataplex で追跡できるようにしています。
以下は、前回作成した DAG を拡張し、GCS から BigQuery へのデータ転送に加えて、GCS バケット間のファイルコピー操作も追加した例です。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.composer.data_lineage.entities import GCSEntity
PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
SOURCE_BUCKET_NAME = 'cm_enokawa_work_source'
FILE_NAME = 'sales.csv'
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
}
with DAG(
dag_id='gcs_to_bigquery_custom_lineage_dag',
default_args=default_args,
schedule_interval=None, # 手動で実行する
catchup=False
) as dag:
# BashOperator を使って GCS上のファイルをコピー
copy_gcs_file_task = BashOperator(
task_id='copy_gcs_file',
bash_command=f'gcloud storage cp gs://{SOURCE_BUCKET_NAME}/{FILE_NAME} gs://{BUCKET_NAME}/{FILE_NAME}',
inlets=[GCSEntity(
bucket=f'{SOURCE_BUCKET_NAME}', # 入力元のGCSバケット名
path=f'{FILE_NAME}'
)],
outlets=[GCSEntity(
bucket=f'{BUCKET_NAME}', # 出力先のGCSバケット名
path=f'{FILE_NAME}'
)]
)
# GCS から BigQuery へのデータ取り込み
gcs_to_bq = GCSToBigQueryOperator(
task_id='load_gcs_to_bq',
bucket=BUCKET_NAME, # GCS バケット名
source_objects=[f'{FILE_NAME}'], # GCS 内のファイルパス
destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', # BigQuery のデータセットとテーブル
source_format='CSV', # ソースファイル形式
write_disposition='WRITE_TRUNCATE', # 既存データの上書き
skip_leading_rows=1 # CSV のヘッダー行をスキップ
)
copy_gcs_file_task >> gcs_to_bq
この DAG では、GCS バケット間のファイルコピー操作と BigQuery へのデータ取り込みを行い、カスタム リネージ イベントを使ってそのデータフローを追跡しています。
DAG を実行する
Cloud Composer の DAG 実行画面から先ほど作成した DAG を手動実行します。

DAG の実行が完了し、タスクが成功したことを確認しました。
copy_gcs_file_taskタスクにより、カスタム リネージ イベントが送信され、データリネージが記録されているはずです。

Dataplex の UI 上でリネージグラフを確認してみましょう。
リネージグラフ を確認する
Dataplex の UI からデータ取り込み先である BigQuery テーブルsales_dataを検索します。

sales_dataを選択して 「リネージ」 を表示します。
リネージグラフは以下のように表示され、GCS(sales.csv) から BigQuery(sales_data) へのデータの流れを視覚的に確認できます。
前回試したときと同様のフローですが、GCS から BigQuery へのフロー上にある Cloud Composer のアイコンが増えています。
アイコンをクリックすると、先ほど実行したDAGgcs_to_bigquery_custom_lineage_dagのフローだということが分かります。

今回追加した GCSバケット間のコピー のフローが見当たりませんね。
GCS(sales.csv)のアイコンの 「+」マーク をクリックします。

すると、GCS(sales.csv)の上流にフローが展開されました。
ファイル名がsales.csvと同じなので分かりづらいですが、今回追加した GCSバケット間のコピー のフローです。

GCSアイコン間のフロー上にある Cloud Composer のアイコンをクリックすると、BashOperatorで定義したタスクcopy_gcs_fileであることが分かります。

カスタム リネージ イベントの送信により、リネージが記録されていることが確認できました!
まとめ
以上、Cloud Composer で Dataplex のカスタム リネージ イベントを試してみました。
カスタム リネージ イベントを使用することで、標準オペレーターではサポートされていない操作に対しても、データフローを追跡できることが確認できました。
今回は GCS バケット間のファイルコピーを例にエンティティにGCSEntityを使用しましたが、BigQuery の場合はBigQueryTableにより定義できます。
from airflow.composer.data_lineage.entities import BigQueryTable
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
また、Airflow 内でカスタム リネージ イベントを作成するために使用できる汎用的なエンティティとしてDataLineageEntityがあります。
GCSEntityやBigQueryTableといった特定のリソース(GCS ファイルや BigQuery テーブル)を扱うエンティティに対して、DataLineageEntityは、任意のリソースやプロセスのデータリネージを追跡したい場合に使用されます。
たとえば、GCS や BigQuery 以外のサービス(API 呼び出しによるデータのやり取りなど)でのリネージを記録したい場合に便利です。
機会があれば、他のユースケースについても試してみたいと思います。






