Data Validation Tool (DVT) で、BigQuery と Redshift のデータが一致してるか検証してみた。
こんにちは、みかみです。
最近、大好きな伏見稲荷大社に観光行ったのですが、荘厳に立ち並ぶ鳥居が続くあの道の最後にすら辿り着けず、途中でリタイアしてきました。。(昔は山まで登ったんだけどなぁ。。体力つけねばw
やりたいこと
- BigQuery と Redshift のデータを検証したい
- BigQuery と Redshift のテーブルデータ件数を比較したい
- BigQuery と Redshift のテーブルのサンプリングデータが一致しているか確認したい
Data Validation Tool とは
BigQuery & Redshift に限らず、Cloud SQL や Teradata、Snowflak、ファイルシステム(GCS、S3 など)にも対応している、データ検証ツールです。 テーブルレベルでの行数や列単位の集計値の比較などはもちろん、カラムレベルでデータ型の比較や、カスタムクエリの実行なども可能です。
- Introducing the Data Validation Tool for EDW migrations | Google Cloud ブログ
- Data Validation Tool | GitHub
オープンソースですが、Google Cloud 公式ドキュメントでも紹介されています。
- BigQuery Migration Service の概要 | BigQuery ドキュメント
- Overview: Migrate data warehouses to BigQuery | BigQuery ドキュメント
前提
Google Cloud SDK(gcloud
コマンド)の実行環境は準備済みであるものとします。
本エントリでは、Cloud Shell を使用ました。
また、作業環境に Data Validation Tool のリポジトリを clone 済みです。
検証用データを準備
BigQuery の dataset_1
データセットに、以下のテーブルを準備しました。
Redshift にも同様に dataset_1
という名前のスキーマを作成し、validate_sample
テーブルに以下のデータを格納しました。
dev=# set search_path to dataset_1; SET dev=# select * from validate_sample; col_1 | col_2 | col_3 -------+--------+------- 1 | value1 | aaa 2 | value2 | bbb 3 | value3 | ccc 4 | value4 | ddd (4 rows)
レコード件数は一致していますが、col_1 = 4
のレコードの col_3
カラムの値に差異があるテーブルデータです。
Cloud Run 環境を構築
ローカル環境や GCE などの Compute Instance に Data Validation Tool をインストールしてコマンドラインから実行することもできますが、
今回はお手軽に Cloud Run で Data Validation Tool の実行環境を構築します。
上記ガイド「Quick Steps」に記載の通り、環境変数に PROJECT_ID
を設定して deploy.sh
を実行するだけで環境構築可能ですが、今回は Redshift(AWS)側の接続元 IP 制限をかけたいため、 Cloud Run に固定 IP を付与します。
また、権限管理のために 、Cloud Run 実行用のサービスアカウントを新規作成&設定します。
clone 済みの Data Validation Tool リポジトリの、Cloud Run 用サンプルコードのディレクトリに移動します。
cd professional-services-data-validator/samples/run/
deploy.sh
を以下に書き換えて実行します。
#!/bin/bash export PROJECT_ID=$DEVSHELL_PROJECT_ID export NETWORK_NAME=vpc-data-validation export SUBNET_NAME=sb-data-validation export FIREWALL_NAME=data-validation-allow-redshift export FIREWALL_RULES=tcp:5439 export FIREWALL_SOURCE_RANGE=0.0.0.0/0 export RANGE=10.124.0.0/28 export REGION=asia-northeast1 export CONNECTOR_NAME=ac-data-validation export ROUTER_NAME=rt-data-validation export ORIGIN_IP_NAME=ip-data-validation export NAT_NAME=nat-data-validation export SERVICE_ACCOUNT_NAME=sa-valitation # create network gcloud compute networks create ${NETWORK_NAME} --subnet-mode=custom gcloud compute networks subnets create ${SUBNET_NAME} \ --range=${RANGE} --network=vpc-data-validation --region=${REGION} gcloud compute firewall-rules create ${FIREWALL_NAME} \ --direction=INGRESS \ --priority=1000 \ --network=${NETWORK_NAME} \ --action=ALLOW \ --rules=${FIREWALL_RULES} \ --source-ranges=${FIREWALL_SOURCE_RANGE} # create vpc access cpnnector gcloud compute networks vpc-access connectors create ${CONNECTOR_NAME} \ --region=${REGION} \ --subnet-project=${PROJECT_ID} \ --subnet=${SUBNET_NAME} gcloud compute routers create ${ROUTER_NAME} \ --network=${NETWORK_NAME} \ --region=${REGION} gcloud compute addresses create ${ORIGIN_IP_NAME} --region=${REGION} gcloud compute routers nats create ${NAT_NAME} \ --router=${ROUTER_NAME} \ --region=${REGION} \ --nat-custom-subnet-ip-ranges=${SUBNET_NAME} \ --nat-external-ip-pool=${ORIGIN_IP_NAME} # create service account gcloud iam service-accounts create ${SERVICE_ACCOUNT_NAME} --display-name ${SERVICE_ACCOUNT_NAME} gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \ --role roles/bigquery.dataEditor gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \ --role roles/bigquery.user # deploy cloud run gcloud builds submit --tag gcr.io/${PROJECT_ID}/data-validation \ --project=${PROJECT_ID} gcloud run deploy data-validation --image gcr.io/${PROJECT_ID}/data-validation \ --service-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \ --vpc-connector=${CONNECTOR_NAME} \ --vpc-egress=all-traffic \ --region=${REGION} --project=${PROJECT_ID}
まず、Cloud Run 実行用の VPC とサブネット、Redshift への接続を許可するためのファイウォールを作成し、
# create network gcloud compute networks create ${NETWORK_NAME} --subnet-mode=custom gcloud compute networks subnets create ${SUBNET_NAME} \ --range=${RANGE} --network=vpc-data-validation --region=${REGION} gcloud compute firewall-rules create ${FIREWALL_NAME} \ --direction=INGRESS \ --priority=1000 \ --network=${NETWORK_NAME} \ --action=ALLOW \ --rules=${FIREWALL_RULES} \ --source-ranges=${FIREWALL_SOURCE_RANGE}
Cloud Run からサーバレス VPC アクセスコネクタを使って Cloud NAT の固定 IP で Redshift にアクセスするために、サーバーレス VPC アクセスコネクタとルート、静的 IP アドレスと Cloud NAT を作成します。
# create vpc access cpnnector gcloud compute networks vpc-access connectors create ${CONNECTOR_NAME} \ --region=${REGION} \ --subnet-project=${PROJECT_ID} \ --subnet=${SUBNET_NAME} gcloud compute routers create ${ROUTER_NAME} \ --network=${NETWORK_NAME} \ --region=${REGION} gcloud compute addresses create ${ORIGIN_IP_NAME} --region=${REGION} gcloud compute routers nats create ${NAT_NAME} \ --router=${ROUTER_NAME} \ --region=${REGION} \ --nat-custom-subnet-ip-ranges=${SUBNET_NAME} \ --nat-external-ip-pool=${ORIGIN_IP_NAME}
続いて、Cloud Run 実行用のサービスアカウントを作成し、BigQuery データ編集者(roles/bigquery.dataEditor
)と BigQuery ユーザー(roles/bigquery.user
)のロールを付与します。
検証結果を BigQuery に書き込む必要がなければ、データ編集者ではなくデータ閲覧者ロールで大丈夫です。
また、はじめ BigQuery ユーザーの代わりに BigQuery ジョブユーザーを付与していたのですが、ジョブユーザーだと bigquery.readsessions.create
権限不足でエラーが発生しました。
権限を絞る必要がある場合は、ジョブユーザー + 読み取りセッションユーザーのご利用などをご検討ください。
# create service account gcloud iam service-accounts create ${SERVICE_ACCOUNT_NAME} --display-name ${SERVICE_ACCOUNT_NAME} gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \ --role roles/bigquery.dataEditor gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \ --role roles/bigquery.user
最後に、GitHub 上の deploy.sh
同様、Cloud Build でデプロイ対象のイメージをビルドし、Cloud Run をデプロイします。
Cloud Run のデプロイコマンドに、サーバレス VPC アクセスコネクタとサービスアカウントの指定を追加しました。
# deploy cloud run gcloud builds submit --tag gcr.io/${PROJECT_ID}/data-validation \ --project=${PROJECT_ID} gcloud run deploy data-validation --image gcr.io/${PROJECT_ID}/data-validation \ --service-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \ --vpc-connector=${CONNECTOR_NAME} \ --vpc-egress=all-traffic \ --region=${REGION} --project=${PROJECT_ID}
無事、Cloud Run 環境が構築できました。
Google Cloud コンソール [VPC ネットワーク] > [IP アドレス] から静的 IP アドレスを確認し、AWS 側で、Redshift のセキュリティグループに、Cloud NAT の固定 IP からのアクセス許可設定を追加しました。
テーブル件数を比較
GitHub に準備されている、検証用のサンプルコードを修正して、Redshift と BigQuery のテーブルのデータ件数を比較してみます。
今回は検証結果を BigQuery にエクスポートするため、あらかじめ BigQuery の dataset_1
データセットに result_validate
という名前で以下のテーブルを作成済みです。
test.py を以下に修正して実行しました。
import os import re import requests PROJECT_ID = os.environ.get("DEVSHELL_PROJECT_ID") DESCRIBE_SERVICE = """ gcloud run services describe {service_name} --region=asia-northeast1 --project={project_id} """ def get_token(): with os.popen("gcloud auth print-identity-token") as cmd: token = cmd.read().strip() return token def get_cloud_run_url(service_name, project_id): describe_service = DESCRIBE_SERVICE.format( service_name=service_name, project_id=project_id ) with os.popen(describe_service) as service: description = service.read() return re.findall("URL:.*\n", description)[0].split()[1].strip() data = { "source_conn": { "source_type": "Redshift", "host": "cm-mikami-validate.xxxxxxxx.ap-northeast-1.redshift.amazonaws.com", "port":5439, "user": "user_name", "password": "xxxxxxxx", "database":"dev" }, "target_conn": { "source_type": "BigQuery", "project_id": PROJECT_ID, }, "type": "Column", "schema_name": "dataset_1", "table_name": "validate_sample", "target_schema_name": "cm-da-mikami-yuki-258308.dataset_1", "target_table_name": "validate_sample", "result_handler":{ "type":"BigQuery", "project_id": PROJECT_ID, "table_id": "dataset_1.result_validate" }, "aggregates": [ { "source_column": None, "target_column": None, "field_alias": "count", "type": "count", } ], } url = get_cloud_run_url("data-validation", PROJECT_ID) res = requests.post(url, headers={"Authorization": "Bearer " + get_token()}, json=data) print(res.content.decode())
※Redshift 接続情報は、一部伏せ字に変更しています。
mikami_yuki@cloudshell:~/DVT/professional-services-data-validator/samples/run (cm-da-mikami-yuki-258308)$ python3 test.py [{"validation_name": "count", "validation_type": "Column", "aggregation_type": "count", "source_table_name": "dataset_1.validate_sample", "source_column_name": null, "source_agg_value": "4", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": null, "target_agg_value": "4", "group_by_columns": null, "primary_keys": null, "num_random_rows": null, "difference": 0.0, "pct_difference": 0.0, "pct_threshold": 0.0, "validation_status": "success", "run_id": "900ccc61-98ea-4620-8002-8a48f739c0e9", "labels": [], "start_time": "2022-08-05 14:03:19.427778+00:00", "end_time": "2022-08-05 14:03:22.676955+00:00"}]
コンソールの標準出力からも "validation_status": "success"
だったことが確認できますが、BigQuery の結果テーブルを確認してみます。
テーブルデータ件数が一致しているという検証結果が、BigQuery の指定テーブルに出力されていることが確認できました。
テーブルデータをサンプリングして比較
データ件数は一致していても、値が異なるデータが混じっている恐れがあります。
先ほどの test.py
のリクエストデータ部分を以下に修正した、test_row.py
を作成して実行しました。
data = { "source_conn": { "source_type": "Redshift", "host": "cm-mikami-validate.xxxxxxxx.ap-northeast-1.redshift.amazonaws.com", "port":5439, "user": "user_name", "password": "xxxxxxxx", "database":"dev" }, "target_conn": { "source_type": "BigQuery", "project_id": PROJECT_ID, }, "type": "Row", "schema_name": "dataset_1", "table_name": "validate_sample", "target_schema_name": "cm-da-mikami-yuki-258308.dataset_1", "target_table_name": "validate_sample", "result_handler":{ "type":"BigQuery", "project_id": PROJECT_ID, "table_id": "dataset_1.result_validate" }, "threshold": 0, "format": "table", "filters": [], "use_random_rows": True, "random_row_batch_size": "100", "calculated_fields": [], "comparison_fields": [ { "source_column": "col_1", "target_column": "col_1", "field_alias": "col_1", "cast": None }, { "source_column": "col_2", "target_column": "col_2", "field_alias": "col_2", "cast": None }, { "source_column": "col_3", "target_column": "col_3", "field_alias": "col_3", "cast": None } ], "dependent_aliases": [ "col_1", "col_2", "col_3", "col_1" ], "primary_keys": [ { "source_column": "col_1", "target_column": "col_1", "field_alias": "col_1", "cast": None } ] }
※Redshift 接続情報は、一部伏せ字に変更しています。
col_1 カラムを主キー(primary_keys
)として、任意の 100 行(random_row_batch_size
)をサンプリングし、col_1, col_2, col_3(comparison_fields
) の値が一致しているか確認します。
mikami_yuki@cloudshell:~/DVT/professional-services-data-validator/samples/run (cm-da-mikami-yuki-258308)$ python3 test_row.py [{"validation_name": "col_2", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_2", "source_agg_value": "value1", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_2", "target_agg_value": "value1", "group_by_columns": "{\"col_1\": \"1\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_2", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_2", "source_agg_value": "value2", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_2", "target_agg_value": "value2", "group_by_columns": "{\"col_1\": \"2\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_2", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_2", "source_agg_value": "value3", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_2", "target_agg_value": "value3", "group_by_columns": "{\"col_1\": \"3\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_2", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_2", "source_agg_value": "value4", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_2", "target_agg_value": "value4", "group_by_columns": "{\"col_1\": \"4\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_3", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_3", "source_agg_value": "aaa", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_3", "target_agg_value": "aaa", "group_by_columns": "{\"col_1\": \"1\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_3", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_3", "source_agg_value": "bbb", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_3", "target_agg_value": "bbb", "group_by_columns": "{\"col_1\": \"2\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_3", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_3", "source_agg_value": "ccc", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_3", "target_agg_value": "ccc", "group_by_columns": "{\"col_1\": \"3\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_3", "validation_type": "Row", "aggregation_type": NaN, "source_table_name": "dataset_1.validate_sample", "source_column_name": "col_3", "source_agg_value": "ddd", "target_table_name": "cm-da-mikami-yuki-258308.dataset_1.validate_sample", "target_column_name": "col_3", "target_agg_value": "xyz", "group_by_columns": "{\"col_1\": \"4\"}", "primary_keys": "{col_1}", "num_random_rows": "100", "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "fail", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_1", "validation_type": NaN, "aggregation_type": NaN, "source_table_name": NaN, "source_column_name": NaN, "source_agg_value": NaN, "target_table_name": NaN, "target_column_name": NaN, "target_agg_value": NaN, "group_by_columns": "{\"col_1\": \"1\"}", "primary_keys": NaN, "num_random_rows": NaN, "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_1", "validation_type": NaN, "aggregation_type": NaN, "source_table_name": NaN, "source_column_name": NaN, "source_agg_value": NaN, "target_table_name": NaN, "target_column_name": NaN, "target_agg_value": NaN, "group_by_columns": "{\"col_1\": \"2\"}", "primary_keys": NaN, "num_random_rows": NaN, "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_1", "validation_type": NaN, "aggregation_type": NaN, "source_table_name": NaN, "source_column_name": NaN, "source_agg_value": NaN, "target_table_name": NaN, "target_column_name": NaN, "target_agg_value": NaN, "group_by_columns": "{\"col_1\": \"3\"}", "primary_keys": NaN, "num_random_rows": NaN, "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}, {"validation_name": "col_1", "validation_type": NaN, "aggregation_type": NaN, "source_table_name": NaN, "source_column_name": NaN, "source_agg_value": NaN, "target_table_name": NaN, "target_column_name": NaN, "target_agg_value": NaN, "group_by_columns": "{\"col_1\": \"4\"}", "primary_keys": NaN, "num_random_rows": NaN, "difference": null, "pct_difference": null, "pct_threshold": 0, "validation_status": "success", "run_id": "14ceb734-6b68-4a43-97ad-8586769c47f3", "labels": [], "start_time": "2022-08-05 14:11:29.989923+00:00", "end_time": "2022-08-05 14:11:32.346292+00:00"}]
コンソール出力だとわかりにくいので、BigQuery の結果テーブルを確認してみます。
1 件だけ検証結果 NG が検出されました。 NG レコードに絞って、内容確認してみます。
準備した検証データの通り、datset_1
データセット(スキーマ)の validate_sample
テーブル col_3
カラムの値が一致していないことが検出できました。
まとめ(所感)
DWH 移行などの際には避けられないデータ検証作業ですが、汎用的に無料で使える公開ツールはあまりないのではないかと思います。
テーブル数やデータ件数が少ない場合は Excel などを駆使してデータ比較できると思いますが、データが多くなってくると、自前で検証用スクリプトを準備したくなるのが世の常かと。 ですが、検証用スクリプトを作るための工数がかかったり、検証スクリプトには十分なテスト工数を確保できない都合上、検証結果で NG が出た場合に原因が実データなのかスクリプトの不具合なのかの切り分けに苦労したり、またスクリプトを作った人と使う人が異なる場合に使い方の理解に時間がかかったり。。 いろいろと悩ましいのが、データ検証作業ではないかと思います。
その点、この Data Validation Tool は様々なソースデータに対応しており、GitHub レポジトリ内のドキュメントではインストール方法からサンプルコマンドなどの詳細な説明もあり、また Cloud run や Airflow で実行する場合のサンプルも準備されていて、至れり尽くせり!
BigQuery で異なるリージョン間のデータ移行が必要になった場合や、異なるDB/DWH 間のデータ検証やが必要になった場合には、今後も利用させていただこうと思ってます。