Amazon SageMaker Feature StoreのExampleを実行してみた

こんにちは、小澤です。

現在、絶賛開催中のre:Invent 2020にて、Amazon SageMakerに関連する新機能が続々と発表されました。 今回は、その1つであるFeature Storeを実際に利用してみたいと思います。

サンプルを動かしてみる

SageMaker Feature Storeのサンプルとして、クレジットカードの不正利用の検出に利用するデータセットを利用したものが公式Exampleに公開されています。

今回は、このExampleを利用して実際の動きを確認していきます。

環境の設定とデータセットの用意

まずは、Feature Storeを利用するのに必要な情報の設定を行います。

Feature Store用のSessionを生成します。 これは、通常のSageMaker利用時と同じような流れですね。

import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker import get_execution_role

region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

# boto3のSessionとboto3から取得したSageMaker, Feature Storeのクライアント利用してFeature StoreのSessionを取得する。
# 以降は、SageMaker同様、Python SDKを使って処理が実装可能となる。
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)

続いて、Feature Storeで利用するS3の設定とこの後利用するロールの取得を行います。

# sagemaker-<resion_name>-<account_id>/sagemaker-featurestore-demo 以下に特徴データを格納していく
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-demo'

role = get_execution_role()

続いて、データセットが格納されているS3からデータを取得して、PandasのDataFrame形式で保持します。

import pandas as pd
import io

s3_client = boto3.client('s3', region_name=region)

fraud_detection_bucket_name = 'sagemaker-sample-files'
identity_file_key = 'datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_identity.csv'
transaction_file_key = 'datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_transactions.csv'

identity_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=identity_file_key)
transaction_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=transaction_file_key)

identity_data = pd.read_csv(io.BytesIO(identity_data_object['Body'].read()))
transaction_data = pd.read_csv(io.BytesIO(transaction_data_object['Body'].read()))

実際のデータを確認すると以下のような内容になっています。

このデータに対していくつかの前処理を行います。 まずは、小数点以下を5桁に丸めて欠損値を0で補完します。

identity_data = identity_data.round(5)
transaction_data = transaction_data.round(5)

identity_data = identity_data.fillna(0)
transaction_data = transaction_data.fillna(0)

カードブランド名と種類(クレジットかデビットか)はいくつかの値の中のいずれかを取るカテゴリカルな変数です。 そのため、これらをOne-Hot Encodingに変換します。

encoded_card_bank = pd.get_dummies(transaction_data['card4'], prefix = 'card_bank')
encoded_card_type = pd.get_dummies(transaction_data['card6'], prefix = 'card_type')

それぞれ別なデータとして生成されるため、これを元のデータと結合したのち、列名にスペースが含まれるものを「_」に変換します。

transformed_transaction_data = pd.concat([transaction_data, encoded_card_type, encoded_card_bank], axis=1)
transformed_transaction_data = transformed_transaction_data.rename(columns={"card_bank_american express": "card_bank_american_express"})

ここまでで、Feature Storeに格納するデータの準備が整いました。

Feature Storeを利用する

さて、ここからいよいよFeature Storeを利用していきます。

公式ドキュメントによるとFeature Storeは以下の図のような役割を担うようです。

Feature Storeでは、Feature Groupというものを定義して、以下のような設定を行います。

  • データセットの名前や説明
  • Online Store/Offline Store
  • データの格納先
  • データ内に含まれる列の情報
  • データを一意に特定するIDと時間に関する情報

Online Store/Offline Storeは特徴量をどのように利用するかの設定となります。 格納されているデータに対してバッチ処理する場合や、学習時にまとめて利用するものに関してはOffline Storeを利用して、まとまった情報を一括で取得できるようにしておきます。 一方リアルタイムに低レイテンシでの処理が要求される場合にはOnline Storeを利用します。 これらはどちらか片方、あるいは両方の指定が可能です。

データ内に含まれる列の情報は列名と型に関する情報を設定します。 型に関する情報はString(文字列)、Integral(数値)、Fractional(カテゴリカル)のいずれかから選択します。 一般的なプログラムにおける型の表現ではなく、機械学習に与える際のデータの種類を基準としているようです。

データには行を一意に特定するID(RECORD IDENTIFIER NAME)とそのデータが発生した時間(EVENT TIME FEATURE NAME)を表す情報が含まれる列をそれぞれ指定します。 Feature Storeではその役割からデータをアップデートするのではなく、変更があった場合でも最新の情報として新たに追加されることで、データのバージョニング(任意の日時に置ける情報の取得)を可能にしているようです。

では、実際の処理の流れを見てみましょう。

まずは今回利用する2つのデータに関してのFeature Groupを作成していきます。

from time import gmtime, strftime, sleep
from sagemaker.feature_store.feature_group import FeatureGroup

# Feature Group名の設定
identity_feature_group_name = 'identity-feature-group-' + strftime('%d-%H-%M-%S', gmtime())
transaction_feature_group_name = 'transaction-feature-group-' + strftime('%d-%H-%M-%S', gmtime())

# Sessionからインスタンスの作成(この段階ではまだFeature Groupそのものの作成はされていません)
identity_feature_group = FeatureGroup(name=identity_feature_group_name, sagemaker_session=feature_store_session)
transaction_feature_group = FeatureGroup(name=transaction_feature_group_name, sagemaker_session=feature_store_session)

次に、データをFeature Storeで利用するためにに必要ないくつかの変換処理を行います。

import time

current_time_sec = int(round(time.time()))

# DataFrameのobject型をstring型に変更する
def cast_object_to_string(data_frame):
    for label in data_frame.columns:
        if data_frame.dtypes[label] == 'object':
            data_frame[label] = data_frame[label].astype("str").astype("string")

# Feature Storeに入れるデータに対してその処理を行う
cast_object_to_string(identity_data)
cast_object_to_string(transformed_transaction_data)

# Feature Storeで必要なRECORD IDENTIFIER NAMEとなる列の名前とEVENT TIME FEATURE NAMEとなる列の名前
record_identifier_feature_name = "TransactionID"
event_time_feature_name = "EventTime"

# 今回利用するデータは静的なものであるため、EVENT TIME FEATURE NAMEに相当する列が含まれていない
# そのため、現在時刻に基づいた値を含む列を別途生成する
identity_data[event_time_feature_name] = pd.Series([current_time_sec]*len(identity_data), dtype="float64")
transformed_transaction_data[event_time_feature_name] = pd.Series([current_time_sec]*len(transaction_data), dtype="float64")

Feature Groupに対して、列名や型の情報を定義します。ここでは、実際のデータを与えることでそれを決定しています。 この段階では、列情報を手動ではなくデータから決めるために渡していますが、ここで実際にこのデータが処理されるわけではないようなのでご注意ください。

identity_feature_group.load_feature_definitions(data_frame=identity_data)
transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data)

Feature Groupの作成に必要な情報が出そろったので、作成の処理を行います。

なお、作成時にはFeature Storeに対するIAMの権限が与えられている必要があります。 これはFeature Storeを利用するための権限となるため、ここまでの処理を実行しているIAMロールにたいして AmazonSageMakerFullAccess とあわせて AmazonSageMakerFeatureStoreAccess も別途付与する必要があります。

Online, Offline双方のStoreで利用する設定でFeature Groupを作成します。

identity_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

transaction_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True
)

作成完了まで1分程度ですが、時間がかかるので完了まで待つ処理を実行しておきます。

def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating": # 作成中は無限ループで待つ
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created": # 失敗したら例外を投げる
        raise RuntimeError(f"Failed to create feature group {feature_group.name}")
    # 完了
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(feature_group=identity_feature_group)
wait_for_feature_group_creation_complete(feature_group=transaction_feature_group)

作成されたFeature Storeは describe で確認したり、 list_feature_groups で一覧取得が可能です。

また、この段階でSageMaker Studioからも確認できる状態になっています。

Feature Groupの作成が完了したら、いよいよここにデータを投入します。 データの投入には ingest 関数を利用します。

identity_feature_group.ingest(
    data_frame=identity_data, # 対象のデータ
    max_workers=3, # 取り込み処理に利用するワーカー数
    wait=True # Trueの場合完了まで待つ
)

transaction_feature_group.ingest(
    data_frame=transformed_transaction_data, max_workers=5, wait=True
)

取り込んだデータはOnline Storeであれば、Feature Storeから featurestore_runtime.get_record を使って取得できます。 この時取得対象となるFeature GroupとRECORD IDENTIFIER NAMEの値を文字列で指定します。

Offline Storeの場合は、データがS3に保存され、Glueカタログに情報が登録されます。 テーブルの作成はFeature Group作成時に行われますが、 S3へのデータの投入は ingest によってFeature Storeにデータが取り込まれた後で非同期で行われます。

そのため、 wait=True を指定していてもS3への出力完了までは別途待つ必要がある点にご注意ください。

データが格納されたのちは、SQLを利用してデータの取得が可能です。

# Athenaのテーブルにクエリを投げるためのインスタンスを生成
identity_query = identity_feature_group.athena_query()
transaction_query = transaction_feature_group.athena_query()

# 実際に発行するクエリを作成するためにテーブル名を取得
identity_table = identity_query.table_name
transaction_table = transaction_query.table_name

# クエリ文字列の作成
query_string = 'SELECT * FROM "'+transaction_table+'" LEFT JOIN "'+identity_table+'" ON "'+transaction_table+'".transactionid = "'+identity_table+'".transactionid'
print('Running ' + query_string)

# クエリを実行してDataFrameに格納する
dataset = pd.DataFrame()
identity_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/'+prefix+'/query_results/')
identity_query.wait()
dataset = identity_query.as_dataframe()

dataset

athena_query でインスタンスを取得したのちはAthenaに対する操作ですね。

なお、SageMaker Studio上で確認できるFeature Groupの中には過去に遡ってのデータの取得などを行うためのいくつかのサンプルクエリが提供されています。

公式Exampleでは、この後にここで取得したデータを使ってSageMakerでの学習と推論を行っていますが、 これらに関しましては、従来のSageMaker利用方法と同様ですので割愛します。

おわりに

今回はre:Inventで発表されたAmazon SageMaker Feature Storeを公式Exampleを参照しながら実際に実行してみました。 MLOpsを実現するための重要な要素となる仕組みですが、非常に簡単に使えるようになっており今後のSageMaker活用の幅がより一層広まりそうです。

また、同時に発表されたData WranglerやPipelineと組み合わせることでさらに便利になりそうな気がしています。