この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!ブログ筋を鍛えたい入社12日目のおざわ(じ)です。
AWS学習とブログネタ探しも兼ねて、10月8日(土)に開催されたJAWS DAYS 2022に参加してきました。
私はオンライン参加でしたが、日本各地にサテライト会場も設置されており、各会場の様子もバーチャルで覗くことができました。セッションの合間にはノリノリのBGMで実行委員の皆さんの紹介もあり、オンラインでありながら会場にいるかのように感じられる素晴らしいイベントでした。
概要
さて、本記事の内容ですが、今回のイベントで個人的に興味を惹かれたソラコム株式会社 松下さんの発表内容を手元の環境で試してみました。
使用したサービスは「Kinesis Data Streams」と「Kinesis Data Analytics」の2つになります。 異常検知に関しては、発表と同じKinesis Data Analyticsに用意されているRandom Cut Forest(RCF)というアルゴリズムを使用します。
今回はAWSコンソールで異常検知できたことを確かめるのみでしたが、異常検知した後に何らかのアクションを行う場合は、Lambdaを使ったこちらの記事もご参照ください。
やったこと
- Kinesis Data Streamの設定
- スクリプトの準備
- Kinesis Data Analyticsの設定
- 正常値データを投入
- 異常値を投入
1. Kinesis Data Streamの設定
まずはデータ入力先のデータストリームをCLIで作成します。
aws kinesis create-stream \
--stream-name VolumeStream \
--shard-count 1 \
--region ap-northeast-1
2. スクリプトの準備
今回はストリーム用のデータとして、マイクからの音量をリアルタイムに流し込んでみようと思います。python-sounddeviceが手軽に使えそうだったので、今回はこちらを使用しました。
sound.py
import json
import datetime
import boto3
import sounddevice as sd
import numpy as np
STREAM_NAME = "VolumeStream"
DURATION = 15
kinesis = boto3.client("kinesis")
def put_volume(indata, outdata, frames, time, status):
volume = np.linalg.norm(indata) * DURATION
data = {
'event_time': datetime.datetime.now().isoformat(),
'volume': volume
}
print (data)
kinesis.put_record(
StreamName=STREAM_NAME,
Data=json.dumps(data),
PartitionKey="123")
with sd.Stream(samplerate=44100, callback=put_volume):
sd.sleep(int(1000 * DURATION))
次のステップで「スキーマの検出」を行うので、先にデータを入れておきます。
> python sound.py
{'event_time': '2022-10-11T16:13:02.150653', 'volume': 0.0}
{'event_time': '2022-10-11T16:13:07.619224', 'volume': 0.19032749813050032}
{'event_time': '2022-10-11T16:13:07.660909', 'volume': 1.829547956585884}
{'event_time': '2022-10-11T16:13:07.700621', 'volume': 1.3599266856908798}
3. Kinesis Data Analyticsの設定
SQLアプリケーションを作成します。
新しいプロジェクトでは、SQLアプリケーションよりも新しいKinesis Data Analytics Studioを使用することが推奨されていますが、今回は練習も兼ねて以前のインターフェースからやってみます。 Kinesis Data Analytics Studioの利用方法に関しては、参考記事にリンクを載せていますので、そちらをご確認いただければと思います。
アプリケーションが作成できたら、ソースとなるストリームに先ほど設定したものを割り当てます。参照ボタンからストリームを選択して「スキーマを検出」ボタンをクリックするとスキーマが検出されます。
「変更を保存」をクリックしたら、次に「実行」をクリックします
SQLコードの設定
「リアルタイム分析」のタブをクリックして「設定」ボタンを押すと、下のような画面が表示されますので、ここからSQLコードの設定を行います。
--Creates a temporary stream.
CREATE OR REPLACE STREAM "TEMP_STREAM" (
"volume" INTEGER,
"ANOMALY_SCORE" DOUBLE);
--Creates another stream for application output.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"volume" INTEGER,
"ANOMALY_SCORE" DOUBLE);
-- Compute an anomaly score for each record in the input stream
-- using Random Cut Forest
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "TEMP_STREAM"
SELECT
STREAM "volume",
ANOMALY_SCORE
FROM TABLE(
RANDOM_CUT_FOREST(
--inputStream
CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
--numberOfTrees
100,
--subSampleSize
10,
--timeDecay
100,
--shingleSize
1
));
-- Sort records by descending anomaly score, insert into output stream
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM * FROM "TEMP_STREAM"
ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
SQLを入力したら「アプリケーションを保存して実行」をクリックします。
4. データを流す
マイクを挿してからスクリプトを実行してデータを流し込んでみます。
> python sound.py
{'event_time': '2022-10-11T16:16:04.614177', 'volume': 1.556536853313446}
{'event_time': '2022-10-11T16:16:04.655622', 'volume': 0.9072657488286495}
{'event_time': '2022-10-11T16:16:04.693902', 'volume': 0.9062547236680984}
{'event_time': '2022-10-11T16:16:04.732057', 'volume': 0.7885556481778622}
最初はなにも表示されていませんので自然音のみで放置してみます。
しばらくするとマイクからの音量と一緒にリアルタイムでANOMALY_SCOREが表示されました。 なにも音を立てない場合のスコアは0.6から1.2くらいの範囲になっていました。
5. 異常値を投入
うまく動作していることがわかったので、さっそくマイクに向かって叫んでみました。
「元気ですかー!!!」
宿題をしていた娘に怒られました。
しばらく待つとデータが更新され、ANOMALY_SCOREが1.8から3.13の範囲になっていたので、異常検知できているようです。スコアが高ければアラートを出すなどのアクションができると色々使えそうですね。
おわりに
今回はJAWS DAYS 2022で紹介されていたKinesis Data AnalyticsのRandom Cut Forestを試してみました。SQLアプリケーションにはHotSpotsというアルゴリズムも用意されているので、今度はそちらも試してみたいです。