Glue Data Qualityで品質検査ルールに不合格となったレコードを特定する

品質検査ルールで不合格のレコードをAmazon Athenaを使って特定します
2024.06.24

データアナリティクス事業本部インテグレーション部機械学習チーム・新納(にいの)です。

データのETLを担うサービスであるAWS Glueの機能群の一つ、Glue Data QualityではGUIベースの操作でデータの品質検査が可能です。データレイクやデータウェアハウスへのデータ投入前に、値の長さチェックや特定カラムがNULLでないかどうかなど、あらかじめ定義しておいたルールに合格しているかどうかを確認できます。

今回は、定義したルールに不合格となったレコードを特定する方法を検証してみました。

Glue Data Qualityについておさらい

ユーザーが定義したルールに従ってデータの品質検査をできるAWS Glueの機能のひとつです。AWSが開発したOSSであるDeequを使用しており、ルールの定義にはDQDL(Data Quality Definition Language)を利用します。

詳細は以下をご参照ください。

以下のエントリでは動画で実際の操作をご覧いただけます。

ルールに不合格となったレコードを特定する

シナリオ

今回は以下のような架空のユーザーデータを使って、nameがNULLのものはないか、credit_cardが全て16桁であるかチェックします。このルールに合致しないレコードをAmazon Athenaを使って特定します。

"name","name_hiragana","age","birthday","gender","email","phone_number","zipcode","address","company","credit_card","exp_date"
"鈴木 賢司","すずき けんじ",72,"1952-06-11","M","kenjisuzuki@example.jp","04-3537-7908","337-1051","埼玉県川口市南鳩ヶ谷2-3-10","","4684847007028235","06/27"
"小宮 真由子","こみや まゆこ",50,"1974-04-07","F","komiyamayuko@example.com","0567-38-4614","453-3750","愛知県春日井市高蔵寺町1-5-13","合同会社KANDICapitalManagement","3574517766884178","05/28"
"堀 勇希","ほり ゆうき",55,"1968-11-13","M","yuuki_hori@example.net","0228-18-2254","988-2801","宮城県仙台市泉区松陵1-2-5","株式会社ゼウス","3540484972504721","02/28"
"佐藤 涼","さとう りょう",31,"1992-12-02","M","ryou_sato@example.ne.jp","0823-74-1579","725-9090","広島県呉市本通2-3-13","有限会社ビクトリー","3564069950438371","05/26"
"長浜 裕子","ながはま ゆうこ",24,"2000-05-10","F","nagahama_yuuko@example.org","0466-71-5467","233-9894","神奈川県横浜市旭区笹野台1-1-17","合同会社エバレッジ","372675929782430","10/25"
"前田 雅也","まえだ まさや",45,"1979-04-05","M","maeda45@example.jp","03-1493-8474","118-4100","東京都台東区松が谷2-1-3ユーハウス106","","4378069700369912","01/26"
"片山 隼","かたやま はやと",30,"1993-12-29","M","katayama_1229@example.org","03-1912-4421","123-8031","東京都江東区亀戸2-1-6","合同会社安清","371168643360445","02/25"
...(中略)...
"蔵 めそ","くら めそ",30,"1993-11-30","M","kura_meso@example.co.jp","06-9636-4964","586-5852","大阪府大阪市中央区","","13584595506048971","09/28"

Glue Job作成

Data QualityはGlue Data CatalogとGlue Jobのどちらかから利用可能です。今回はGlue Job(Visual Job)のを利用し、S3に配置した上記のファイルを品質チェックしてその結果をS3バケットに出力します。

SourceにはAmazon S3を指定し、インプットデータのURIを指定します。

Evaluate Data Qualityノードを追加してデータ品質ルールを定義します。 Ruleset editorに以下の通りDQDLを定義します。

Rules = [
    ColumnLength "credit_card" = 16,
    IsComplete "name"
]

Data quality transform outputではデータ品質検査の出力について定義します。

項目 内容
Original data 有効にすると元の入力データを出力する。
Add new columns to indicate data quality errors Original dataを有効にした場合のみ選択可能。有効にすると以下のカラムを元の入力データに追加して出力する。
dataqualityrulespass:合格したルール
dataqualityrulefail:不合格になったルール
dataqualityrulesskip:スキップされたルール。ただし、次のルールはデータセットレベルで有効なため、不合格レコードを特定できない。 AggregateMatch・ColumnCount・ColumnExists・ColumnNamesMatchPattern・CustomSql・RowCount・RowCountMatch・StandardDeviation・Mean・ColumnCorrelation
dataqualityevaluationresult:ルールの合否(Failed/Passed)
Data quality results 有効にするとルールと合否ステータスを出力する。

上記の設定は全て有効にして、最終的なアウトプットにData Qualityの結果カラムを追加されるように設定しました。

Data quality actionsでは、ルールが不合格だった場合にジョブをどうするか設定可能です。「Continue with job(ジョブを続行する)」、「Fail job after loading target data(ターゲットデータをロードしてからジョブを異常終了する)」のどちらかを選んでください。「Fail job without loading target data(ターゲットデータをロードせずジョブを異常終了する)」を選択するとData Qualityの結果カラムが出力されなくなります。今回はロードしてから異常終了する方を選択しました。

Original dataを有効にするとrowLevelOutcomesノードが、Data quality resultsを有効にするとruleOutcomesノードがそれぞれ自動で追加されます。この2つのノードには手動で設定を追加する必要はありません。

それぞれの結果をS3バケットに保存するため、Data targetとしてAmazon S3ノードを追加します。rowLevelOutcomesの出力結果を使って不合格レコードを特定するため、わかりやすいようにruleOutcomesと出力先のフォルダは分けておきましょう。

設定が完了したらSaveからジョブを保存し、ジョブを実行します。今回は「Fail job after loading target data(ターゲットデータをロードしてからジョブを異常終了する)」を選択しており、ルールに適合しないレコードがあるため、Error Category: UNCLASSIFIED_ERROR; AssertionError: The job failed due to failing DQ rules for nodeというエラーでジョブ自体が異常終了します。

Data qualityタブを見ると、ColumnLength "credit_card" = 16のルールが不合格だったことが確認できます。どのレコードが不合格だったのか特定してみましょう。

rowLevelOutcomesの出力結果を保存したファイルを確認すると、元の入力データにData Qualityの結果カラムが追加されていることが確認できます。このままでは確認しづらいため、Amazon Athenaからクエリできるように設定してみましょう。

Amazon Athenaで不合格レコードを特定

今回はGlue crawlerを使って、rowLevelOutcomesの出力先からテーブルを作成しました。テーブル定義は以下の通り。

CREATE EXTERNAL TABLE `user_jpoutput`(
  `name` string, 
  `name_hiragana` string, 
  `age` string, 
  `birthday` string, 
  `gender` string, 
  `email` string, 
  `phone_number` string, 
  `zipcode` string, 
  `address` string, 
  `company` string, 
  `credit_card` string, 
  `exp_date` string, 
  `dataqualityrulespass` array<string>, 
  `dataqualityrulesfail` array<string>, 
  `dataqualityrulesskip` array<string>, 
  `dataqualityevaluationresult` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<rowLevelOutcomesの出力先S3バケット>/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='nino-dq-test', 
  'averageRecordSize'='197', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='2', 
  'recordCount'='2002', 
  'sizeKey'='221694', 
  'typeOfData'='file')

作成されたテーブルを見ると、dataqualityrulespassdataqualityrulefaildataqualityrulesskipdataqualityevaluationresultの4つのData Qualityの結果カラムが追加されていることが確認できます。

WHERE "dataqualityevaluationresult" = 'Failed'でクエリすればルールに不合格となったレコードを特定することができました。

最後に

データ品質ルールに不合格となったレコードを特定する方法をご紹介しました。この方法を使えば、データレイク・データウェアハウスへ投入する前に不具合のあるデータを検知し、不具合のあるレコードに関してすぐに対策を取ることができます。

参考資料