Kinesis Data AnalyticsでRandom Cut Forest(異常検知)を試してみた
こんにちは!ブログ筋を鍛えたい入社12日目のおざわ(じ)です。
AWS学習とブログネタ探しも兼ねて、10月8日(土)に開催されたJAWS DAYS 2022に参加してきました。
私はオンライン参加でしたが、日本各地にサテライト会場も設置されており、各会場の様子もバーチャルで覗くことができました。セッションの合間にはノリノリのBGMで実行委員の皆さんの紹介もあり、オンラインでありながら会場にいるかのように感じられる素晴らしいイベントでした。
概要
さて、本記事の内容ですが、今回のイベントで個人的に興味を惹かれたソラコム株式会社 松下さんの発表内容を手元の環境で試してみました。
使用したサービスは「Kinesis Data Streams」と「Kinesis Data Analytics」の2つになります。 異常検知に関しては、発表と同じKinesis Data Analyticsに用意されているRandom Cut Forest(RCF)というアルゴリズムを使用します。
今回はAWSコンソールで異常検知できたことを確かめるのみでしたが、異常検知した後に何らかのアクションを行う場合は、Lambdaを使ったこちらの記事もご参照ください。
やったこと
- Kinesis Data Streamの設定
- スクリプトの準備
- Kinesis Data Analyticsの設定
- 正常値データを投入
- 異常値を投入
1. Kinesis Data Streamの設定
まずはデータ入力先のデータストリームをCLIで作成します。
aws kinesis create-stream \ --stream-name VolumeStream \ --shard-count 1 \ --region ap-northeast-1
2. スクリプトの準備
今回はストリーム用のデータとして、マイクからの音量をリアルタイムに流し込んでみようと思います。python-sounddeviceが手軽に使えそうだったので、今回はこちらを使用しました。
import json import datetime import boto3 import sounddevice as sd import numpy as np STREAM_NAME = "VolumeStream" DURATION = 15 kinesis = boto3.client("kinesis") def put_volume(indata, outdata, frames, time, status): volume = np.linalg.norm(indata) * DURATION data = { 'event_time': datetime.datetime.now().isoformat(), 'volume': volume } print (data) kinesis.put_record( StreamName=STREAM_NAME, Data=json.dumps(data), PartitionKey="123") with sd.Stream(samplerate=44100, callback=put_volume): sd.sleep(int(1000 * DURATION))
次のステップで「スキーマの検出」を行うので、先にデータを入れておきます。
> python sound.py {'event_time': '2022-10-11T16:13:02.150653', 'volume': 0.0} {'event_time': '2022-10-11T16:13:07.619224', 'volume': 0.19032749813050032} {'event_time': '2022-10-11T16:13:07.660909', 'volume': 1.829547956585884} {'event_time': '2022-10-11T16:13:07.700621', 'volume': 1.3599266856908798}
3. Kinesis Data Analyticsの設定
SQLアプリケーションを作成します。
新しいプロジェクトでは、SQLアプリケーションよりも新しいKinesis Data Analytics Studioを使用することが推奨されていますが、今回は練習も兼ねて以前のインターフェースからやってみます。 Kinesis Data Analytics Studioの利用方法に関しては、参考記事にリンクを載せていますので、そちらをご確認いただければと思います。
アプリケーションが作成できたら、ソースとなるストリームに先ほど設定したものを割り当てます。参照ボタンからストリームを選択して「スキーマを検出」ボタンをクリックするとスキーマが検出されます。
「変更を保存」をクリックしたら、次に「実行」をクリックします
SQLコードの設定
「リアルタイム分析」のタブをクリックして「設定」ボタンを押すと、下のような画面が表示されますので、ここからSQLコードの設定を行います。
--Creates a temporary stream. CREATE OR REPLACE STREAM "TEMP_STREAM" ( "volume" INTEGER, "ANOMALY_SCORE" DOUBLE); --Creates another stream for application output. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "volume" INTEGER, "ANOMALY_SCORE" DOUBLE); -- Compute an anomaly score for each record in the input stream -- using Random Cut Forest CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM" SELECT STREAM "volume", ANOMALY_SCORE FROM TABLE( RANDOM_CUT_FOREST( --inputStream CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), --numberOfTrees 100, --subSampleSize 10, --timeDecay 100, --shingleSize 1 )); -- Sort records by descending anomaly score, insert into output stream CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM "TEMP_STREAM" ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
SQLを入力したら「アプリケーションを保存して実行」をクリックします。
4. データを流す
マイクを挿してからスクリプトを実行してデータを流し込んでみます。
> python sound.py {'event_time': '2022-10-11T16:16:04.614177', 'volume': 1.556536853313446} {'event_time': '2022-10-11T16:16:04.655622', 'volume': 0.9072657488286495} {'event_time': '2022-10-11T16:16:04.693902', 'volume': 0.9062547236680984} {'event_time': '2022-10-11T16:16:04.732057', 'volume': 0.7885556481778622}
最初はなにも表示されていませんので自然音のみで放置してみます。
しばらくするとマイクからの音量と一緒にリアルタイムでANOMALY_SCOREが表示されました。 なにも音を立てない場合のスコアは0.6から1.2くらいの範囲になっていました。
5. 異常値を投入
うまく動作していることがわかったので、さっそくマイクに向かって叫んでみました。
「元気ですかー!!!」
宿題をしていた娘に怒られました。
しばらく待つとデータが更新され、ANOMALY_SCOREが1.8から3.13の範囲になっていたので、異常検知できているようです。スコアが高ければアラートを出すなどのアクションができると色々使えそうですね。
おわりに
今回はJAWS DAYS 2022で紹介されていたKinesis Data AnalyticsのRandom Cut Forestを試してみました。SQLアプリケーションにはHotSpotsというアルゴリズムも用意されているので、今度はそちらも試してみたいです。