Deequでデータ品質をテストする

2021.11.04

Introduction

最近は大量のデータを扱う機会も多くなりました。
機械学習でも、モデル作成するときにデータはとても重要ですし、
データ分析をおこなってビジネス上の意思決定を行うこともあります。
そういったとき、データの量も重要ですが品質も大事になってきます。

プログラム開発をおこなうとき、プログラムに対してテストを記述して品質を担保します。
データに対してもテストを作成することでデータに対する品質を保証します。

本稿ではAmazon Deequを使用したデータのテスト方法について紹介します。

Deequ?

Deequとは、Amazonで開発されているOSSのデータ用テストツールです。
データに対してデータ品質メトリクス計算やデータ品質の制約チェックなどが可能です。
DeequはApache Spark上で動作し、大規模なデータセット(数十億レコード規模らしい)
に対してスケール可能です。

Apache Spark?

Apache Sparkは、大規模データを扱うための分散処理フレームワークです。
SparkはMapReduceの処理速度の改善を目的に開発され、現在はMapReduceの代替としても利用可能です。
現在はバージョン3.2がリリースされていて、ストリーミング処理や機械学習、
グラフ分析(Spark GraphX)も可能となっています。

Environment

  • OS : MacOS 10.15.7
  • Java : openjdk 11.0.2
  • Python 3.9.4

なお、セットアップにはsdkmanをつかってます。

Setup

ではDeequを動かすための環境を設定します。
まずはsdkmanをつかってSparkのインストールをします。
また、有効なAWSのアクセスキーとシークレットキーがある前提です。

# Deequの対応がSpark 3.1なので
% sdk install spark 3.1.2

spark-shell↓のようになればOK。

% spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z

S3バケット作成&テスト用CSVアップロード

AWSコンソールでCLIでもよいので、S3にバケットを作成します。
テスト用のペンギンデータをkaggleから取得し、
penguins_size.csvをアップロードしましょう。

ライブラリを取得

今回動作確認をするのにいくつかライブラリが必要なので、
MVN Repositoryから下記jarを取得します。

  • deequ-2.0.0-spark-3.1.jar
  • aws-java-sdk-bundle-1.12.99.jar
  • hadoop-aws-3.2.2.jar

Exeute Spark Shell

ではSpark Shellを起動してDeequを動かしてみましょう。
shell起動時にさきほど取得したjarを指定して起動します。

% spark-shell --conf spark.jars=./deequ-2.0.0-spark-3.1.jar,./aws-java-sdk-bundle-1.12.99.jar,./hadoop-aws-3.2.2.jar

起動したら下記コードを入力してみます。
S3からCSVを取得し、データの表示ができたらOKです。
※コンソールにコピペ

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<AWS_ACCESS_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<AWS_SECRET_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

val customSchema = StructType(Array(
    StructField("species", StringType, false),
    StructField("island", StringType, true),
    StructField("culmen_length_mm", DoubleType, true),
    StructField("culmen_depth_mm", DoubleType, true),
    StructField("flipper_length_mm", DoubleType,true),
    StructField("body_mass_g", DoubleType, true),
    StructField("sex", StringType, true)))

val df = spark.read.option("header", "true").option("inferSchema", "true")
    .schema(customSchema).csv("s3a://<bucket>/penguins_size.csv")

フィールドの型を定義したいので、StructTypeを使用してます。
dataframeのtakeメソッドを実行してみると、内容が取得できているのがわかります。

scala> df.take(2).foreach(x => println(x))
[Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE]
[Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE]

AnalysisRunnerを使えばデータの分析が可能です。

import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame

val analysisResult: AnalyzerContext = { AnalysisRunner
  .onData(df)
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("species"))
  .addAnalyzer(ApproxCountDistinct("species"))
  .addAnalyzer(Mean("body_mass_g"))
  .addAnalyzer(Compliance("top body_mass_g", "body_mass_g >= 3500"))
  .addAnalyzer(Correlation("culmen_length_mm", "culmen_depth_mm"))
  .run()
}

val metrics = successMetricsAsDataFrame(spark, analysisResult)

showメソッドを使うと、分析結果を表示できます。

scala > metrics.show()
+-----------+--------------------+-------------------+--------------------+
|     entity|            instance|               name|               value|
+-----------+--------------------+-------------------+--------------------+
|     Column|         body_mass_g|               Mean|   4201.754385964912|
|     Column|     top body_mass_g|         Compliance|  0.7877906976744186|
|    Dataset|                   *|               Size|               344.0|
|Mutlicolumn|culmen_length_mm,...|        Correlation|-0.23505287035553338|
|     Column|             species|       Completeness|                 1.0|
|     Column|             species|ApproxCountDistinct|                 3.0|
+-----------+--------------------+-------------------+--------------------+
  • body_mass_gの平均は4201.75
  • speciesに欠損値なし、固有値は3つ

など、分析した情報が表示されています。

データの制約について、ある程度データに基づいた提案を行わせることも可能です。
Deequはデータ分布に基づき、使えそうな制約を提案してくれる機能を持っています。

import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._

val suggestionResult = { ConstraintSuggestionRunner()
  .onData(df)
  .addConstraintRules(Rules.DEFAULT)
  .run()
}

val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
  case (column, suggestions) =>
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    }
}.toSeq.toDS()
scala> suggestionDataFrame.show()
+-----------------+--------------------+--------------------+
|               _1|                  _2|                  _3|
+-----------------+--------------------+--------------------+
|          species|'species' is not ...|.isComplete("spec...|
|          species|'species' has val...|.isContainedIn("s...|
|      body_mass_g|'body_mass_g' has...|.hasCompleteness(...|
|      body_mass_g|'body_mass_g' has...|.isNonNegative("b...|
|              sex|   'sex' is not null|  .isComplete("sex")|
|              sex|'sex' has value r...|.isContainedIn("s...|
| culmen_length_mm|'culmen_length_mm...|.hasCompleteness(...|
| culmen_length_mm|'culmen_length_mm...|.isNonNegative("c...|
|           island|'island' is not null|.isComplete("isla...|
|           island|'island' has valu...|.isContainedIn("i...|
|  culmen_depth_mm|'culmen_depth_mm'...|.hasCompleteness(...|
|  culmen_depth_mm|'culmen_depth_mm'...|.isNonNegative("c...|
|flipper_length_mm|'flipper_length_m...|.hasCompleteness(...|
|flipper_length_mm|'flipper_length_m...|.isNonNegative("f...|
+-----------------+--------------------+--------------------+

各フィールドについて、どういったデータであるかチェックするための提案が表示されています。
これをベースに制約を記述すると楽です。
なお、
suggestionDataFrame.show(truncate=false)
とすれば内容を省略せず表示してくれます。

ではデータに対してテストを実行してみます。
データ総数のテストやnullチェック、Containチェックなどを記述しています。

import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}

val verificationResult = { VerificationSuite()
  .onData(df)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ >= 100)
      .isComplete("species")
      .isNonNegative("body_mass_g")
      .isComplete("body_mass_g")
      .isContainedIn("species", Array("Adelie", "Gentoo", "Chinstrap")))
    .run()
    }

val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.show(truncate=false)

実行結果を表示してみます。
body_mass_gに数件の欠損値があるようで、body_mass_gのnullチェックがエラーになっています。

scala> resultDataFrame.show()
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|               check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|unit testing my data|      Error|       Error|SizeConstraint(Si...|          Success|                    |
|unit testing my data|      Error|       Error|CompletenessConst...|          Success|                    |
|unit testing my data|      Error|       Error|ComplianceConstra...|          Success|                    |
|unit testing my data|      Error|       Error|CompletenessConst...|          Failure|Value: 0.99418604...|
|unit testing my data|      Error|       Error|ComplianceConstra...|          Success|                    |
+--------------------+-----------+------------+--------------------+-----------------+--------------------+

Summary

今回はDeequをつかってデータセットに対するテストを実行してみました。
Spark上に構築されているため、大量のデータに対しても実行可能ですし、
SagemakerなどのMLパイプラインに組み込むこともできます。

References

Appendix

python(pySpark)を使ったサンプルも試したのでついでに。

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pydeequ.profiles import *

from pyspark.conf import SparkConf
import pydeequ
import pandas as pd
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.analyzers import *

access_key = '<ACCESS KEY>'
secret_key = '<SECRET KEY>'

spark= SparkSession.builder.appName("CsvReader")\
  .config('spark.jars.packages', \
  'org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.12.96,'\
   + pydeequ.deequ_maven_coord).config("spark.jars.excludes", pydeequ.f2j_maven_coord).getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

customSchema = StructType([\
    StructField("species", StringType(), False),\
    StructField("island", StringType(), True),\
    StructField("culmen_length_mm", DoubleType(), True),\
    StructField("culmen_depth_mm", DoubleType(), True),\
    StructField("flipper_length_mm", DoubleType(), True),\
    StructField("body_mass_g", DoubleType(), True),\
    StructField("sex", StringType(), True)\
])

ps_df = spark.read.format('csv')\
    .options(header='true', inferSchema='true')\
    .schema(customSchema)\
    .load('s3a://my-aws-datascience-example/csv/penguins_size.csv')

# to pandas
#df = ps_df.toPandas()
#print(ps_df.dtypes)

check = Check(spark, CheckLevel.Error, "Integrity checks")
checkResult = VerificationSuite(spark)\
    .onData(ps_df)\
    .addCheck(check\
        .isContainedIn('species', ['Adelie', 'Chinstrap','Gentoo'])\
        .isComplete("culmen_depth_mm")\
        .isNonNegative("body_mass_g"))\
    .run()

checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

spark.stop()