Deequでデータ品質をテストする
Introduction
最近は大量のデータを扱う機会も多くなりました。
機械学習でも、モデル作成するときにデータはとても重要ですし、
データ分析をおこなってビジネス上の意思決定を行うこともあります。
そういったとき、データの量も重要ですが品質も大事になってきます。
プログラム開発をおこなうとき、プログラムに対してテストを記述して品質を担保します。
データに対してもテストを作成することでデータに対する品質を保証します。
本稿ではAmazon Deequを使用したデータのテスト方法について紹介します。
Deequ?
Deequとは、Amazonで開発されているOSSのデータ用テストツールです。
データに対してデータ品質メトリクス計算やデータ品質の制約チェックなどが可能です。
DeequはApache Spark上で動作し、大規模なデータセット(数十億レコード規模らしい)
に対してスケール可能です。
Apache Spark?
Apache Sparkは、大規模データを扱うための分散処理フレームワークです。
SparkはMapReduceの処理速度の改善を目的に開発され、現在はMapReduceの代替としても利用可能です。
現在はバージョン3.2がリリースされていて、ストリーミング処理や機械学習、
グラフ分析(Spark GraphX)も可能となっています。
Environment
- OS : MacOS 10.15.7
- Java : openjdk 11.0.2
- Python 3.9.4
なお、セットアップにはsdkmanをつかってます。
Setup
ではDeequを動かすための環境を設定します。
まずはsdkmanをつかってSparkのインストールをします。
また、有効なAWSのアクセスキーとシークレットキーがある前提です。
# Deequの対応がSpark 3.1なので % sdk install spark 3.1.2
spark-shell↓のようになればOK。
% spark-shell --version Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3.1.2 /_/ Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.2 Branch HEAD Compiled by user centos on 2021-05-24T04:27:48Z
S3バケット作成&テスト用CSVアップロード
AWSコンソールでCLIでもよいので、S3にバケットを作成します。
テスト用のペンギンデータをkaggleから取得し、
penguins_size.csvをアップロードしましょう。
ライブラリを取得
今回動作確認をするのにいくつかライブラリが必要なので、
MVN Repositoryから下記jarを取得します。
- deequ-2.0.0-spark-3.1.jar
- aws-java-sdk-bundle-1.12.99.jar
- hadoop-aws-3.2.2.jar
Exeute Spark Shell
ではSpark Shellを起動してDeequを動かしてみましょう。
shell起動時にさきほど取得したjarを指定して起動します。
% spark-shell --conf spark.jars=./deequ-2.0.0-spark-3.1.jar,./aws-java-sdk-bundle-1.12.99.jar,./hadoop-aws-3.2.2.jar
起動したら下記コードを入力してみます。
S3からCSVを取得し、データの表示ができたらOKです。
※コンソールにコピペ
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<AWS_ACCESS_KEY>") spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<AWS_SECRET_KEY>") spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") val customSchema = StructType(Array( StructField("species", StringType, false), StructField("island", StringType, true), StructField("culmen_length_mm", DoubleType, true), StructField("culmen_depth_mm", DoubleType, true), StructField("flipper_length_mm", DoubleType,true), StructField("body_mass_g", DoubleType, true), StructField("sex", StringType, true))) val df = spark.read.option("header", "true").option("inferSchema", "true") .schema(customSchema).csv("s3a://<bucket>/penguins_size.csv")
フィールドの型を定義したいので、StructTypeを使用してます。
dataframeのtakeメソッドを実行してみると、内容が取得できているのがわかります。
scala> df.take(2).foreach(x => println(x)) [Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE] [Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE]
AnalysisRunnerを使えばデータの分析が可能です。
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct} import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext} import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame val analysisResult: AnalyzerContext = { AnalysisRunner .onData(df) .addAnalyzer(Size()) .addAnalyzer(Completeness("species")) .addAnalyzer(ApproxCountDistinct("species")) .addAnalyzer(Mean("body_mass_g")) .addAnalyzer(Compliance("top body_mass_g", "body_mass_g >= 3500")) .addAnalyzer(Correlation("culmen_length_mm", "culmen_depth_mm")) .run() } val metrics = successMetricsAsDataFrame(spark, analysisResult)
showメソッドを使うと、分析結果を表示できます。
scala > metrics.show() +-----------+--------------------+-------------------+--------------------+ | entity| instance| name| value| +-----------+--------------------+-------------------+--------------------+ | Column| body_mass_g| Mean| 4201.754385964912| | Column| top body_mass_g| Compliance| 0.7877906976744186| | Dataset| *| Size| 344.0| |Mutlicolumn|culmen_length_mm,...| Correlation|-0.23505287035553338| | Column| species| Completeness| 1.0| | Column| species|ApproxCountDistinct| 3.0| +-----------+--------------------+-------------------+--------------------+
- body_mass_gの平均は4201.75
- speciesに欠損値なし、固有値は3つ
など、分析した情報が表示されています。
データの制約について、ある程度データに基づいた提案を行わせることも可能です。
Deequはデータ分布に基づき、使えそうな制約を提案してくれる機能を持っています。
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules} import spark.implicits._ val suggestionResult = { ConstraintSuggestionRunner() .onData(df) .addConstraintRules(Rules.DEFAULT) .run() } val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap { case (column, suggestions) => suggestions.map { constraint => (column, constraint.description, constraint.codeForConstraint) } }.toSeq.toDS()
scala> suggestionDataFrame.show() +-----------------+--------------------+--------------------+ | _1| _2| _3| +-----------------+--------------------+--------------------+ | species|'species' is not ...|.isComplete("spec...| | species|'species' has val...|.isContainedIn("s...| | body_mass_g|'body_mass_g' has...|.hasCompleteness(...| | body_mass_g|'body_mass_g' has...|.isNonNegative("b...| | sex| 'sex' is not null| .isComplete("sex")| | sex|'sex' has value r...|.isContainedIn("s...| | culmen_length_mm|'culmen_length_mm...|.hasCompleteness(...| | culmen_length_mm|'culmen_length_mm...|.isNonNegative("c...| | island|'island' is not null|.isComplete("isla...| | island|'island' has valu...|.isContainedIn("i...| | culmen_depth_mm|'culmen_depth_mm'...|.hasCompleteness(...| | culmen_depth_mm|'culmen_depth_mm'...|.isNonNegative("c...| |flipper_length_mm|'flipper_length_m...|.hasCompleteness(...| |flipper_length_mm|'flipper_length_m...|.isNonNegative("f...| +-----------------+--------------------+--------------------+
各フィールドについて、どういったデータであるかチェックするための提案が表示されています。
これをベースに制約を記述すると楽です。
なお、
suggestionDataFrame.show(truncate=false)
とすれば内容を省略せず表示してくれます。
ではデータに対してテストを実行してみます。
データ総数のテストやnullチェック、Containチェックなどを記述しています。
import com.amazon.deequ.{VerificationSuite, VerificationResult} import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame import com.amazon.deequ.checks.{Check, CheckLevel} val verificationResult = { VerificationSuite() .onData(df) .addCheck( Check(CheckLevel.Error, "unit testing my data") .hasSize(_ >= 100) .isComplete("species") .isNonNegative("body_mass_g") .isComplete("body_mass_g") .isContainedIn("species", Array("Adelie", "Gentoo", "Chinstrap"))) .run() } val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult) resultDataFrame.show(truncate=false)
実行結果を表示してみます。
body_mass_gに数件の欠損値があるようで、body_mass_gのnullチェックがエラーになっています。
scala> resultDataFrame.show() +--------------------+-----------+------------+--------------------+-----------------+--------------------+ | check|check_level|check_status| constraint|constraint_status| constraint_message| +--------------------+-----------+------------+--------------------+-----------------+--------------------+ |unit testing my data| Error| Error|SizeConstraint(Si...| Success| | |unit testing my data| Error| Error|CompletenessConst...| Success| | |unit testing my data| Error| Error|ComplianceConstra...| Success| | |unit testing my data| Error| Error|CompletenessConst...| Failure|Value: 0.99418604...| |unit testing my data| Error| Error|ComplianceConstra...| Success| | +--------------------+-----------+------------+--------------------+-----------------+--------------------+
Summary
今回はDeequをつかってデータセットに対するテストを実行してみました。
Spark上に構築されているため、大量のデータに対しても実行可能ですし、
SagemakerなどのMLパイプラインに組み込むこともできます。
References
Appendix
python(pySpark)を使ったサンプルも試したのでついでに。
from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import * from pydeequ.profiles import * from pyspark.conf import SparkConf import pydeequ import pandas as pd from pydeequ.checks import * from pydeequ.verification import * from pydeequ.analyzers import * access_key = '<ACCESS KEY>' secret_key = '<SECRET KEY>' spark= SparkSession.builder.appName("CsvReader")\ .config('spark.jars.packages', \ 'org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.12.96,'\ + pydeequ.deequ_maven_coord).config("spark.jars.excludes", pydeequ.f2j_maven_coord).getOrCreate() spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key) spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key) spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true") spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") customSchema = StructType([\ StructField("species", StringType(), False),\ StructField("island", StringType(), True),\ StructField("culmen_length_mm", DoubleType(), True),\ StructField("culmen_depth_mm", DoubleType(), True),\ StructField("flipper_length_mm", DoubleType(), True),\ StructField("body_mass_g", DoubleType(), True),\ StructField("sex", StringType(), True)\ ]) ps_df = spark.read.format('csv')\ .options(header='true', inferSchema='true')\ .schema(customSchema)\ .load('s3a://my-aws-datascience-example/csv/penguins_size.csv') # to pandas #df = ps_df.toPandas() #print(ps_df.dtypes) check = Check(spark, CheckLevel.Error, "Integrity checks") checkResult = VerificationSuite(spark)\ .onData(ps_df)\ .addCheck(check\ .isContainedIn('species', ['Adelie', 'Chinstrap','Gentoo'])\ .isComplete("culmen_depth_mm")\ .isNonNegative("body_mass_g"))\ .run() checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult) checkResult_df.show() spark.stop()