この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Introduction
最近は大量のデータを扱う機会も多くなりました。
機械学習でも、モデル作成するときにデータはとても重要ですし、
データ分析をおこなってビジネス上の意思決定を行うこともあります。
そういったとき、データの量も重要ですが品質も大事になってきます。
プログラム開発をおこなうとき、プログラムに対してテストを記述して品質を担保します。
データに対してもテストを作成することでデータに対する品質を保証します。
本稿ではAmazon Deequを使用したデータのテスト方法について紹介します。
Deequ?
Deequとは、Amazonで開発されているOSSのデータ用テストツールです。
データに対してデータ品質メトリクス計算やデータ品質の制約チェックなどが可能です。
DeequはApache Spark上で動作し、大規模なデータセット(数十億レコード規模らしい)
に対してスケール可能です。
Apache Spark?
Apache Sparkは、大規模データを扱うための分散処理フレームワークです。
SparkはMapReduceの処理速度の改善を目的に開発され、現在はMapReduceの代替としても利用可能です。
現在はバージョン3.2がリリースされていて、ストリーミング処理や機械学習、
グラフ分析(Spark GraphX)も可能となっています。
Environment
- OS : MacOS 10.15.7
- Java : openjdk 11.0.2
- Python 3.9.4
なお、セットアップにはsdkmanをつかってます。
Setup
ではDeequを動かすための環境を設定します。
まずはsdkmanをつかってSparkのインストールをします。
また、有効なAWSのアクセスキーとシークレットキーがある前提です。
# Deequの対応がSpark 3.1なので
% sdk install spark 3.1.2
spark-shell↓のようになればOK。
% spark-shell --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z
S3バケット作成&テスト用CSVアップロード
AWSコンソールでCLIでもよいので、S3にバケットを作成します。
テスト用のペンギンデータをkaggleから取得し、
penguins_size.csvをアップロードしましょう。
ライブラリを取得
今回動作確認をするのにいくつかライブラリが必要なので、
MVN Repositoryから下記jarを取得します。
- deequ-2.0.0-spark-3.1.jar
- aws-java-sdk-bundle-1.12.99.jar
- hadoop-aws-3.2.2.jar
Exeute Spark Shell
ではSpark Shellを起動してDeequを動かしてみましょう。
shell起動時にさきほど取得したjarを指定して起動します。
% spark-shell --conf spark.jars=./deequ-2.0.0-spark-3.1.jar,./aws-java-sdk-bundle-1.12.99.jar,./hadoop-aws-3.2.2.jar
起動したら下記コードを入力してみます。
S3からCSVを取得し、データの表示ができたらOKです。
※コンソールにコピペ
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<AWS_ACCESS_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<AWS_SECRET_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
val customSchema = StructType(Array(
StructField("species", StringType, false),
StructField("island", StringType, true),
StructField("culmen_length_mm", DoubleType, true),
StructField("culmen_depth_mm", DoubleType, true),
StructField("flipper_length_mm", DoubleType,true),
StructField("body_mass_g", DoubleType, true),
StructField("sex", StringType, true)))
val df = spark.read.option("header", "true").option("inferSchema", "true")
.schema(customSchema).csv("s3a://<bucket>/penguins_size.csv")
フィールドの型を定義したいので、StructTypeを使用してます。
dataframeのtakeメソッドを実行してみると、内容が取得できているのがわかります。
scala> df.take(2).foreach(x => println(x))
[Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE]
[Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE]
AnalysisRunnerを使えばデータの分析が可能です。
import com.amazon.deequ.analyzers.{Compliance, Correlation, Size, Completeness, Mean, ApproxCountDistinct}
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
val analysisResult: AnalyzerContext = { AnalysisRunner
.onData(df)
.addAnalyzer(Size())
.addAnalyzer(Completeness("species"))
.addAnalyzer(ApproxCountDistinct("species"))
.addAnalyzer(Mean("body_mass_g"))
.addAnalyzer(Compliance("top body_mass_g", "body_mass_g >= 3500"))
.addAnalyzer(Correlation("culmen_length_mm", "culmen_depth_mm"))
.run()
}
val metrics = successMetricsAsDataFrame(spark, analysisResult)
showメソッドを使うと、分析結果を表示できます。
scala > metrics.show()
+-----------+--------------------+-------------------+--------------------+
| entity| instance| name| value|
+-----------+--------------------+-------------------+--------------------+
| Column| body_mass_g| Mean| 4201.754385964912|
| Column| top body_mass_g| Compliance| 0.7877906976744186|
| Dataset| *| Size| 344.0|
|Mutlicolumn|culmen_length_mm,...| Correlation|-0.23505287035553338|
| Column| species| Completeness| 1.0|
| Column| species|ApproxCountDistinct| 3.0|
+-----------+--------------------+-------------------+--------------------+
- body_mass_gの平均は4201.75
- speciesに欠損値なし、固有値は3つ
など、分析した情報が表示されています。
データの制約について、ある程度データに基づいた提案を行わせることも可能です。
Deequはデータ分布に基づき、使えそうな制約を提案してくれる機能を持っています。
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import spark.implicits._
val suggestionResult = { ConstraintSuggestionRunner()
.onData(df)
.addConstraintRules(Rules.DEFAULT)
.run()
}
val suggestionDataFrame = suggestionResult.constraintSuggestions.flatMap {
case (column, suggestions) =>
suggestions.map { constraint =>
(column, constraint.description, constraint.codeForConstraint)
}
}.toSeq.toDS()
scala> suggestionDataFrame.show()
+-----------------+--------------------+--------------------+
| _1| _2| _3|
+-----------------+--------------------+--------------------+
| species|'species' is not ...|.isComplete("spec...|
| species|'species' has val...|.isContainedIn("s...|
| body_mass_g|'body_mass_g' has...|.hasCompleteness(...|
| body_mass_g|'body_mass_g' has...|.isNonNegative("b...|
| sex| 'sex' is not null| .isComplete("sex")|
| sex|'sex' has value r...|.isContainedIn("s...|
| culmen_length_mm|'culmen_length_mm...|.hasCompleteness(...|
| culmen_length_mm|'culmen_length_mm...|.isNonNegative("c...|
| island|'island' is not null|.isComplete("isla...|
| island|'island' has valu...|.isContainedIn("i...|
| culmen_depth_mm|'culmen_depth_mm'...|.hasCompleteness(...|
| culmen_depth_mm|'culmen_depth_mm'...|.isNonNegative("c...|
|flipper_length_mm|'flipper_length_m...|.hasCompleteness(...|
|flipper_length_mm|'flipper_length_m...|.isNonNegative("f...|
+-----------------+--------------------+--------------------+
各フィールドについて、どういったデータであるかチェックするための提案が表示されています。
これをベースに制約を記述すると楽です。
なお、
suggestionDataFrame.show(truncate=false)
とすれば内容を省略せず表示してくれます。
ではデータに対してテストを実行してみます。
データ総数のテストやnullチェック、Containチェックなどを記述しています。
import com.amazon.deequ.{VerificationSuite, VerificationResult}
import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
val verificationResult = { VerificationSuite()
.onData(df)
.addCheck(
Check(CheckLevel.Error, "unit testing my data")
.hasSize(_ >= 100)
.isComplete("species")
.isNonNegative("body_mass_g")
.isComplete("body_mass_g")
.isContainedIn("species", Array("Adelie", "Gentoo", "Chinstrap")))
.run()
}
val resultDataFrame = checkResultsAsDataFrame(spark, verificationResult)
resultDataFrame.show(truncate=false)
実行結果を表示してみます。
body_mass_gに数件の欠損値があるようで、body_mass_gのnullチェックがエラーになっています。
scala> resultDataFrame.show()
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
| check|check_level|check_status| constraint|constraint_status| constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
|unit testing my data| Error| Error|SizeConstraint(Si...| Success| |
|unit testing my data| Error| Error|CompletenessConst...| Success| |
|unit testing my data| Error| Error|ComplianceConstra...| Success| |
|unit testing my data| Error| Error|CompletenessConst...| Failure|Value: 0.99418604...|
|unit testing my data| Error| Error|ComplianceConstra...| Success| |
+--------------------+-----------+------------+--------------------+-----------------+--------------------+
Summary
今回はDeequをつかってデータセットに対するテストを実行してみました。
Spark上に構築されているため、大量のデータに対しても実行可能ですし、
SagemakerなどのMLパイプラインに組み込むこともできます。
References
Appendix
python(pySpark)を使ったサンプルも試したのでついでに。
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pydeequ.profiles import *
from pyspark.conf import SparkConf
import pydeequ
import pandas as pd
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.analyzers import *
access_key = '<ACCESS KEY>'
secret_key = '<SECRET KEY>'
spark= SparkSession.builder.appName("CsvReader")\
.config('spark.jars.packages', \
'org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.12.96,'\
+ pydeequ.deequ_maven_coord).config("spark.jars.excludes", pydeequ.f2j_maven_coord).getOrCreate()
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
customSchema = StructType([\
StructField("species", StringType(), False),\
StructField("island", StringType(), True),\
StructField("culmen_length_mm", DoubleType(), True),\
StructField("culmen_depth_mm", DoubleType(), True),\
StructField("flipper_length_mm", DoubleType(), True),\
StructField("body_mass_g", DoubleType(), True),\
StructField("sex", StringType(), True)\
])
ps_df = spark.read.format('csv')\
.options(header='true', inferSchema='true')\
.schema(customSchema)\
.load('s3a://my-aws-datascience-example/csv/penguins_size.csv')
# to pandas
#df = ps_df.toPandas()
#print(ps_df.dtypes)
check = Check(spark, CheckLevel.Error, "Integrity checks")
checkResult = VerificationSuite(spark)\
.onData(ps_df)\
.addCheck(check\
.isContainedIn('species', ['Adelie', 'Chinstrap','Gentoo'])\
.isComplete("culmen_depth_mm")\
.isNonNegative("body_mass_g"))\
.run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
spark.stop()