Kinesis FirehoseのS3連携をKinesis StreamとLambdaで実装する #reinvent

2015.10.17

re:invent 2015 で発表された Kinesis Firehose を使うと、管理画面の操作だけで Kinesis のデータを S3/Redshift に連携します。 Kiensis Firehose を使うとサーバーレスになる上、Kinesis Stream のシャードやイテレーターなどの面倒な操作から開発者を解放してくれます。

ただ残念なことに

  • US East (N. Virginia)
  • US West (Oregon)
  • EU (Ireland)

のリージョンにしか対応しておらず、東京リージョンではまだ使えません。

仕方がないので Kinesis Stream と Lambda で同等の機能を実装してみました。

Kinesis Firehose の S3 連携について

Kinesis Firehose の設定画面は下図のようになっています。

aws-kinesis-firehose-configuration

  • 連携先のバケット名(S3 bucket)
  • S3のキーのプリフィックス(S3 prefix)

を指定すると、S3 バケットに次のように連携されます。

.
└── S3Prefix2015
    └── 10
        └── 16
            ├── 22
            │   ├── StreamName-1-2015-10-16-22-52-25-ea7c6625-b9d1-413b-ac23-563825c0f8b7
            │   └── StreamName-1-2015-10-16-22-57-26-1f89f472-28da-47ac-a6a5-362e0250e6db
            └── 23
                └── StreamName-4-2015-10-16-23-02-27-ed8fc4d7-f7cd-42e4-8417-dca6602f0b70

S3 オブジェクトのキー全体は

S3Prefix2015/10/16/22/StreamName-1-2015-10-16-22-52-25-ea7c6625-b9d1-413b-ac23-563825c0f8b7

というようになっており、

{S3 Prefix}YYYY/MM/DD/HH/{Firehose Stream Name}-{Shard Id?}-YYYY-MM-DD-HH-MM-SS-FF-{Hash}

というような形をしています。

  • Shard Id? と思しき箇所は正確には何が使われているのかは不明です。
  • Hash で何が使われているのかは不明です。

Lambda の Kinesis -> S3 連携について

実装方針

今回は Firehose の S3 連携処理を、データソースを Kinesis Stream にして Lambda を使って実現します。

  • 連携先S3バケット名
  • S3 Prefix
  • Firehose Stream Name
  • Shard Id

は Lambda 関数内で決め打ちし、Hash 関数には uuid v4 を採用します。

Lambda 関数について

Lambda のランタイムには re:invent 2015 ぽさを醸し出すために Python を使います。

Lambda 関数に Kinesis Stream のブループリントがすでに存在するため

  • S3 キーの生成
  • S3 への Put

を追加したのが以下です。

# vim: set fileencoding=utf8 :

import base64
import datetime
import uuid
import boto3

S3_BUCKET = 'YOUR_BUCKET_NAME'
S3_PREFIX = 'prefix_test'
STREAM_NAME = 'Stream_Name'
SHARD_ID = 1
LINE_TERMINATOR = '\r\n'

def get_s3_key():
    today = datetime.datetime.utcnow()
    return "{}{}{}-{}-{}-{}".format(
           S3_PREFIX,
           today.strftime('%Y/%m/%d/%H/'),
           STREAM_NAME,
           SHARD_ID,
           today.strftime('%Y-%m-%d-%H-%S-%f')[:-4],
           uuid.uuid4())

def decode(payload):
    # TODO
    # e.g.) string to JSON
    return payload
    
def process(payload):
    # TODO
    # logic comes here
    return payload

def put_to_s3(body):
    s3_client = boto3.client('s3')
    s3_client.put_object(Body=body, Bucket=S3_BUCKET, Key=get_s3_key())

def lambda_handler(event, context):
    buff = []
    for record in event['Records']:
        #Kinesis data is base64 encoded so decode here
        payload=base64.b64decode(record["kinesis"]["data"])
        print "Decoded payload: " + payload
        buff.append(process(decode(payload)))
    put_to_s3(LINE_TERMINATOR.join(buff))
    return 'Successfully processed {} records.'.format(len(event['Records']))

Kinesis Firehose と同じく、Kinesis のデータを加工せずに S3 に PutObject しているだけですが

  • 文字列を他のフォーマット(JSONなど)に変換する関数(decode)
  • データ処理する関数(process)

も用意しています。

このLambda関数の登録方法はドキュメントを参照ください

利用しているモジュールのうち boto3 だけは標準ライブラリではありません。しかし、boto3 は Lambda の Python ランタイムにはインストール済みのため、boto3 モジュールの登録は不要です。

S3 連携を確認

Kinesis Stream にレコード追加

CLI を利用して Kinesis Stream にレコード追加します。

$ aws kinesis put-record --stream-name StreamName --partition-key 1 --data 1,2,3
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49555444218629747788391510344658619821542396257728724994"
}
$ aws kinesis put-record --stream-name StreamName --partition-key 2 --data 'hello world!'
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49555444218629747788391510344659828747362011642817675266"
}

S3 バケットの確認

連携先 S3 バケットをローカルに持ってきて中身を確認します。

$ aws s3 sync s3://BUCKET_NAME BUCKET_NAME
$ tree BUCKET_NAME
BUCKET_NAME
└── prefix_test2015
    └── 10
        └── 17
            └── 00
                ├── StreamName-1-2015-10-17-00-01-99-bb8faeb8-8ae5-4e54-87b1-b0d00c773f5b
                └── StreamName-1-2015-10-17-00-11-67-50b7087e-6e57-40bc-a99d-a4e2277e70b2

4 directories, 2 files

ファイルの中身も確認してみましょう。

$ cat BUCKET_NAME/prefix_test2015/10/17/00/StreamName-1-2015-10-17-00-01-99-bb8faeb8-8ae5-4e54-87b1-b0d00c773f5b
1,2,3
$ cat BUCKET_NAME/prefix_test2015/10/17/00/StreamName-1-2015-10-17-00-11-67-50b7087e-6e57-40bc-a99d-a4e2277e70b2
hello world!

Kinesis に Put したデータが無事 S3 に連携されていますね。

まとめ

この程度の処理であれば Lambda 関数を書くのもそれほど手間ではありませんが、さっさと Kinesis Firehose が東京リージョンにやってきて、画面ポチだけで S3 連携されるようになって欲しいですね!