Kinesis Data Analytics for Python Applicationのチュートリアルをやってみた
こんにちは!データアナリティクス事業本部のおざわ(じ)です。 私は仙台からリモート勤務してますが、こちらは紅葉が綺麗になってきています。
今回は、AWSのドキュメントにあるチュートリアルを参考に、ストリームに流れてくるデータを集計して、結果をS3に保存するアプリケーションをデプロイしてみます。Kinesis Data Analytics Studioを使ってインタラクティブにストリーム処理を試してみたい方はこちらの記事を参考にしてください。
目次
- Kinesis Data Streamsの用意
- データ投入用スクリプトの用意
- サンプルプログラムの編集
- KDAアプリケーションの作成
- Connectorのダウンロード
- プログラム圧縮とS3へのアップロード
- アプリケーション作成
- アプリケーションの設定変更
- IAMポリシーの編集
- アプリケーションの実行
1. Kinesis Data Streamsの用意
コンソールで作成していただいてもOKです。
aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1
2. データ投入用スクリプトの用意
チュートリアルのままです。ダミーの株価データを生成して指定されたストリームに投入します。
import datetime import json import random import boto3 import time STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") time.sleep(2) if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis'))
3. サンプルプログラムの編集
ドキュメントに記載されているサンプルのリポジトリをクローンしてきます。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples
今回はリポジトリのpythonフォルダの下にあるTumbling Windows
を選択しました。
タンブリングウィンドウについての説明は、こちらのブログ記事をご確認いただければと思います。今回のプログラムでは、入力ストリームに流れてくる株価データを重複しない形の10秒間の枠で区切り、銘柄ごとにPRICEを合計しています。下では処理を行っているメソッドと変更点のみを記載しています。
# ----- 省略 ----- # def perform_tumbling_window_aggregation(input_table_name): # use SQL Table in the Table API input_table = table_env.from_path(input_table_name) tumbling_window_table = ( input_table.window( Tumble.over("10.seconds").on("EVENT_TIME").alias("ten_second_window") ) .group_by("TICKER, ten_second_window") .select("TICKER, PRICE.sum as PRICE, ten_second_window.end as EVENT_TIME") ) return tumbling_window_table # ----- 省略 ----- # def create_table_output_s3(table_name, stream_name): return """ CREATE TABLE {0} ( TICKER VARCHAR(6), PRICE DOUBLE, EVENT_TIME TIMESTAMP(3) ) PARTITIONED BY (TICKER) WITH ( 'connector' = 'filesystem', 'path' = 's3a://{1}/', 'format' = 'json', 'sink.partition-commit.policy.kind'='success-file', 'sink.partition-commit.delay' = '1 min' ) """.format( table_name, stream_name ) def main(): # ----- 省略 ----- # # 3. Creates a sink table writing to a Kinesis Data Stream #table_env.execute_sql( # create_table(output_table_name, output_stream, output_region, stream_initpos) #) table_env.execute_sql( create_table_output_s3(output_table_name, output_stream) ) # ----- 省略 ----- #
元のプログラムでは結果の出力先に別ストリームが使われていますが、今回はS3にデータを格納しています。
4. KDAアプリケーションの作成
デプロイするプログラムの用意ができましたので、アプリケーションを作成します。
4.1. Connectorのダウンロード
チュートリアルで指定されているFlink Connectorをダウンロードして、プログラムと同じフォルダに保存します。私は一瞬迷ったのですが、スクリーンショットの赤矢印で示したjarファイルのリンクからConnectorをダウンロードすることができます。
4.2. プログラムの圧縮とS3へのアップロード
設定用json、Connectorのjarファイル、サンプルプログラムを同じフォルダに入れます。 今回はローカルでの実行は行いませんので、jsonを変更する必要はなく、アプリケーションの設定は後述のランタイムプロパティで行います。
> ls application_properties.json flink-sql-connector-kinesis_2.12-1.13.2.jar tumbling-windows.py > zip myapp ./* adding: application_properties.json (deflated 59%) adding: flink-sql-connector-kinesis_2.12-1.13.2.jar (deflated 9%) adding: tumbling-windows.py (deflated 68%)
zipできたらS3にアップロードしてください。
4.3. アプリケーション作成
「ストリーミングアプリケーションの作成」をクリックしてアプリケーションを作成します。
Apache Flinkバージョンには1.13を指定して、アプリケーションには任意の名前をつけます。
この時に作成されるIAMロールは後ほど変更しますので、名前を覚えておきます。 また、今回はテストなので「アプリケーション設定のテンプレート」には「開発」を選択します。
アプリケーションが作成されたら「設定」から設定変更を行います。
4.4. アプリケーションの設定変更
「アプリケーションコードの場所」にはzipを格納したS3バケットとパスを追加します。
次にランタイムプロパティを変更します。
ここではアプリケーションで使用するライブラリ、入力ストリーム、出力先S3バケット等のプロパティを追加していきます。
input.stream.name
、output.stream.name
は先ほど作成したストリームと任意のS3バケットに変更してください。
グループID | キー | 値 |
---|---|---|
consumer.config.0 | input.stream.name | 入力ストリーム |
consumer.config.0 | aws.region | ap-northeast-1 |
consumer.config.0 | flink.stream.initpos | LATEST |
producer.config.0 | output.stream.name | S3バケット |
producer.config.0 | aws.region | ap-northeast-1 |
producer.config.0 | shard.count | 1 |
kinesis.analytics.flink.run.options | python | tumbling-windows.py |
kinesis.analytics.flink.run.options | jarfile | flink-sql-connector-kinesis_2.12-1.13.2.jar(※1) |
※1 チュートリアルで指定されているFlink Connectorのバージョンと2022年10月27日時点でダウンロードできるjarのバージョンが異なりますので、ランタイムプロパティで指定するConnectorのバージョンも「2.12-1.13.2」に変更してください(Feedbackは送信済み)。
4.5. IAMポリシーの編集
アプリケーションで使用するIAMロールのポリシーに下記を追加します。
入力元のストリームのARNとS3バケットの部分は書き換えてください。
, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "<ストリームのARN>" }, { "Sid": "WriteOutputS3", "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::<S3バケット>", "arn:aws:s3:::<S3バケット>/*" ] }
5. 実行
アプリケーションの更新が終わったら「実行」を押します。
アプリケーションが実行中になったら「Apache Flinkダッシュボードを開く」をクリックするとFlinkのダッシュボードが確認できます。Running Jobsが1になっていて、Jobが動いていますね!
画面左側のRunning Jobsをクリックすると、Overviewのタブにプログラムの内容が表示されています。
データを投入
すこし長くなりましたが、これでアプリケーションの準備が整いました。
さっそくストリームにデータを入れてみます。
> python stock.py {'TICKER': 'INTC', 'PRICE': 9.62, 'EVENT_TIME': '2022-10-27T16:01:37.321681'} {'TICKER': 'INTC', 'PRICE': 79.37, 'EVENT_TIME': '2022-10-27T16:01:44.761172'} {'TICKER': 'AAPL', 'PRICE': 36.03, 'EVENT_TIME': '2022-10-27T16:01:46.795298'} {'TICKER': 'AAPL', 'PRICE': 14.09, 'EVENT_TIME': '2022-10-27T16:01:48.827209'} {'TICKER': 'AMZN', 'PRICE': 84.46, 'EVENT_TIME': '2022-10-27T16:01:50.861608'}
しばらく待つとダッシュボードの表示が変わって処理済みのレコード数などが表示されてきました!
S3も確認してみると、アプリケーションから結果が届いています!
最後に
今回は公式チュートリアルを参考にKinesis Data Analytics Applicationをデプロイしてみました。ご紹介したリポジトリには、他にもさまざまなサンプルが用意されていますので、ぜひ気になったものを試してみてください。