この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
こんにちは!データアナリティクス事業本部のおざわ(じ)です。 私は仙台からリモート勤務してますが、こちらは紅葉が綺麗になってきています。
今回は、AWSのドキュメントにあるチュートリアルを参考に、ストリームに流れてくるデータを集計して、結果をS3に保存するアプリケーションをデプロイしてみます。Kinesis Data Analytics Studioを使ってインタラクティブにストリーム処理を試してみたい方はこちらの記事を参考にしてください。
目次
- Kinesis Data Streamsの用意
- データ投入用スクリプトの用意
- サンプルプログラムの編集
- KDAアプリケーションの作成
- Connectorのダウンロード
- プログラム圧縮とS3へのアップロード
- アプリケーション作成
- アプリケーションの設定変更
- IAMポリシーの編集
- アプリケーションの実行
1. Kinesis Data Streamsの用意
コンソールで作成していただいてもOKです。
aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1
2. データ投入用スクリプトの用意
チュートリアルのままです。ダミーの株価データを生成して指定されたストリームに投入します。
stock.py
import datetime
import json
import random
import boto3
import time
STREAM_NAME = "ExampleInputStream"
def get_data():
return {
'event_time': datetime.datetime.now().isoformat(),
'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
'price': round(random.random() * 100, 2)}
def generate(stream_name, kinesis_client):
while True:
data = get_data()
print(data)
kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey="partitionkey")
time.sleep(2)
if __name__ == '__main__':
generate(STREAM_NAME, boto3.client('kinesis'))
3. サンプルプログラムの編集
ドキュメントに記載されているサンプルのリポジトリをクローンしてきます。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples
今回はリポジトリのpythonフォルダの下にあるTumbling Windows
を選択しました。
タンブリングウィンドウについての説明は、こちらのブログ記事をご確認いただければと思います。今回のプログラムでは、入力ストリームに流れてくる株価データを重複しない形の10秒間の枠で区切り、銘柄ごとにPRICEを合計しています。下では処理を行っているメソッドと変更点のみを記載しています。
tumbling-windows.py
# ----- 省略 ----- #
def perform_tumbling_window_aggregation(input_table_name):
# use SQL Table in the Table API
input_table = table_env.from_path(input_table_name)
tumbling_window_table = (
input_table.window(
Tumble.over("10.seconds").on("EVENT_TIME").alias("ten_second_window")
)
.group_by("TICKER, ten_second_window")
.select("TICKER, PRICE.sum as PRICE, ten_second_window.end as EVENT_TIME")
)
return tumbling_window_table
# ----- 省略 ----- #
def create_table_output_s3(table_name, stream_name):
return """ CREATE TABLE {0} (
TICKER VARCHAR(6),
PRICE DOUBLE,
EVENT_TIME TIMESTAMP(3)
)
PARTITIONED BY (TICKER)
WITH (
'connector' = 'filesystem',
'path' = 's3a://{1}/',
'format' = 'json',
'sink.partition-commit.policy.kind'='success-file',
'sink.partition-commit.delay' = '1 min'
) """.format(
table_name, stream_name
)
def main():
# ----- 省略 ----- #
# 3. Creates a sink table writing to a Kinesis Data Stream
#table_env.execute_sql(
# create_table(output_table_name, output_stream, output_region, stream_initpos)
#)
table_env.execute_sql(
create_table_output_s3(output_table_name, output_stream)
)
# ----- 省略 ----- #
元のプログラムでは結果の出力先に別ストリームが使われていますが、今回はS3にデータを格納しています。
4. KDAアプリケーションの作成
デプロイするプログラムの用意ができましたので、アプリケーションを作成します。
4.1. Connectorのダウンロード
チュートリアルで指定されているFlink Connectorをダウンロードして、プログラムと同じフォルダに保存します。私は一瞬迷ったのですが、スクリーンショットの赤矢印で示したjarファイルのリンクからConnectorをダウンロードすることができます。
4.2. プログラムの圧縮とS3へのアップロード
設定用json、Connectorのjarファイル、サンプルプログラムを同じフォルダに入れます。 今回はローカルでの実行は行いませんので、jsonを変更する必要はなく、アプリケーションの設定は後述のランタイムプロパティで行います。
> ls
application_properties.json
flink-sql-connector-kinesis_2.12-1.13.2.jar
tumbling-windows.py
> zip myapp ./*
adding: application_properties.json (deflated 59%)
adding: flink-sql-connector-kinesis_2.12-1.13.2.jar (deflated 9%)
adding: tumbling-windows.py (deflated 68%)
zipできたらS3にアップロードしてください。
4.3. アプリケーション作成
「ストリーミングアプリケーションの作成」をクリックしてアプリケーションを作成します。
Apache Flinkバージョンには1.13を指定して、アプリケーションには任意の名前をつけます。
この時に作成されるIAMロールは後ほど変更しますので、名前を覚えておきます。 また、今回はテストなので「アプリケーション設定のテンプレート」には「開発」を選択します。
アプリケーションが作成されたら「設定」から設定変更を行います。
4.4. アプリケーションの設定変更
「アプリケーションコードの場所」にはzipを格納したS3バケットとパスを追加します。
次にランタイムプロパティを変更します。
ここではアプリケーションで使用するライブラリ、入力ストリーム、出力先S3バケット等のプロパティを追加していきます。
input.stream.name
、output.stream.name
は先ほど作成したストリームと任意のS3バケットに変更してください。
グループID | キー | 値 |
---|---|---|
consumer.config.0 | input.stream.name | 入力ストリーム |
consumer.config.0 | aws.region | ap-northeast-1 |
consumer.config.0 | flink.stream.initpos | LATEST |
producer.config.0 | output.stream.name | S3バケット |
producer.config.0 | aws.region | ap-northeast-1 |
producer.config.0 | shard.count | 1 |
kinesis.analytics.flink.run.options | python | tumbling-windows.py |
kinesis.analytics.flink.run.options | jarfile | flink-sql-connector-kinesis_2.12-1.13.2.jar(※1) |
※1 チュートリアルで指定されているFlink Connectorのバージョンと2022年10月27日時点でダウンロードできるjarのバージョンが異なりますので、ランタイムプロパティで指定するConnectorのバージョンも「2.12-1.13.2」に変更してください(Feedbackは送信済み)。
4.5. IAMポリシーの編集
アプリケーションで使用するIAMロールのポリシーに下記を追加します。
入力元のストリームのARNとS3バケットの部分は書き換えてください。
,
{
"Sid": "ReadInputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "<ストリームのARN>"
},
{
"Sid": "WriteOutputS3",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::<S3バケット>",
"arn:aws:s3:::<S3バケット>/*"
]
}
5. 実行
アプリケーションの更新が終わったら「実行」を押します。
アプリケーションが実行中になったら「Apache Flinkダッシュボードを開く」をクリックするとFlinkのダッシュボードが確認できます。Running Jobsが1になっていて、Jobが動いていますね!
画面左側のRunning Jobsをクリックすると、Overviewのタブにプログラムの内容が表示されています。
データを投入
すこし長くなりましたが、これでアプリケーションの準備が整いました。
さっそくストリームにデータを入れてみます。
> python stock.py
{'TICKER': 'INTC', 'PRICE': 9.62, 'EVENT_TIME': '2022-10-27T16:01:37.321681'}
{'TICKER': 'INTC', 'PRICE': 79.37, 'EVENT_TIME': '2022-10-27T16:01:44.761172'}
{'TICKER': 'AAPL', 'PRICE': 36.03, 'EVENT_TIME': '2022-10-27T16:01:46.795298'}
{'TICKER': 'AAPL', 'PRICE': 14.09, 'EVENT_TIME': '2022-10-27T16:01:48.827209'}
{'TICKER': 'AMZN', 'PRICE': 84.46, 'EVENT_TIME': '2022-10-27T16:01:50.861608'}
しばらく待つとダッシュボードの表示が変わって処理済みのレコード数などが表示されてきました!
S3も確認してみると、アプリケーションから結果が届いています!
最後に
今回は公式チュートリアルを参考にKinesis Data Analytics Applicationをデプロイしてみました。ご紹介したリポジトリには、他にもさまざまなサンプルが用意されていますので、ぜひ気になったものを試してみてください。