Kinesis Streamsと別リージョンのFirehoseをLambdaで連携させてみた

2016.09.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWSチームのすずきです。

AWSがストリーミングデータ向けプラットフォームとして提供するAmazon Kinesis。

2014年に現在のKinesis Streamsがリリースされた後、2015年10月にS3、Redshift、ElastiserchServiceへの自動エクスポートを備えたKinesis Firehose、 2016年8月にはCEP(Complex Event Processing:複合イベント処理)エンジンとしてKinesis Analyticsがリリースされました。

ただ2016年9月現在、東京リージョンで利用する事のできるKinesisはStreamsのみ。 Firehoseと、AnalyticsはUS(オレゴン、バージニア)、EU(アイルランド)リージョンを利用する必要があります。

今回、東京リージョンのKinesis Streamsに届くデータを、オレゴンリージョンのFirehoseとAnalyticsでの処理すべく、Lambdaを利用した連携を行う機会がありましたので、 その内容について紹介させて頂きます。

概要図

  • 東京リージョンにLambdaを設置し、Kinesis Streamsのイベントトリガとして、Kinesisに登録されたデータをオレゴンリージョンのFirehoseに転送を実現しました。

kinesis-firehose-01

設定手順

IAM設定

  • Lambda関数にAWSのIAM権限を付与するため、「AWS Lambda」用のAWSサービスロールを作成します。
  • LambdaのBluePrint、「kinesis-process-record-python」用のロールを元にfirehoseのIAM権限の付与を行いました。
  • 今回、検証用の環境であったため対象リソースはワイルドカードを利用しましたが、他システムと共存する環境では、適切なARN指定を行う事をおすすめします。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "kinesis:ListStreams",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetRecords",
"kinesis:GetShardIterator"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"firehose:DescribeDeliveryStream",
"firehose:ListDeliveryStreams",
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
"Resource": [
"*"
]
}
]
}

Firehose

  • 今回、Kinesis Firehose はオレゴン(us-west-2)を利用しました。
  • Firehoseの設定は、希望するデータ出力先に応じて実施してください。

S3

Redshift

Elasticsearch

Lambda

ファンクション作成

  • 「Lambda」のAWSコンソールより、「Create a Lambda function」を実施します。
  • BluePrintの指定はSkipします。

トリガ設定

  • 連携元となる「Kinesis Streams」を指定します。
  • Batch Sizeは500、Put-Records APIの上限に合わせます
  • 1件のレコードサイズが平均2KBを超える場合、1回のバッチPut容量が1MB以下になる様にBatch Sizeを減らす必要があります。
  • Stating Posionは「Trim Horizon」、過去に登録されたレコード(標準では24時間)もFirehoseの転送対象とします。

kinesis-firehose-02

  • Enable trigger は後ほど有効とします。

ファンクション設定

  • 任意のファンクション名、説明コメントを記載します
  • 今回、「Python」を利用したため、Runtimeは「Python2.7」を指定します。

kinesis-firehose-03

  • 「Code entry type」は、「Edit code inline」下記のコードを利用しました。
  • 出力先のFirehose情報(region、DeliveryStreamname)は適宜修正します。
lambda-kinesis-event-stream-to-firehose.py

  • Handlerは「Python2.7」デフォルトをそのまま利用します。
  • Roleは、先に作成したロールを指定します
  • 割当メモリは、デフォルト(128MB)とし、稼働状況をみて増強するものとします。
  • 通常1回の処理、500ms程度ですが、タイムアウトは1分まで延長しました。
  • VPCは利用しません。

kinesis-firehose-04

トリガー有効化

  • 「Enable」操作を実施します。

kinesis-firehose-06

動作確認

イベントトリガのステータス

  • 「Last result: OK」となる事を確認します

kinesis-firehose-07

CloudwatchLogs

  • CloudwatchLogs で、異常出力のない事を確認します

kinesis-firehose-08

Monitoring

  • Errors、Throttlesが発生していない事を確認します

kinesis-firehose-09

S3 (Firehose)

  • Firehoseの出力先として設定したS3のファイル存在と内容確認を行います。

kinesis-firehose-10

AWS費用試算

下記のほぼ毎秒、小容量のデータが絶え間なく到着する環境で、LambdaによるFirehose転送を実施した場合の費用を試算した所、日額$0.036、月額1.1$程度の計算になりました。

Lambdaには無料枠(毎月最初の 1,000,000 件、400,000 GB-秒のコンピューティング時間)も存在するので、低コストでの維持が出来ると予想できます。

1日コスト

  • Lambda割当メモリ : 128MB
  • 平均実行時間 : 300ms
  • 1日実行回数 : 10万回
  • 1日転送バイト : 20MB

Lambda

  • ($0.000000208(100 ミリ秒単位の価格) × 3(300ミリ秒分) + ($0.0000002(1リクエスト価格) × 100,000(1日リクエスト数)) = $0.020000624

Kinesis Firehose

  • ($0.035 (1GB単価) × 0.02(GB) = $0.0007
  • US価格

ネットワーク

  • $0.02 /GB × 0.04(GB) = $0.0008
  • 別AWSリージョンのOutbound通信費

CloudwatchLogs

  • ($0.76 (1GB単価)) × 0.002(GB) = $0.0152

まとめ

Lambdaを利用する事で、東京リージョンのKinesis Streamsと海外のオレゴンリージョンのKinesis Firehoseを簡単に連携させる事ができました。

ログファイルの圧縮やRedshift展開まで、フルマネージドで実施してくれるKinesis Firehoseや、強力なCEPエンジンとして利用できるKinesis Analyticsの評価に お試しください。