Amazon SageMakerの機械学習モデルをSnowflakeから直接利用する
大阪オフィスの玉井です。
DWHにデータが大量に蓄積されてくると、やってみたくなることの1つが機械学習だと思います。Snowflakeはそれ自身に機械学習の機能を持ってはいませんが、他サービスと連携することで、Snowflakeのデータを使って機械学習を行うことが可能です。
今回はSnowflakeの外部関数を用いて、間接的にAmazon SageMakerと連携する方法をやってみました。
Snowflakeの外部関数について
下記をどうぞ。
今回やることの概要
全体像
- Snowflake: API Gatewayに対してリクエスト送信
- API Gateway: Snowflakeからのリクエストの権限を確認
- API Gateway: Lambda関数に(Snowflakeからの)リクエストを送信
- Lambda: 受信したデータを元に処理を行い、SageMakerにデータを送信
- SageMaker: 受信したデータを元に推論を実施し、結果をLambda関数に返却
- Lambda: 受信したデータを元に処理を行い、結果を出力
- Snowflake: Lambda関数からの出力結果を取得
公式情報
下記を実際にやってみます。
やってみる
SageMakerのモデルとエンドポイントを作成する
まずは、Snowflakeのデータを投げる(推論させる)機械学習モデルと、そのエンドポイントを用意しておく必要があります。
今回は、下記のサンプルを実行して、ランダムカットフォレストのモデルを作成します。ランダムカットフォレストを死ぬほどざっくり説明すると「データ内の異常値を検出するためのアルゴリズム」です。WebサイトのPV数とか気温とか、突然急激に上昇(または下降)することがあると思いますが、そういったものを検知するアルゴリズムです。
下記のサンプルでは、ニューヨークのタクシーの乗車率(半年間)を使って、異常検出するモデルを作成・学習させる内容になっています。このモデルに新しいデータを投げると、そのデータが異常値かどうかを示すスコアを返してくれます。
作業としては、上記のnotebookを淡々と実行していくだけです。実行する内容を自分なりに理解しつつ、進めていきます。これ以上ここで書くことは特に無いのですが、これだけだと寂しいので、作業中のスクリーンショットを貼っておきます。
モデルの作成と学習まで終わらせます。マネコン上でもモデルが存在するかどうか確認しておきます(エンドポイントも確認しておきましょう)。
LambdaからSageMakerへアクセスするためのIAMロールを作成する
SageMaker側の準備が整いました。実は、後はSnowflakeの外部関数を作成する流れとほとんど同じです。
ただ、今回はLambdaからSageMakerに対してアクセスが発生するため、Lambda関数にSageMakerへアクセスできる権限を付与する必要があります。というわけで、そのためのIAMロールを作成します。
IAMロールの作成画面にいき、「AWSサービス」「Lambda」を選びます。
今回は検証ですので、とりあえず「AmazonSageMakerFullAccess」のポリシーをつけました。
Sagemakerのエンドポイントを呼び出すLambda関数を作成する
IAMロールを用意できたところで、今回動かすLambda関数を作成します。
基本設定は下記の通り。先程作ったロールを指定するところが重要です。
コードは、本記事冒頭のドキュメントにあるサンプルを使います。ただ、自分の環境の情報に書き直すところが2箇所あるので、そこだけ示したものを載せておきます。
import logging import boto3 import json def lambda_handler(event, _context): logger = logging.getLogger(); logger.setLevel(logging.INFO) client = boto3.client("sagemaker-runtime", region_name="<モデルをデプロイしたリージョンの名前>") input_rows = json.loads(event["body"])["data"] input_values = [str(row[1]) for row in input_rows] input_body = "\n".join(input_values) response = client.invoke_endpoint( EndpointName = "<デプロイしたエンドポイントの名前>", Body = input_body, ContentType = "text/csv", Accept = "application/json" ) output_body = json.loads(response["Body"].read().decode()) scores = [[index, row["score"]] for index, row in enumerate(output_body["scores"])] output_json = json.dumps({"data" : scores}) return { "statusCode": 200, "body": output_json }
処理内容はめちゃくちゃシンプルです。SnowflakeのデータをSageMakerに投げて、返ってきた値をSnowflakeに渡すだけです。
- Snowflakeからデータを受け取る(API Gateway経由)
- 受け取ったデータ(JSON)を展開する(値を行毎にバラす)
- 展開したデータをcsvとしてSageMakerのモデルに渡す
- モデルが計算してくれた値(今回は異常値スコア)を受け取る
- 受け取ったデータをJSONにしてSnowflakeに返却する
2.受け取ったデータ(JSON)を展開する(値を行毎にバラす)
補足しておくと、Snowflakeの外部関数は、関数にわたす引数はJSON形式となります。だから、(今回は)JSONを展開する処理が必要なのですね。
API Gateway設定など
以降、下記の作業が続きますが…
- API Gatewayの設定
- SnowflakeのAPI統合オブジェクトの作成
- IAMロールの設定
これらは、下記と全く同じですので、本記事では省略します。
Snowflakeで外部関数を作成する
上記までの設定が全て終わったら、Snowflake側で(ランダムカットフォレストのモデルにデータを投げるための)外部関数を作成しましょう。
CREATE or REPLACE EXTERNAL FUNCTION <作成する外部関数の名前>(n integer) RETURNS number(38,10) API_INTEGRATION = <作成したAPI統合の名前> AS '<APIを呼び出すURL>';
実践
実際にSnowflakeのデータをSageMakerのモデルに投げてみましょう。
データの準備
下記のcsvデータをSnowflakeのテーブルにロードします。
ロードの手順の詳細は省きますが、下記のテーブル(nyc_taxi
)を作成しました。
このテーブルの値をSageMakerに投げます。今回作成した外部関数を実行するだけですね(sagemaker_rcf
が今回作成した外部関数の名前です)。
SELECT ts ,VALUE ,sagemaker_rcf(VALUE) score FROM nyc_taxi ORDER BY score DESC LIMIT 5
ちゃんと異常値スコアが返ってきました!
SnowflakeとSageMakerを連携する凄さ
(事前準備さえしておけば)SQLから直接、機械学習モデルを利用できるようになります。せっかくデータサイエンティストが用意してくれたモデルがあっても、それを簡単に利用できないとなると、宝の持ち腐れになってしまいます。Snowflakeで外部関数を用意しておけば、SnowflakeのSQLからそのまま機械学習モデルの推論結果を取得できるのは、非常に強力ですね。
また、SQLから直接利用できるということは、他のデータと一緒に推論結果を見られるということです。機械学習の推論結果と他のデータが並んで確認できるようなVIEW等を用意することもできます。そして、そのVIEWを利用して、BIツールでダッシュボードを構築することもできますね。機械学習がグッと身近になる連携だと思いました。
検討事項
今回は、あくまで「学習済のモデルにデータを投げて推論させる」ところしかやっていません。機械学習モデルの構築には(アルゴリズムによりますが)学習というフェーズもあります。学習にもデータが必要ですので、ここらへんSnowflakeといい感じに連携できれば、より強力なデータプラットフォームとなりそうですね。
おわりに
Snowflakeの外部関数も無限の可能性が秘められていて凄いのですが、それ以上に(超今さらながら)「SageMakerすげえ…」って思っちゃいました。インフラを準備することなくパッとJupyter Notebookが用意できて、そこでモデルを作成、学習、デプロイすれば、そのモデルを外部から簡単に利用できちゃいますからね…。そりゃあデータサイエンティストの方が盛り上がるわけですよ…。