Pythonのboto3を実行してAmazon Kinesis Data Firehoseを触ってみた

Pythonのboto3を実行してAmazon Kinesis Data Firehoseを触ってみた

Clock Icon2022.06.14

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Prismatixオペレーションチーム の高橋隆太です。

Prismatixオペレーションチームに参画して、2ヶ月が経ちました。 Prismatixの構成や運用にはPythonのboto3と一部Kinesisが利用されています。

今回はPythonとAmazon Kinesis Data Firehoseを利用してPythonを実行して、 Kinesis Data Firehoseからs3にデータを送信するという基本的な内容を確認してみます。

今回の想定は下記となります。

Kinesis Data Firehoseの設定をする

前提

まずは任意のs3バケットの作成をします。今回は「firehose-takahashi」としておきます。 このサービスはs3のパケットの任意の場所に置けたり他に暗号化するかやファイルを圧縮するか 保存期間等を設定することが可能ですが、今回はそこまでせず動作だけ確認します。

Kinesis Data Firehoseの配信ストリームを作成

①マネージメントコンソールからKinesisを選択する 2022/6/6現在「Get started」の表示があったので「Kinesis Data Firehose」を選択し 「Create delivery stream」を押下する。

②Source部分には「Amazon Kinesis Data Streams」と「DirectPut」が選択できるのですが、データの加工等は不要なのでそのまま配信データを送信する「DirectPut」を選択します

Destinationは今回はs3です

Delivery stream nameは「test_stream」としておきます。

 

③「Transform and convert records」の項目ではデータを都合のよい感じに変換したりすることができるのですが、 今回は不要なので特に設定は変更せずどちらも「Disabled」のままにしておきます。

④「Destination settings」の項目ではどのs3のどこに配置するか聞かれていますが、今回も特に設定はしません。 s3バケットを事前に作成していたものを利用します。

⑤残りの項目では暗号化の有無やs3のより細かい設定をしますが、今回はもう「Create Delivery stream」で作成します。

 

⑥作成したものを確認

⑦同じ画面から「Start sending demo data」を押すとテストデータを流せるみたいなので流してみます。 startしてある程度時間が経ったら「Stop sending demo data」を押下します。

⑧s3にデータがあること、そしてダウンロードしたら先ほどのテストデータであることを確認できました。

 

Pythonでboto3 を実行できる環境とコード

今回は以下環境で確認

PC: MacBook Pro (13-inch, 2018)

OSバージョン: macOS Monterey(12.3.1)

Pythonバージョン: 3.10.4

boto3バージョン: 1.24.2

コードは以下


import boto3
from boto3.session import Session
import json

session = Session(
aws_access_key_id='XXXXXXXXXXXXXXXXX',
aws_secret_access_key='YYYYYYYYYYYYYYYY',
region_name='ap-northeast-1')

client = session.client('firehose')
stream_name = 'test_stream'

data_list = [
  {
	"user_id": 1,
	"user_name": "takahashi"
  },
  {
	"user_id": 2,
	"user_name": "ryuta"
  }
]

response = client.put_record(
DeliveryStreamName = stream_name,
Record={'Data': (json.dumps(data_list[0]) + "\n").encode()})

response = client.put_record(
DeliveryStreamName = stream_name,
Record={Data: (json.dumps(data_list[1]) + "\n").encode()})

実際にコマンドの実行

s3バケットから確認できました!

まとめ

Kinesis Data Firehoseをboto3から実行した手順を書いてみました。 boto3部分は好奇心でやってみたかっただけですが、Kinesis Data Firehoseの動きが確認できました。

CloudWatchのログをFirehoseを利用してs3に送信してログ解析に利用したり、 今回のboto3等を定期実行してElasticsearchに送信したり色々使い方がありそうですね。

他のKinesisサービスの基本的な動きも簡単に確認してみたいですね。 あとPythonのSDKのboto3かなり読みやすいですね。。。 Prismatixでも利用しているので、一度しっかりとコードを読んでみようかなと思いました。

以上 Prismatixオペレーションチーム の高橋隆太でした!

アノテーション株式会社について

アノテーション株式会社は、クラスメソッド社のグループ企業として「オペレーション・エクセレンス」を担える企業を目指してチャレンジを続けています。「らしく働く、らしく生きる」のスローガンを掲げ、様々な背景をもつ多様なメンバーが自由度の高い働き方を通してお客様へサービスを提供し続けてきました。現在当社では一緒に会社を盛り上げていただけるメンバーを募集中です。少しでもご興味あれば、アノテーション株式会社WEBサイトをご覧ください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.