
Deequでデータ品質をテストする
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Introduction
最近は大量のデータを扱う機会も多くなりました。
機械学習でも、モデル作成するときにデータはとても重要ですし、
データ分析をおこなってビジネス上の意思決定を行うこともあります。
そういったとき、データの量も重要ですが品質も大事になってきます。
プログラム開発をおこなうとき、プログラムに対してテストを記述して品質を担保します。
データに対してもテストを作成することでデータに対する品質を保証します。
本稿ではAmazon Deequを使用したデータのテスト方法について紹介します。
Deequ?
Deequとは、Amazonで開発されているOSSのデータ用テストツールです。
データに対してデータ品質メトリクス計算やデータ品質の制約チェックなどが可能です。
DeequはApache Spark上で動作し、大規模なデータセット(数十億レコード規模らしい)
に対してスケール可能です。
Apache Spark?
Apache Sparkは、大規模データを扱うための分散処理フレームワークです。
SparkはMapReduceの処理速度の改善を目的に開発され、現在はMapReduceの代替としても利用可能です。
現在はバージョン3.2がリリースされていて、ストリーミング処理や機械学習、
グラフ分析(Spark GraphX)も可能となっています。
Environment
- OS : MacOS 10.15.7
- Java : openjdk 11.0.2
- Python 3.9.4
なお、セットアップにはsdkmanをつかってます。
Setup
ではDeequを動かすための環境を設定します。
まずはsdkmanをつかってSparkのインストールをします。
また、有効なAWSのアクセスキーとシークレットキーがある前提です。
# Deequの対応がSpark 3.1なので % sdk install spark 3.1.2
spark-shell↓のようになればOK。
% spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
S3バケット作成&テスト用CSVアップロード
AWSコンソールでCLIでもよいので、S3にバケットを作成します。
テスト用のペンギンデータをkaggleから取得し、
penguins_size.csvをアップロードしましょう。
ライブラリを取得
今回動作確認をするのにいくつかライブラリが必要なので、
MVN Repositoryから下記jarを取得します。
- deequ-2.0.0-spark-3.1.jar
- aws-java-sdk-bundle-1.12.99.jar
- hadoop-aws-3.2.2.jar
Exeute Spark Shell
ではSpark Shellを起動してDeequを動かしてみましょう。
shell起動時にさきほど取得したjarを指定して起動します。
% spark-shell --conf spark.jars=./deequ-2.0.0-spark-3.1.jar,./aws-java-sdk-bundle-1.12.99.jar,./hadoop-aws-3.2.2.jar
起動したら下記コードを入力してみます。
S3からCSVを取得し、データの表示ができたらOKです。
※コンソールにコピペ
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<AWS_ACCESS_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<AWS_SECRET_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
val customSchema = StructType(Array(
    StructField("species", StringType, false),
    StructField("island", StringType, true),
    StructField("culmen_length_mm", DoubleType, true),
    StructField("culmen_depth_mm", DoubleType, true),
    StructField("flipper_length_mm", DoubleType,true),
    StructField("body_mass_g", DoubleType, true),
    StructField("sex", StringType, true)))
val df = spark.read.option("header", "true").option("inferSchema", "true")
    .schema(customSchema).csv("s3a://<bucket>/penguins_size.csv")
フィールドの型を定義したいので、StructTypeを使用してます。
dataframeのtakeメソッドを実行してみると、内容が取得できているのがわかります。
scala> df.take(2).foreach(x => println(x)) [Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE] [Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE]
AnalysisRunnerを使えばデータの分析が可能です。
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
val analysisResult: AnalyzerContext = { AnalysisRunner
  .onData(df)
  .addAnalyzer(Size())
  .addAnalyzer(Completeness("species"))
  .addAnalyzer(ApproxCountDistinct("species"))
  .addAnalyzer(Mean("body_mass_g"))
  .addAnalyzer(Compliance("top body_mass_g", "body_mass_g >= 3500"))
  .addAnalyzer(Correlation("culmen_length_mm", "culmen_depth_mm"))
  .run()
}
val metrics = successMetricsAsDataFrame(spark, analysisResult)
showメソッドを使うと、分析結果を表示できます。
scala > metrics.show() +-----------+--------------------+-------------------+--------------------+ | entity| instance| name| value| +-----------+--------------------+-------------------+--------------------+ | Column| body_mass_g| Mean| 4201.754385964912| | Column| top body_mass_g| Compliance| 0.7877906976744186| | Dataset| *| Size| 344.0| |Mutlicolumn|culmen_length_mm,...| Correlation|-0.23505287035553338| | Column| species| Completeness| 1.0| | Column| species|ApproxCountDistinct| 3.0| +-----------+--------------------+-------------------+--------------------+
- body_mass_gの平均は4201.75
- speciesに欠損値なし、固有値は3つ
など、分析した情報が表示されています。
データの制約について、ある程度データに基づいた提案を行わせることも可能です。
Deequはデータ分布に基づき、使えそうな制約を提案してくれる機能を持っています。
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._
val suggestionResult = { ConstraintSuggestionRunner()
  .onData(df)
  .addConstraintRules(Rules.DEFAULT)
  .run()
}
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
  case (column, suggestions) =>
    suggestions.map { constraint =>
      (column, constraint.description, constraint.codeForConstraint)
    }
}.toSeq.toDS()
scala> suggestionDataFrame.show()
+-----------------+--------------------+--------------------+
|               _1|                  _2|                  _3|
+-----------------+--------------------+--------------------+
|          species|'species' is not ...|.isComplete("spec...|
|          species|'species' has val...|.isContainedIn("s...|
|      body_mass_g|'body_mass_g' has...|.hasCompleteness(...|
|      body_mass_g|'body_mass_g' has...|.isNonNegative("b...|
|              sex|   'sex' is not null|  .isComplete("sex")|
|              sex|'sex' has value r...|.isContainedIn("s...|
| culmen_length_mm|'culmen_length_mm...|.hasCompleteness(...|
| culmen_length_mm|'culmen_length_mm...|.isNonNegative("c...|
|           island|'island' is not null|.isComplete("isla...|
|           island|'island' has valu...|.isContainedIn("i...|
|  culmen_depth_mm|'culmen_depth_mm'...|.hasCompleteness(...|
|  culmen_depth_mm|'culmen_depth_mm'...|.isNonNegative("c...|
|flipper_length_mm|'flipper_length_m...|.hasCompleteness(...|
|flipper_length_mm|'flipper_length_m...|.isNonNegative("f...|
+-----------------+--------------------+--------------------+
各フィールドについて、どういったデータであるかチェックするための提案が表示されています。
これをベースに制約を記述すると楽です。
なお、
suggestionDataFrame.show(truncate=false)
とすれば内容を省略せず表示してくれます。
ではデータに対してテストを実行してみます。
データ総数のテストやnullチェック、Containチェックなどを記述しています。
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
val verificationResult = { VerificationSuite()
  .onData(df)
  .addCheck(
    Check(CheckLevel.Error, "unit testing my data")
      .hasSize(_ >= 100)
      .isComplete("species")
      .isNonNegative("body_mass_g")
      .isComplete("body_mass_g")
      .isContainedIn("species", Array("Adelie", "Gentoo", "Chinstrap")))
    .run()
    }
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.show(truncate=false)
実行結果を表示してみます。
body_mass_gに数件の欠損値があるようで、body_mass_gのnullチェックがエラーになっています。
scala> resultDataFrame.show() +--------------------+-----------+------------+--------------------+-----------------+--------------------+ | check|check_level|check_status| constraint|constraint_status| constraint_message| +--------------------+-----------+------------+--------------------+-----------------+--------------------+ |unit testing my data| Error| Error|SizeConstraint(Si...| Success| | |unit testing my data| Error| Error|CompletenessConst...| Success| | |unit testing my data| Error| Error|ComplianceConstra...| Success| | |unit testing my data| Error| Error|CompletenessConst...| Failure|Value: 0.99418604...| |unit testing my data| Error| Error|ComplianceConstra...| Success| | +--------------------+-----------+------------+--------------------+-----------------+--------------------+
Summary
今回はDeequをつかってデータセットに対するテストを実行してみました。
Spark上に構築されているため、大量のデータに対しても実行可能ですし、
SagemakerなどのMLパイプラインに組み込むこともできます。
References
Appendix
python(pySpark)を使ったサンプルも試したのでついでに。
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pydeequ.profiles import *
from pyspark.conf import SparkConf
import pydeequ
import pandas as pd
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.analyzers import *
access_key = '<ACCESS KEY>'
secret_key = '<SECRET KEY>'
spark= SparkSession.builder.appName("CsvReader")\
  .config('spark.jars.packages', \
  'org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.12.96,'\
   + pydeequ.deequ_maven_coord).config("spark.jars.excludes", pydeequ.f2j_maven_coord).getOrCreate()
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
customSchema = StructType([\
    StructField("species", StringType(), False),\
    StructField("island", StringType(), True),\
    StructField("culmen_length_mm", DoubleType(), True),\
    StructField("culmen_depth_mm", DoubleType(), True),\
    StructField("flipper_length_mm", DoubleType(), True),\
    StructField("body_mass_g", DoubleType(), True),\
    StructField("sex", StringType(), True)\
])
ps_df = spark.read.format('csv')\
    .options(header='true', inferSchema='true')\
    .schema(customSchema)\
    .load('s3a://my-aws-datascience-example/csv/penguins_size.csv')
# to pandas
#df = ps_df.toPandas()
#print(ps_df.dtypes)
check = Check(spark, CheckLevel.Error, "Integrity checks")
checkResult = VerificationSuite(spark)\
    .onData(ps_df)\
    .addCheck(check\
        .isContainedIn('species', ['Adelie', 'Chinstrap','Gentoo'])\
        .isComplete("culmen_depth_mm")\
        .isNonNegative("body_mass_g"))\
    .run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
spark.stop()











