
Kinesis Data Analytics for Python Applicationのチュートリアルをやってみた
この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!データアナリティクス事業本部のおざわ(じ)です。 私は仙台からリモート勤務してますが、こちらは紅葉が綺麗になってきています。
今回は、AWSのドキュメントにあるチュートリアルを参考に、ストリームに流れてくるデータを集計して、結果をS3に保存するアプリケーションをデプロイしてみます。Kinesis Data Analytics Studioを使ってインタラクティブにストリーム処理を試してみたい方はこちらの記事を参考にしてください。
目次
- Kinesis Data Streamsの用意
 - データ投入用スクリプトの用意
 - サンプルプログラムの編集
 - KDAアプリケーションの作成
- Connectorのダウンロード
 - プログラム圧縮とS3へのアップロード
 - アプリケーション作成
 - アプリケーションの設定変更
 - IAMポリシーの編集
 
 - アプリケーションの実行
 
1. Kinesis Data Streamsの用意
コンソールで作成していただいてもOKです。
aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1
2. データ投入用スクリプトの用意
チュートリアルのままです。ダミーの株価データを生成して指定されたストリームに投入します。
import datetime
import json
import random
import boto3
import time
STREAM_NAME = "ExampleInputStream"
def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}
def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
        time.sleep(2)
if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis'))
3. サンプルプログラムの編集
ドキュメントに記載されているサンプルのリポジトリをクローンしてきます。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples
今回はリポジトリのpythonフォルダの下にあるTumbling Windowsを選択しました。
タンブリングウィンドウについての説明は、こちらのブログ記事をご確認いただければと思います。今回のプログラムでは、入力ストリームに流れてくる株価データを重複しない形の10秒間の枠で区切り、銘柄ごとにPRICEを合計しています。下では処理を行っているメソッドと変更点のみを記載しています。
# ----- 省略 ----- #
def perform_tumbling_window_aggregation(input_table_name):
    # use SQL Table in the Table API
    input_table = table_env.from_path(input_table_name)
    tumbling_window_table = (
        input_table.window(
            Tumble.over("10.seconds").on("EVENT_TIME").alias("ten_second_window")
        )
        .group_by("TICKER, ten_second_window")
        .select("TICKER, PRICE.sum as PRICE, ten_second_window.end as EVENT_TIME")
    )
    return tumbling_window_table
# ----- 省略 ----- #
def create_table_output_s3(table_name, stream_name):
    return """ CREATE TABLE {0} (
                TICKER VARCHAR(6),
                PRICE DOUBLE,
                EVENT_TIME TIMESTAMP(3)
              )
              PARTITIONED BY (TICKER)
              WITH (
                'connector' = 'filesystem',
                'path' = 's3a://{1}/',
                'format' = 'json',
                'sink.partition-commit.policy.kind'='success-file',
                'sink.partition-commit.delay' = '1 min'
              ) """.format(
        table_name, stream_name
    )
def main():
        # ----- 省略 ----- #
        # 3. Creates a sink table writing to a Kinesis Data Stream
    #table_env.execute_sql(
    #    create_table(output_table_name, output_stream, output_region, stream_initpos)
    #)
    table_env.execute_sql(
        create_table_output_s3(output_table_name, output_stream)
    )
        # ----- 省略 ----- #
元のプログラムでは結果の出力先に別ストリームが使われていますが、今回はS3にデータを格納しています。
4. KDAアプリケーションの作成
デプロイするプログラムの用意ができましたので、アプリケーションを作成します。
4.1. Connectorのダウンロード
チュートリアルで指定されているFlink Connectorをダウンロードして、プログラムと同じフォルダに保存します。私は一瞬迷ったのですが、スクリーンショットの赤矢印で示したjarファイルのリンクからConnectorをダウンロードすることができます。

4.2. プログラムの圧縮とS3へのアップロード
設定用json、Connectorのjarファイル、サンプルプログラムを同じフォルダに入れます。 今回はローカルでの実行は行いませんので、jsonを変更する必要はなく、アプリケーションの設定は後述のランタイムプロパティで行います。
> ls application_properties.json flink-sql-connector-kinesis_2.12-1.13.2.jar tumbling-windows.py > zip myapp ./* adding: application_properties.json (deflated 59%) adding: flink-sql-connector-kinesis_2.12-1.13.2.jar (deflated 9%) adding: tumbling-windows.py (deflated 68%)
zipできたらS3にアップロードしてください。
4.3. アプリケーション作成
「ストリーミングアプリケーションの作成」をクリックしてアプリケーションを作成します。

Apache Flinkバージョンには1.13を指定して、アプリケーションには任意の名前をつけます。
この時に作成されるIAMロールは後ほど変更しますので、名前を覚えておきます。 また、今回はテストなので「アプリケーション設定のテンプレート」には「開発」を選択します。

アプリケーションが作成されたら「設定」から設定変更を行います。
4.4. アプリケーションの設定変更
「アプリケーションコードの場所」にはzipを格納したS3バケットとパスを追加します。

次にランタイムプロパティを変更します。
ここではアプリケーションで使用するライブラリ、入力ストリーム、出力先S3バケット等のプロパティを追加していきます。
input.stream.name、output.stream.nameは先ほど作成したストリームと任意のS3バケットに変更してください。

| グループID | キー | 値 | 
|---|---|---|
| consumer.config.0 | input.stream.name | 入力ストリーム | 
| consumer.config.0 | aws.region | ap-northeast-1 | 
| consumer.config.0 | flink.stream.initpos | LATEST | 
| producer.config.0 | output.stream.name | S3バケット | 
| producer.config.0 | aws.region | ap-northeast-1 | 
| producer.config.0 | shard.count | 1 | 
| kinesis.analytics.flink.run.options | python | tumbling-windows.py | 
| kinesis.analytics.flink.run.options | jarfile | flink-sql-connector-kinesis_2.12-1.13.2.jar(※1) | 
※1 チュートリアルで指定されているFlink Connectorのバージョンと2022年10月27日時点でダウンロードできるjarのバージョンが異なりますので、ランタイムプロパティで指定するConnectorのバージョンも「2.12-1.13.2」に変更してください(Feedbackは送信済み)。
4.5. IAMポリシーの編集
アプリケーションで使用するIAMロールのポリシーに下記を追加します。
入力元のストリームのARNとS3バケットの部分は書き換えてください。
        ,
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "<ストリームのARN>"
        },
        {
            "Sid": "WriteOutputS3",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::<S3バケット>",
                "arn:aws:s3:::<S3バケット>/*"
            ]
        }
5. 実行
アプリケーションの更新が終わったら「実行」を押します。

アプリケーションが実行中になったら「Apache Flinkダッシュボードを開く」をクリックするとFlinkのダッシュボードが確認できます。Running Jobsが1になっていて、Jobが動いていますね!

画面左側のRunning Jobsをクリックすると、Overviewのタブにプログラムの内容が表示されています。

データを投入
すこし長くなりましたが、これでアプリケーションの準備が整いました。
さっそくストリームにデータを入れてみます。
> python stock.py
{'TICKER': 'INTC', 'PRICE': 9.62, 'EVENT_TIME': '2022-10-27T16:01:37.321681'}
{'TICKER': 'INTC', 'PRICE': 79.37, 'EVENT_TIME': '2022-10-27T16:01:44.761172'}
{'TICKER': 'AAPL', 'PRICE': 36.03, 'EVENT_TIME': '2022-10-27T16:01:46.795298'}
{'TICKER': 'AAPL', 'PRICE': 14.09, 'EVENT_TIME': '2022-10-27T16:01:48.827209'}
{'TICKER': 'AMZN', 'PRICE': 84.46, 'EVENT_TIME': '2022-10-27T16:01:50.861608'}
しばらく待つとダッシュボードの表示が変わって処理済みのレコード数などが表示されてきました!

S3も確認してみると、アプリケーションから結果が届いています!


最後に
今回は公式チュートリアルを参考にKinesis Data Analytics Applicationをデプロイしてみました。ご紹介したリポジトリには、他にもさまざまなサンプルが用意されていますので、ぜひ気になったものを試してみてください。







