この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
こんにちは、中山です。
GitHub Enterprise(以下GHE) on AWSシリーズの第7弾です。今回はGHEのログをAmazon Kinesis Streams(以下Kinesis Streams)を利用してS3にアーカイブする方法をご紹介します。実施内容としてはAmazon Kinesis Firehose(以下Kinesis Firehose)を利用した第6弾目のエントリとほぼ同じです。
今回はKinesis FirehoseではなくKinesis StreamsとLambda関数を連携させてほぼ同等のことをしてみます。というのも執筆時点(2016/12/24)ではKinesis Firehoseが東京リージョンで利用できません。GHEは社内で利用するケースがほとんどだと思うので、日本のユーザが利用するリージョンも必然的に東京リージョンになると思います。上記理由がKinesis Firehoseの導入ネックになる可能性があります。そのため、今回は代替案の一つとしてご紹介させていただきます。
今回動作検証した主要なソフトウェア/OSのバージョンは以下の通りです。バージョンによって設定方法が変更される可能性があります。その点ご了承ください。
- GHE: 2.8.4
- Amazon Linux: 2016.09.1 (HVM), SSD Volume Type - ami-9be6f38c
- fluentd: 0.12.29
- aws-fluent-plugin-kinesis: 1.1.2
- Serverless Framework: 1.4.0
構成
今回は以下のような構成を作成します。
GHEからログをフォワーディングさせaws-fluent-plugin-kinesisで受け取るところまでは一緒です(利用するプラグインは kinesis_streams
に変わっていますが)。ログの転送先としてKinesis Streamsを利用し、コンシューマとしてLambda関数を実装します。そこから最終的にS3へログをアーカイブするという流れです。
今回は弊社の以下エントリを参考に各種設定をしました。合わせて参照いただくと理解が深まると思います。
- Kinesis FirehoseのS3連携をKinesis StreamとLambdaで実装する #reinvent
- fluent-plugin-kinesisでKinesis Streamsにログを送信する
やってみる
前述のようにGHEとVPCなどのネットワーク周りの設定、及びプロデューサとしてのEC2構築までは以前のエントリと同じです。その部分については本エントリでは割愛させていただきます。ただし、以前はKinesis Firehoseを利用していたためEC2インスタンスに関連付けるIAM RoleにKinesis Firehoseのフルアクセス権限を付けていました。今回はKinesis Streamsなので arn:aws:iam::aws:policy/AmazonKinesisFullAccess
を付けてください(もちろんインラインポリシーで権限を絞ることも可能です)。
今回は各種リソースの作成にAWS CLIを適宜利用しつつ、Kinesis Streams及びLambda関数の作成はServerless Frameworkを利用した方法をご紹介します。
1. Kinesis Streams及びLambda関数の作成
前述の通りServerless Frameworkを利用してこれらのリソースを作成していきます。最終的なディレクトリ構造は以下の通りです。
$ tree .
.
├── env
│ └── dev
│ ├── custom.yml
│ └── resources.yml
├── functions
│ └── dev
│ └── archive.py
└── serverless.yml
4 directories, 4 files
各ファイルの内容についてご紹介します。
serverless.yml
Serverless Frameworkの大本となる設定ファイルです。
service: s3-archive-with-kinesis-streams
provider:
name: aws
runtime: python2.7
stage: dev
region: ap-northeast-1
memorySize: 128
timeout: 10
custom: ${file(env/${self:provider.stage}/custom.yml)}
functions:
archive:
handler: functions/${self:provider.stage}/archive.handler
role: IamPolicyLambdaExecution
environment:
S3Bucket:
Ref: S3Bucket
StreamName:
Ref: KinesisStream
events:
- stream:
arn: arn:aws:kinesis:ap-northeast-1:${self:custom.AccountId}:stream/${self:custom.StreamName}
batchSize: 100
startingPosition: LATEST
enabled: true
resources: ${file(env/${self:provider.stage}/resources.yml)}
本当はEvent Source Mappingを以下のように書きたかったのですが、執筆時点(2016/12/24)ではServerless Frameworkのバグらしく、ARNをハードコードする必要があるようです。今後のアップデートに期待しましょう。こちらのPRがマージされたら上手く動作するかもしれません。
- stream:
arn:
Fn::GetAtt:
- KinesisStream
- Arn
batchSize: 100
startingPosition: LATEST
enabled: true
それに伴い AWS::Kinesis::Stream
リソースの Name
プロパティでストリームの名前を明示的に指定しています。こちらのドキュメントにあるように、このプロパティを指定した場合はリソースのアップデートではなくリプレースという動作になります。利用する際にはご注意ください。該当ドキュメントを引用しておきます。
Important
If you specify a name, you cannot do updates that require this resource to be replaced. You can still do updates that require no or some interruption. If you must replace the resource, specify a new name.
あと、Lambda関数に関連付けるIAM Roleの論理リソース名を IamPolicyLambdaExecution
にしないとどうしても以下のようなエラーが出てしまいました。。。
Serverless Error ---------------------------------------
Template format error: Unresolved resource dependencies
[IamPolicyLambdaExecution] in the Resources block of
the template
.serverless
以下のCloudFormationテンプレートを確認すると存在しないはずのリソース IamPolicyLambdaExecution
を DependsOn
してしまっているためのようです。この辺り今後の改善が待たれますね。。。AWS Serverless Application Modelを使うという選択肢もありそうです。。。
env/dev/custom.yml
変数を格納しておくファイルです。 custom
プロパティで serverless.yml
に直接変数を書くこともできるのですが、アカウントIDなどのあまり公開したくない情報が含まれている場合はファイルに記述しておき、 .gitignore
などでリポジトリの管理外にしておいた方が良いかと思います。
BucketName: <_YOUR_S3_BUCKET_NAME_>
StreamName: <_YOUR_KINESIS_STREAM_NAME_>
AccountId: <_YOUR_AWS_ACCOUNT_ID_>
env/dev/resources.yml
Lambda関数に関連付けるIAM RoleやKinesis Streamsなどを作成するCloudFormationテンプレートです。
---
AWSTemplateFormatVersion : '2010-09-09'
Description: S3 Archive with Kinesis Streams
Resources:
S3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: ${self:custom.BucketName}
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: ${self:custom.StreamName}
ShardCount: 1
IamPolicyLambdaExecution:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: LambdaAssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
Policies:
- PolicyName: LambdaPutS3Policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:PutObject
Resource:
- Fn::Join:
- ""
- - "arn:aws:s3:::"
- Ref: S3Bucket
- Fn::Join:
- ""
- - "arn:aws:s3:::"
- Ref: S3Bucket
- "/*"
今回はテスト用にシャード数を1にしていますが、実際に運用する際は適切な値に設定してください。こちらのエントリが参考になります。
functions/dev/archive.py
Kinesis StreamsのコンシューマとしてのLambda関数です。内容は参考にしたエントリ内のソースコードを拝借しています。環境変数でS3バケット名とストリーム名を渡しているためその部分などを主に変更しています。
from __future__ import print_function
import os
import uuid
import boto3
import base64
import datetime
S3_BUCKET = os.environ['S3Bucket']
S3_PREFIX = 'prefix_test'
STREAM_NAME = os.environ['StreamName']
SHARD_ID = 1
LINE_TERMINATOR = '\r\n'
def get_s3_key():
today = datetime.datetime.utcnow()
return "{}{}{}-{}-{}-{}".format(
S3_PREFIX,
today.strftime('%Y/%m/%d/%H/'),
STREAM_NAME,
SHARD_ID,
today.strftime('%Y-%m-%d-%H-%S-%f')[:-4],
uuid.uuid4())
def decode(payload):
# TODO
# e.g.) string to JSON
return payload
def process(payload):
# TODO
# logic comes here
return payload
def put_to_s3(body):
s3_client = boto3.client('s3')
s3_client.put_object(Body=body, Bucket=S3_BUCKET, Key=get_s3_key())
def handler(event, context):
buff = []
for record in event['Records']:
#Kinesis data is base64 encoded so decode here
payload=base64.b64decode(record['kinesis']['data'])
print('Decoded payload: {}'.format(payload))
buff.append(process(decode(payload)))
put_to_s3(LINE_TERMINATOR.join(buff))
return 'Successfully processed {} records.'.format(len(event['Records']))
それではデプロイしましょう。以下のコマンドを実行します。
# デプロイ
$ sls deploy -v
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
<snip>
Lambda関数のEvent SourceとしてKinesis Streamsがマッピングされていることを確認してみます。以下のように表示されたらOKです。
$ aws lambda list-event-source-mappings
{
"EventSourceMappings": [
{
"UUID": "************************************",
"StateTransitionReason": "User action",
"LastModified": 1482555120.0,
"BatchSize": 100,
"State": "Enabled",
"FunctionArn": "arn:aws:lambda:ap-northeast-1:************:function:s3-archive-with-kinesis-streams-dev-archive",
"EventSourceArn": "arn:aws:kinesis:ap-northeast-1:************:stream/<_YOUR_KINESIS_STREAM_NAME_>",
"LastProcessingResult": "No records processed"
}
]
}
この時点でKinesisから取得したデータを正常にS3へputできるか軽く確認してみましょう。
# レコードのput
$ aws kinesis put-record \
--stream-name <_YOUR_KINESIS_STREAM_NAME_> \
--data "This is a test. final" \
--partition-key shardId-000000000000
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49568850636304694563098914338449631619959391121469079554"
}
# Lambda関数がInvokeされていることを確認
$ sls logs -f archive -l
START RequestId: 9d4cc68a-06d7-438a-b991-8ec29b64ca63 Version: $LATEST
Decoded payload: This is a test. final
END RequestId: 9d4cc68a-06d7-438a-b991-8ec29b64ca63
REPORT RequestId: 9d4cc68a-06d7-438a-b991-8ec29b64ca63 Duration: 2353.22 ms Billed Duration: 2400 ms Memory Size: 128 MB Max Memory Used: 40 MB
# S3バケットにputしたレコードが出力されていることを確認
$ aws s3 ls s3://<_YOUR_S3_BUCKET_NAME_> --recursive
2016-12-24 14:04:19 21 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-17-27-e59d2995-63bc-4c12-984d-0808341cffe0
$ aws s3 cp s3://<_YOUR_S3_BUCKET_NAME_>/prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-17-27-e59d2995-63bc-4c12-984d-0808341cffe0 -
This is a test. final%
上記のように出力されたら成功です。
2. aws-fluent-plugin-kinesisの設定
最後にプロデューサとなるEC2インスタンスのセットアップを行います。
# SSHログイン
$ ssh -i path/to/key ec2-user@<_YOUR_EC2_PUBLIC_IP_>
<snip>
# fluentdのインストール
[ec2-user@ip-172-31-14-94 ~]$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
<snip>
# aws-fluent-plugin-kinesisのインストール
[ec2-user@ip-172-31-14-94 td-agent]$ sudo td-agent-gem install fluent-plugin-kinesis
<snip>
# 設定ファイルの作成
[ec2-user@ip-172-31-14-94 ~]$ cd /etc/td-agent
[ec2-user@ip-172-31-14-94 td-agent]$ sudo mv -iv td-agent.conf td-agent.conf.orig
‘td-agent.conf’ -> ‘td-agent.conf.orig’
[ec2-user@ip-172-31-14-94 td-agent]$ cat <<'EOT' > td-agent.conf
> <source>
> @type syslog
> port 514
> bind 0.0.0.0
> tag ghe
> </source>
> <match ghe.**>
> @type kinesis_streams
> stream_name <_YOUR_KINESIS_STREAM_NAME_>
> region ap-northeast-1
> random_partition_key true
> </match>
> EOT
# UDP:514でListenさせるためにtd-agent実行ユーザとグループをrootに変更
[ec2-user@ip-172-31-14-94 ~]$ sudo sed -E -i \
-e 's/(TD_AGENT_USER=)td-agent/\1root/' \
-e 's/(TD_AGENT_GROUP=)td-agent/\1root/' \
/etc/init.d/td-agent
# td-agentの起動
[ec2-user@ip-172-31-14-94 td-agent]$ sudo service td-agent start
td-agent td-agent: [ OK ]
しばらく待った後、以下のようにS3バケットにログが出力されていれば成功です。
$ aws s3 ls s3://<_YOUR_S3_BUCKET_NAME_> --recursive
2016-12-24 14:29:02 9250 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-01-10-97eeec6c-73d7-42a4-9824-faaa04be3b9e
2016-12-24 14:04:19 21 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-17-27-e59d2995-63bc-4c12-984d-0808341cffe0
2016-12-24 14:12:24 15 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-22-97-75f7a0b5-ceaf-4626-b13b-f769e603646f
2016-12-24 14:30:01 16648 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-59-89-e31d0f6a-b4dd-481c-81d0-9c3e12f962c8
2016-12-24 14:29:01 19728 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-59-97-3d2c18c9-fd9b-41ae-ac8f-e33e7efaefc5
$ aws s3 cp s3:/<_YOUR_S3_BUCKET_NAME_>/prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-59-97-3d2c18c9-fd9b-41ae-ac8f-e33e7efaefc5 -
{"host":"******************************","ident":"syslog-ng","pid":"25086","message":"Syslog connection broken; fd='9', server='AF_INET(172.31.14.94:514)', time_reopen='60'"}
{"host":"******************************","ident":"enterprise_manage_unicorn","message":"[2016-12-24T05:27:02.854249 #5654] INFO -- : 127.0.0.1 - - [24/Dec/2016 05:27:02] \"GET / HTTP/1.0\" 302 - 0.0006"}
{"host":"******************************","ident":"enterprise_manage_access","message":"- - [24/Dec/2016:05:27:02 +0000] \"GET / HTTP/1.0\" 302 44 \"-\" \"-\" \"-\" 0.001 0.001 ."}
<snip>
まとめ
いかがだったでしょうか。
Kinesis StreamsとLambda関数を連携させたログのアーカイブ方法についてご紹介しました。やはりLambda関数を自分で実装する必要がある分一手間かかるなという印象ですね。早くKinesis Firehose東京リージョンに来て欲しい!
本エントリがみなさんの参考になったら幸いに思います。