Kinesis Data Analytics for Python Applicationのチュートリアルをやってみた

公式チュートリアルを参考にKinesis Data Analytics Applicationをデプロイしてみました
2022.10.31

こんにちは!データアナリティクス事業本部のおざわ(じ)です。 私は仙台からリモート勤務してますが、こちらは紅葉が綺麗になってきています。

今回は、AWSのドキュメントにあるチュートリアルを参考に、ストリームに流れてくるデータを集計して、結果をS3に保存するアプリケーションをデプロイしてみます。Kinesis Data Analytics Studioを使ってインタラクティブにストリーム処理を試してみたい方はこちらの記事を参考にしてください。

目次

  1. Kinesis Data Streamsの用意
  2. データ投入用スクリプトの用意
  3. サンプルプログラムの編集
  4. KDAアプリケーションの作成
    1. Connectorのダウンロード
    2. プログラム圧縮とS3へのアップロード
    3. アプリケーション作成
    4. アプリケーションの設定変更
    5. IAMポリシーの編集
  5. アプリケーションの実行

1. Kinesis Data Streamsの用意

コンソールで作成していただいてもOKです。

aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1

2. データ投入用スクリプトの用意

チュートリアルのままです。ダミーの株価データを生成して指定されたストリームに投入します。

stock.py

import datetime
import json
import random
import boto3
import time

STREAM_NAME = "ExampleInputStream"

def get_data():
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")
        time.sleep(2)

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis'))

3. サンプルプログラムの編集

ドキュメントに記載されているサンプルのリポジトリをクローンしてきます。

git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples

今回はリポジトリのpythonフォルダの下にあるTumbling Windowsを選択しました。

タンブリングウィンドウについての説明は、こちらのブログ記事をご確認いただければと思います。今回のプログラムでは、入力ストリームに流れてくる株価データを重複しない形の10秒間の枠で区切り、銘柄ごとにPRICEを合計しています。下では処理を行っているメソッドと変更点のみを記載しています。

tumbling-windows.py

# ----- 省略 ----- #
def perform_tumbling_window_aggregation(input_table_name):
    # use SQL Table in the Table API
    input_table = table_env.from_path(input_table_name)

    tumbling_window_table = (
        input_table.window(
            Tumble.over("10.seconds").on("EVENT_TIME").alias("ten_second_window")
        )
        .group_by("TICKER, ten_second_window")
        .select("TICKER, PRICE.sum as PRICE, ten_second_window.end as EVENT_TIME")
    )

    return tumbling_window_table
# ----- 省略 ----- #
def create_table_output_s3(table_name, stream_name):
    return """ CREATE TABLE {0} (
                TICKER VARCHAR(6),
                PRICE DOUBLE,
                EVENT_TIME TIMESTAMP(3)
              )
              PARTITIONED BY (TICKER)
              WITH (
                'connector' = 'filesystem',
                'path' = 's3a://{1}/',
                'format' = 'json',
                'sink.partition-commit.policy.kind'='success-file',
                'sink.partition-commit.delay' = '1 min'
              ) """.format(
        table_name, stream_name
    )

def main():
        # ----- 省略 ----- #
        # 3. Creates a sink table writing to a Kinesis Data Stream
    #table_env.execute_sql(
    #    create_table(output_table_name, output_stream, output_region, stream_initpos)
    #)
    table_env.execute_sql(
        create_table_output_s3(output_table_name, output_stream)
    )
        # ----- 省略 ----- #

元のプログラムでは結果の出力先に別ストリームが使われていますが、今回はS3にデータを格納しています。

4. KDAアプリケーションの作成

デプロイするプログラムの用意ができましたので、アプリケーションを作成します。

4.1. Connectorのダウンロード

チュートリアルで指定されているFlink Connectorをダウンロードして、プログラムと同じフォルダに保存します。私は一瞬迷ったのですが、スクリーンショットの赤矢印で示したjarファイルのリンクからConnectorをダウンロードすることができます。

4.2. プログラムの圧縮とS3へのアップロード

設定用json、Connectorのjarファイル、サンプルプログラムを同じフォルダに入れます。 今回はローカルでの実行は行いませんので、jsonを変更する必要はなく、アプリケーションの設定は後述のランタイムプロパティで行います。

> ls
application_properties.json
flink-sql-connector-kinesis_2.12-1.13.2.jar
tumbling-windows.py

> zip myapp ./*
adding: application_properties.json (deflated 59%)
adding: flink-sql-connector-kinesis_2.12-1.13.2.jar (deflated 9%)
adding: tumbling-windows.py (deflated 68%)

zipできたらS3にアップロードしてください。

4.3. アプリケーション作成

「ストリーミングアプリケーションの作成」をクリックしてアプリケーションを作成します。

Apache Flinkバージョンには1.13を指定して、アプリケーションには任意の名前をつけます。

この時に作成されるIAMロールは後ほど変更しますので、名前を覚えておきます。 また、今回はテストなので「アプリケーション設定のテンプレート」には「開発」を選択します。

アプリケーションが作成されたら「設定」から設定変更を行います。

4.4. アプリケーションの設定変更

「アプリケーションコードの場所」にはzipを格納したS3バケットとパスを追加します。

次にランタイムプロパティを変更します。

ここではアプリケーションで使用するライブラリ、入力ストリーム、出力先S3バケット等のプロパティを追加していきます。 input.stream.nameoutput.stream.nameは先ほど作成したストリームと任意のS3バケットに変更してください。

グループID キー
consumer.config.0 input.stream.name 入力ストリーム
consumer.config.0 aws.region ap-northeast-1
consumer.config.0 flink.stream.initpos LATEST
producer.config.0 output.stream.name S3バケット
producer.config.0 aws.region ap-northeast-1
producer.config.0 shard.count 1
kinesis.analytics.flink.run.options python tumbling-windows.py
kinesis.analytics.flink.run.options jarfile flink-sql-connector-kinesis_2.12-1.13.2.jar(※1)

※1 チュートリアルで指定されているFlink Connectorのバージョンと2022年10月27日時点でダウンロードできるjarのバージョンが異なりますので、ランタイムプロパティで指定するConnectorのバージョンも「2.12-1.13.2」に変更してください(Feedbackは送信済み)。

4.5. IAMポリシーの編集

アプリケーションで使用するIAMロールのポリシーに下記を追加します。

入力元のストリームのARNとS3バケットの部分は書き換えてください。

        ,
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "<ストリームのARN>"
        },
        {
            "Sid": "WriteOutputS3",
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::<S3バケット>",
                "arn:aws:s3:::<S3バケット>/*"
            ]
        }

5. 実行

アプリケーションの更新が終わったら「実行」を押します。

アプリケーションが実行中になったら「Apache Flinkダッシュボードを開く」をクリックするとFlinkのダッシュボードが確認できます。Running Jobsが1になっていて、Jobが動いていますね!

画面左側のRunning Jobsをクリックすると、Overviewのタブにプログラムの内容が表示されています。

データを投入

すこし長くなりましたが、これでアプリケーションの準備が整いました。

さっそくストリームにデータを入れてみます。

> python stock.py
{'TICKER': 'INTC', 'PRICE': 9.62, 'EVENT_TIME': '2022-10-27T16:01:37.321681'}
{'TICKER': 'INTC', 'PRICE': 79.37, 'EVENT_TIME': '2022-10-27T16:01:44.761172'}
{'TICKER': 'AAPL', 'PRICE': 36.03, 'EVENT_TIME': '2022-10-27T16:01:46.795298'}
{'TICKER': 'AAPL', 'PRICE': 14.09, 'EVENT_TIME': '2022-10-27T16:01:48.827209'}
{'TICKER': 'AMZN', 'PRICE': 84.46, 'EVENT_TIME': '2022-10-27T16:01:50.861608'}

しばらく待つとダッシュボードの表示が変わって処理済みのレコード数などが表示されてきました!

S3も確認してみると、アプリケーションから結果が届いています!

最後に

今回は公式チュートリアルを参考にKinesis Data Analytics Applicationをデプロイしてみました。ご紹介したリポジトリには、他にもさまざまなサンプルが用意されていますので、ぜひ気になったものを試してみてください。

参考URL