この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
1 はじめに
IoT事業部の平内(SIN)です。
ストリームに流れるレコードを、そのままS3へ保存するような場合、Amazon Kinesis Data Firehoseが非常に便利に利用されると思います。そして、レコードが、JSONのようなテキスト形式だった場合に、レコード毎の区切りが分かるように改行等を入れて保存すると、利用しやすいかも知れません。
このことから、IoT Coreのルールでは、Amazon Kinesis Data Firehoseへ送る際、改行等のセパレータが指定可能になっています。
今回は、Amazon Kinesis Data Streamsを入力ソースとしたAmazon Kinesis Data Firehoseで、S3へ保存する前に、レコード毎に改行を挿入するLambdaを作成してみました。
レコードデータは、手元のMacからboto3で送信しています。
2 Lambda
(1) コード
Lamnbdaのコードは、以下の通りです。
Lambdaには、複数のレコードが到着しますが、レコード毎にデコードして、改行を追加し、再びエンコードします。Lambdaのレスポンスとしては、受信時に取得したRecordId と result を付与したものになります。
今回は、改行の挿入だけですが、ここでは、レコードの変換を自由に行うことができます。また、レコードがJSON形式の場合は、一旦、パースして、JSONとしてその内容を扱うことも可能です。
import json
import base64
def lambda_handler(event, context):
results = []
records = event["records"]
for record in records:
recordId = record["recordId"]
data = record["data"]
# デコード
decoded_data = base64.b64decode(data).decode("utf-8")
# JSONで処理する場合は、jsonへのパースを行う
#payload = json.loads(decoded_data)
#decoded_data = json.dumps(payload)
# 改行を追加する
decoded_data = decoded_data + '\n';
# エンコード
data = base64.b64encode(decoded_data.encode())
results.append({
"result":"Ok",
"recordId":recordId,
"data":data
})
return {
"records":results
}
(2) タイムアウト
Lambdaのタイムアウトは、最低1分としないと、アラートが出てました。
(3) アクセス権
Lambdaには、特にアクセス権を追加する必要はありません。
3 Kinesis Data Firehose
(1) Transform source records with AWS Lambda
Source record transformationをEnableとし、作成したLambdaを設定します。
(2) Amazon S3 destination
送り先は、S3バケットとし、今回は、バッファサイズ、1MiB、インターバルは、60secとしています。
4 動作確認
動作確認のために使用したコードは以下の通りです。手元のMacから20レコードほど送信しました。
import json
import random
import datetime
from boto3.session import Session
profile = 'developer'
session = Session(profile_name = profile)
kinesis = session.client('kinesis')
stream_name = 'test'
for i in range(20):
data = {
"index": i,
"timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'),
"message": "hello"
}
try:
response = kinesis.put_record(
StreamName = stream_name,
PartitionKey = str(random.randrange(0,100)),
Data = json.dumps(data)
)
except Exception as e:
print("Exception: {}", e.args)
print('put_record SequenceNumber:{}'.format(response['SequenceNumber']))
S3バケットに保存されたデータです。
中身を確認すると、各レコードごと改行されていることが確認できます。
5 最後に
今回は、Amazon Kinesis Data Firehoseで、Lambdaによるレコードへの改行追加を行ってみました。
Pythonによる、Transform source records with AWS Lambdaのコードスニペットとして、個人的には重宝しそうです。