話題の記事

ストリーミングズンドコをKinesis Data Analyticsでキヨシ判定してみた

2020.03.17

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

ズンドコキヨシ(プログラム)とは?

今回はこのズンドコキヨシを、Kinesis Data AnalyticsSQL Applicationsでやってみました。

入力はKinesis Data Streamsに別のプログラムとして送っており、キヨシ判定後に終了しないため、もしかすると純粋ズンドコキヨシではないのかもしれませんが、あらかじめご承知おきください。

What if...

世界は変化し、あらゆるプログラムがズンドコを送るようになりました。

これに対応するため、あなたは多数のズンドコ入力を判定し、キヨシを出力しなくてはならなくなりました。

ズンドコ入力は、ファイルでズンドコがたくさん書かれたものが送られくるのではなく、「ズン」と「ドコ」が個別かつ連続的に発生し続ける、ストリーミングデータとして送られてきます。

「ズンズンズンズンドコ」の順序でズンドコ入力が流れてきた場合に対応して、別のストリーミングデータとして「キヨシ」をその都度出力する必要があります。

キヨシは合いの手であるため、ズンズンズンズンドコが流れてきたら最大でも数秒以内に、つまりニアリアルタイムに、キヨシを出力する必要があります。また、もちろんズンズンズンズンドコが流れてきたら、見逃しなくキヨシを返さなくてはなりません。

幸いなことに、入力のズンドコはKinesis Data Streamsに送られているようになっています。シャード数の調整は必要となりますが、マネージドサービスであるので、数えられないくらいのズンドコ入力があっても、あなたはデータの入り口の管理は手間をかからずに済んでいます。

あなたは、ズンドコ判定部分もマネージドサービスやサーバーレスアーキテクチャによって作り、なるべく管理の楽でスケールしやすいシステムにしたいと考えました。

しかし、ステートレスなサービスであるLambdaで、過去の連続したデータを見るというステートフルなサービスを実装するのは難しそうです。DynamoDBなどをくっつけたとしてもうまい具合に処理する方法が思いつきません。Kinesis Data StreamsからLambdaを蹴る際、大きなバッチサイズにして多数のレコードを見ることを考えましたが、バッチサイズを大きくするとそれだけ遅延が発生し、またいずれにせよ取りこぼしを防ぐために必要となるバッチ間をまたぐズンドコ判定処理の設計が、非常に難しいとあなたは感じています。

Kinesis Data FirehoseでS3に出力し、GlueかAthenaでバッチ処理的にズンドコ判定しようとも考えましたが、最大でも数秒以内というニアリアルタイムの要件を満たすことができませんでした。

どうすればいいのか困り果てていたあなたに、そっとKinesis Data Analyticsが手を差し伸べてきました…。

Kinesis Data Analyticsとは?

Kinesisのストリーミングデータに対して、ステートフルなアプリケーションを通して、別のKinesisストリームを作成することのできるサービスです。

ステートフルな、というのがポイントです。今回であれば、「過去4回分のズンドコと今来たズンドコを連結し、"ズンズンズンズンドコ"と比較する」という処理をするために、過去の状態を持つ必要があるからです。

過去のいくらかのデータを参照するウィンドウ処理に適したサービスになっています。例えば、IoTセンサーの過去100回の値の平均を取ったり、1分ごとにすべてのIoT機器の値の最大値を取るといった集計処理にも使うことが出来ます。

また、ニアリアルタイムに処理を行うことができ、ストリーミングデータに即応したストリーミングデータを生成することが可能です。

Kinesis Data Analyticsのアプリケーションは、SQLか、Apache Flink(Java)で記述することができます。

SQLでのサンプルアプリケーションの動かし方は、次の記事が参考になると思います。

【新機能】Kinesis Analyticsが利用可能になりました!

現在(2020年3月)では、東京(ap-northeast-1)リージョンでも利用可能です。

今回はSQLを利用して、ズンドコ判定をKinesis Data Analyticsアプリケーションとして作成しました。

 ストリーミングズンドコキヨシ

今回作成したズンドコキヨシの全体図は次のとおりです。

ランダムなズンドコをLambdaからKinesis Data Streamsへ送り、そこからKinesis Data Analyticsで判定し、出力のキヨシを別のKinesis Data Streamsに流して、そこからLambdaを起動してCloudwatch logsに記録しています。

ランダムのズンドコ入力のための、Lambda関数は次のとおりです。

import json
import boto3
import random
import datetime
import time

kinesis = boto3.client('kinesis')

def lambda_handler(event, context):
    zundoko_array = [""] * 5
    
    while True:
        zundoko = random.choice(['ズン', 'ドコ'])
    
        del zundoko_array[0]
        zundoko_array.append(zundoko)
        is_zun_zun_zun_zun_doko = zundoko_array == ['ズン', 'ズン', 'ズン', 'ズン', 'ドコ']
        if is_zun_zun_zun_zun_doko:
            kiyoshi_time = datetime.datetime.utcnow()
            print(kiyoshi_time)
    
        data = json.dumps({"zundoko": zundoko})
        kinesis.put_record(
            StreamName="zundokoInputStream",
            Data=data,
            PartitionKey="zundoko")
    
        time.sleep(0.05)

ズンズンズンズンドコだった場合に時刻(入力キヨシタイム)を記録して、ランダムなズンドコをKinesis Data Streamsに突っ込んでいるだけです。

出力のLambdaは、次のとおりです。

import json
import datetime

def lambda_handler(event, context):
    print(json.dumps(event))
    print(datetime.datetime.utcnow())
    return

本当に単に送られてくるレコードの入ったeventと時刻(出力キヨシタイム)をprintで記録しているだけです。この出力Lambdaをキヨシ出力Kinesis Data Streamsから起動させるために、次のように1つデータが来るたびに出力Lambdaが起動するように設定します。

本丸であるKinesis Data AnalyticsのSQLアプリケーションは次のとおりです。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (kiyoshi CHAR(3));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM 
        'キヨシ' AS kiyoshi
    FROM (
        SELECT STREAM
            LAG("zundoko", 4) OVER FOUR_ROWS AS zun_1,
            LAG("zundoko", 3) OVER FOUR_ROWS AS zun_2,
            LAG("zundoko", 2) OVER FOUR_ROWS AS zun_3,
            LAG("zundoko", 1) OVER FOUR_ROWS AS zun_4,
            "zundoko"
        FROM "SOURCE_SQL_STREAM_001"
        WINDOW FOUR_ROWS AS (ROWS 4 PRECEDING);
    )
    WHERE (zun_1 || zun_2 || zun_3 || zun_4 || "zundoko") = 'ズンズンズンズンドコ'

アプリケーションと言っても、ご覧の通り単なるSQLクエリであることが見て取れると思います。

クエリの説明をすると、WINDOW句で今来た入力から4つ前の入力まで見るということ以外は、普通のSQLクエリかと思います。あとはLAG関数によって、今来た入力と1~4つ前の値をカラムにして、WHERE句でズンドコ判定しています。キヨシの場合は、出力に文字列でキヨシを設定しています。

少しハマったのが、入力はPythonのプログラム通り{"zundoko": zundoko}という形式で送られてくるため、カラム名は小文字なのですが、単に「zundoko」とKinesis Data AnalyticsのSQLに入力すると勝手に大文字に変換されてしまうため、ダブルクオーテーションで囲って「"zundoko"」としてあります。

Kinesis Data Analyticsの設定は次のとおりです。

入力と出力のKinesis Data Streamsを指定しているだけです。SQLはWebコンソールで入力することができます。Kinesis Data Analyticsの設定は実のところ権限設定を除けば、この入出力を決めてSQLを上げるだけです。マネージドサービスであるので、Lambda同様にほぼ単にコードを上げるだけで済みます。またオートスケーリングも兼ね備えているため、これだけで(最大値の増加申請とお金を出せば)いくらでも動いてしまいます。

「What if...」 のところに書いた、状態をどう管理しなくてならないか、DynamoDBのようなデータストアにどう一時記録すればよいのかを考えなくてもいいのです。4つ前までの値をKinesis Data Streamsへの入力プログラムのように、配列に入れて来るたびに消して...のような処理をする設定は必要ありません、本当にこのSQLクエリだけでいいのです。あとはKinesis Data Analytics側でよしなにやってくれます。

実際に動かしてみた

すべてのAWSリソースは、同じ東京リージョン内に作成して行いました。

入力Lambdaを起動したあと、出力LambdaのCloudwatch Logsのログを見てみました。次のように出力Kinesis Data Streamsのレコードが来ていました。

partitionKeyがおそらく「キヨシ」がそのまま入っているためか文字化けしてしまっていますが、dataの

eyJLSVlPU0hJIjoi44Kt44Oo44K3In0=

をbase64でdecodeすると、

{"KIYOSHI":"キヨシ"}

となり、しっかりとキヨシが送られていることが確認できます。

Cloudwatch Logsに出力されている、キヨシの回数と時刻を入出力で比較してみましょう。

個数も一致し、入力から出力までおおむね1秒程度となっています。正確に言うと、入力Lambdaで入力Kinesis Data Streamsに送る直前の時間と、Kinesis Data Analyticsの処理を挟んで、出力Kinesis Data Streamsを通って出力Lambdaが起動し、その直後にprintしている時間になります。

もちろん全部同じリージョン内でやってるので、ネットワーク的に近いというのはありますが、判定処理自体はニアリアルタイムとして使えるほどの時間内で行えることが確認できていると思います。

今回のズンドコキヨシの制限事項

今回のアーキテクチャでも、Kinesis Data Streamsのシャード数とKinesis Data AnalyticsのKPU(Kinesis Processing Unit、課金単位)の最大値増加申請さえ気を使って、あとは料金を払えばかなりスケールすると思います。つまり、本当に大量のプログラムからズンドコが送られるようになっても大丈夫なハズです。

ただし、例えば世界中からズンドコが送られてきた場合の、肝心の判定処理に関しては注意書きが必要となります。

Kinesis Data Analyticsに限らず、集約的にストリーミング処理をする場合に、順序付けるための時間(タイムスタンプ)をどうすればいいのか、という問題があります。

今回は、単にKinesis Data Analyticsに到着した時刻を元にズンドコの順序を見ています。なので、東京リージョンのKinesisに対して世界中からズンドコが送られてきた場合、例えば日本から送られてきたものと、ブラジルから送られてきたものでは、ズンドコが発生した時刻とズンドコがKinesis Data Analyticsに到着した時刻が前後してしまうかもしれません。これを、ズンドコが発生した時刻でズンドコ判定したいとなると、ちょっと設計が難しくなってきます。

これはこれだけで記事が書けそうなくらいなので、一旦ここまでにさせてください。詳しく気になる方は、次のドキュメントを参照していただければと思います。

https://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/timestamps-rowtime-concepts.html

おわりに

この記事では、Kinesis Data Analyticsを、実際にどういう使い方がすればいいのかというところで、ストリーミングズンドコキヨシをしてみました。

このズンドコキヨシも、シンプルながら興味深いプログラムだと思いますが、世の中には更にKinesis Data Analyticsが適したシステムがあると思います。

ここぞというときの、あなたの解決策の道具の一つとして、Kinesis Data Analyticsが加わっていただければ幸いです。