この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
re:invent 2015 で発表された Kinesis Firehose を使うと、管理画面の操作だけで Kinesis のデータを S3/Redshift に連携します。 Kiensis Firehose を使うとサーバーレスになる上、Kinesis Stream のシャードやイテレーターなどの面倒な操作から開発者を解放してくれます。
ただ残念なことに
- US East (N. Virginia)
- US West (Oregon)
- EU (Ireland)
のリージョンにしか対応しておらず、東京リージョンではまだ使えません。
仕方がないので Kinesis Stream と Lambda で同等の機能を実装してみました。
Kinesis Firehose の S3 連携について
Kinesis Firehose の設定画面は下図のようになっています。
- 連携先のバケット名(S3 bucket)
- S3のキーのプリフィックス(S3 prefix)
を指定すると、S3 バケットに次のように連携されます。
.
└── S3Prefix2015
└── 10
└── 16
├── 22
│ ├── StreamName-1-2015-10-16-22-52-25-ea7c6625-b9d1-413b-ac23-563825c0f8b7
│ └── StreamName-1-2015-10-16-22-57-26-1f89f472-28da-47ac-a6a5-362e0250e6db
└── 23
└── StreamName-4-2015-10-16-23-02-27-ed8fc4d7-f7cd-42e4-8417-dca6602f0b70
S3 オブジェクトのキー全体は
S3Prefix2015/10/16/22/StreamName-1-2015-10-16-22-52-25-ea7c6625-b9d1-413b-ac23-563825c0f8b7
というようになっており、
{S3 Prefix}YYYY/MM/DD/HH/{Firehose Stream Name}-{Shard Id?}-YYYY-MM-DD-HH-MM-SS-FF-{Hash}
というような形をしています。
- Shard Id? と思しき箇所は正確には何が使われているのかは不明です。
- Hash で何が使われているのかは不明です。
Lambda の Kinesis -> S3 連携について
実装方針
今回は Firehose の S3 連携処理を、データソースを Kinesis Stream にして Lambda を使って実現します。
- 連携先S3バケット名
- S3 Prefix
- Firehose Stream Name
- Shard Id
は Lambda 関数内で決め打ちし、Hash 関数には uuid v4 を採用します。
Lambda 関数について
Lambda のランタイムには re:invent 2015 ぽさを醸し出すために Python を使います。
Lambda 関数に Kinesis Stream のブループリントがすでに存在するため
- S3 キーの生成
- S3 への Put
を追加したのが以下です。
# vim: set fileencoding=utf8 :
import base64
import datetime
import uuid
import boto3
S3_BUCKET = 'YOUR_BUCKET_NAME'
S3_PREFIX = 'prefix_test'
STREAM_NAME = 'Stream_Name'
SHARD_ID = 1
LINE_TERMINATOR = '\r\n'
def get_s3_key():
today = datetime.datetime.utcnow()
return "{}{}{}-{}-{}-{}".format(
S3_PREFIX,
today.strftime('%Y/%m/%d/%H/'),
STREAM_NAME,
SHARD_ID,
today.strftime('%Y-%m-%d-%H-%S-%f')[:-4],
uuid.uuid4())
def decode(payload):
# TODO
# e.g.) string to JSON
return payload
def process(payload):
# TODO
# logic comes here
return payload
def put_to_s3(body):
s3_client = boto3.client('s3')
s3_client.put_object(Body=body, Bucket=S3_BUCKET, Key=get_s3_key())
def lambda_handler(event, context):
buff = []
for record in event['Records']:
#Kinesis data is base64 encoded so decode here
payload=base64.b64decode(record["kinesis"]["data"])
print "Decoded payload: " + payload
buff.append(process(decode(payload)))
put_to_s3(LINE_TERMINATOR.join(buff))
return 'Successfully processed {} records.'.format(len(event['Records']))
Kinesis Firehose と同じく、Kinesis のデータを加工せずに S3 に PutObject しているだけですが
- 文字列を他のフォーマット(JSONなど)に変換する関数(decode)
- データ処理する関数(process)
も用意しています。
このLambda関数の登録方法はドキュメントを参照ください。
利用しているモジュールのうち boto3 だけは標準ライブラリではありません。しかし、boto3 は Lambda の Python ランタイムにはインストール済みのため、boto3 モジュールの登録は不要です。
S3 連携を確認
Kinesis Stream にレコード追加
CLI を利用して Kinesis Stream にレコード追加します。
$ aws kinesis put-record --stream-name StreamName --partition-key 1 --data 1,2,3
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49555444218629747788391510344658619821542396257728724994"
}
$ aws kinesis put-record --stream-name StreamName --partition-key 2 --data 'hello world!'
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49555444218629747788391510344659828747362011642817675266"
}
S3 バケットの確認
連携先 S3 バケットをローカルに持ってきて中身を確認します。
$ aws s3 sync s3://BUCKET_NAME BUCKET_NAME
$ tree BUCKET_NAME
BUCKET_NAME
└── prefix_test2015
└── 10
└── 17
└── 00
├── StreamName-1-2015-10-17-00-01-99-bb8faeb8-8ae5-4e54-87b1-b0d00c773f5b
└── StreamName-1-2015-10-17-00-11-67-50b7087e-6e57-40bc-a99d-a4e2277e70b2
4 directories, 2 files
ファイルの中身も確認してみましょう。
$ cat BUCKET_NAME/prefix_test2015/10/17/00/StreamName-1-2015-10-17-00-01-99-bb8faeb8-8ae5-4e54-87b1-b0d00c773f5b
1,2,3
$ cat BUCKET_NAME/prefix_test2015/10/17/00/StreamName-1-2015-10-17-00-11-67-50b7087e-6e57-40bc-a99d-a4e2277e70b2
hello world!
Kinesis に Put したデータが無事 S3 に連携されていますね。
まとめ
この程度の処理であれば Lambda 関数を書くのもそれほど手間ではありませんが、さっさと Kinesis Firehose が東京リージョンにやってきて、画面ポチだけで S3 連携されるようになって欲しいですね!