GitHub EnterpriseをAWSで使おう – Amazon Kinesis StreamsとLambda関数を利用したログのアーカイブ

2016.12.26

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

こんにちは、中山です。

GitHub Enterprise(以下GHE) on AWSシリーズの第7弾です。今回はGHEのログをAmazon Kinesis Streams(以下Kinesis Streams)を利用してS3にアーカイブする方法をご紹介します。実施内容としてはAmazon Kinesis Firehose(以下Kinesis Firehose)を利用した第6弾目のエントリとほぼ同じです。

今回はKinesis FirehoseではなくKinesis StreamsとLambda関数を連携させてほぼ同等のことをしてみます。というのも執筆時点(2016/12/24)ではKinesis Firehoseが東京リージョンで利用できません。GHEは社内で利用するケースがほとんどだと思うので、日本のユーザが利用するリージョンも必然的に東京リージョンになると思います。上記理由がKinesis Firehoseの導入ネックになる可能性があります。そのため、今回は代替案の一つとしてご紹介させていただきます。

今回動作検証した主要なソフトウェア/OSのバージョンは以下の通りです。バージョンによって設定方法が変更される可能性があります。その点ご了承ください。

  • GHE: 2.8.4
  • Amazon Linux: 2016.09.1 (HVM), SSD Volume Type - ami-9be6f38c
  • fluentd: 0.12.29
  • aws-fluent-plugin-kinesis: 1.1.2
  • Serverless Framework: 1.4.0

構成

今回は以下のような構成を作成します。

image2

GHEからログをフォワーディングさせaws-fluent-plugin-kinesisで受け取るところまでは一緒です(利用するプラグインは kinesis_streams に変わっていますが)。ログの転送先としてKinesis Streamsを利用し、コンシューマとしてLambda関数を実装します。そこから最終的にS3へログをアーカイブするという流れです。

今回は弊社の以下エントリを参考に各種設定をしました。合わせて参照いただくと理解が深まると思います。

やってみる

前述のようにGHEとVPCなどのネットワーク周りの設定、及びプロデューサとしてのEC2構築までは以前のエントリと同じです。その部分については本エントリでは割愛させていただきます。ただし、以前はKinesis Firehoseを利用していたためEC2インスタンスに関連付けるIAM RoleにKinesis Firehoseのフルアクセス権限を付けていました。今回はKinesis Streamsなので arn:aws:iam::aws:policy/AmazonKinesisFullAccess を付けてください(もちろんインラインポリシーで権限を絞ることも可能です)。

今回は各種リソースの作成にAWS CLIを適宜利用しつつ、Kinesis Streams及びLambda関数の作成はServerless Frameworkを利用した方法をご紹介します。

1. Kinesis Streams及びLambda関数の作成

前述の通りServerless Frameworkを利用してこれらのリソースを作成していきます。最終的なディレクトリ構造は以下の通りです。

$ tree .
.
├── env
│   └── dev
│       ├── custom.yml
│       └── resources.yml
├── functions
│   └── dev
│       └── archive.py
└── serverless.yml

4 directories, 4 files

各ファイルの内容についてご紹介します。

  • serverless.yml

Serverless Frameworkの大本となる設定ファイルです。

service: s3-archive-with-kinesis-streams

provider:
  name: aws
  runtime: python2.7
  stage: dev
  region: ap-northeast-1
  memorySize: 128
  timeout: 10

custom: ${file(env/${self:provider.stage}/custom.yml)}

functions:
  archive:
    handler: functions/${self:provider.stage}/archive.handler
    role: IamPolicyLambdaExecution
    environment:
      S3Bucket:
        Ref: S3Bucket
      StreamName:
        Ref: KinesisStream
    events:
      - stream:
          arn: arn:aws:kinesis:ap-northeast-1:${self:custom.AccountId}:stream/${self:custom.StreamName}
          batchSize: 100
          startingPosition: LATEST
          enabled: true

resources: ${file(env/${self:provider.stage}/resources.yml)}

本当はEvent Source Mappingを以下のように書きたかったのですが、執筆時点(2016/12/24)ではServerless Frameworkのバグらしく、ARNをハードコードする必要があるようです。今後のアップデートに期待しましょう。こちらのPRがマージされたら上手く動作するかもしれません。

      - stream:
          arn:
            Fn::GetAtt:
              - KinesisStream
              - Arn
          batchSize: 100
          startingPosition: LATEST
          enabled: true

それに伴い AWS::Kinesis::Stream リソースの Name プロパティでストリームの名前を明示的に指定しています。こちらのドキュメントにあるように、このプロパティを指定した場合はリソースのアップデートではなくリプレースという動作になります。利用する際にはご注意ください。該当ドキュメントを引用しておきます。

Important

If you specify a name, you cannot do updates that require this resource to be replaced. You can still do updates that require no or some interruption. If you must replace the resource, specify a new name.

あと、Lambda関数に関連付けるIAM Roleの論理リソース名を IamPolicyLambdaExecution にしないとどうしても以下のようなエラーが出てしまいました。。。

  Serverless Error ---------------------------------------

     Template format error: Unresolved resource dependencies
     [IamPolicyLambdaExecution] in the Resources block of
     the template

.serverless 以下のCloudFormationテンプレートを確認すると存在しないはずのリソース IamPolicyLambdaExecutionDependsOn してしまっているためのようです。この辺り今後の改善が待たれますね。。。AWS Serverless Application Modelを使うという選択肢もありそうです。。。

  • env/dev/custom.yml

変数を格納しておくファイルです。 custom プロパティで serverless.yml に直接変数を書くこともできるのですが、アカウントIDなどのあまり公開したくない情報が含まれている場合はファイルに記述しておき、 .gitignore などでリポジトリの管理外にしておいた方が良いかと思います。

BucketName: <_YOUR_S3_BUCKET_NAME_>
StreamName: <_YOUR_KINESIS_STREAM_NAME_>
AccountId: <_YOUR_AWS_ACCOUNT_ID_>
  • env/dev/resources.yml

Lambda関数に関連付けるIAM RoleやKinesis Streamsなどを作成するCloudFormationテンプレートです。

---
AWSTemplateFormatVersion : '2010-09-09'
Description: S3 Archive with Kinesis Streams

Resources:
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: ${self:custom.BucketName}

  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: ${self:custom.StreamName}
      ShardCount: 1

  IamPolicyLambdaExecution:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: LambdaAssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
      Policies:
        - PolicyName: LambdaPutS3Policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource:
                  - Fn::Join:
                      - ""
                      - - "arn:aws:s3:::"
                        - Ref: S3Bucket
                  - Fn::Join:
                      - ""
                      - - "arn:aws:s3:::"
                        - Ref: S3Bucket
                        - "/*"

今回はテスト用にシャード数を1にしていますが、実際に運用する際は適切な値に設定してください。こちらのエントリが参考になります。

  • functions/dev/archive.py

Kinesis StreamsのコンシューマとしてのLambda関数です。内容は参考にしたエントリ内のソースコードを拝借しています。環境変数でS3バケット名とストリーム名を渡しているためその部分などを主に変更しています。

from __future__ import print_function
import os
import uuid
import boto3
import base64
import datetime

S3_BUCKET = os.environ['S3Bucket']
S3_PREFIX = 'prefix_test'
STREAM_NAME = os.environ['StreamName']
SHARD_ID = 1
LINE_TERMINATOR = '\r\n'


def get_s3_key():
    today = datetime.datetime.utcnow()
    return "{}{}{}-{}-{}-{}".format(
           S3_PREFIX,
           today.strftime('%Y/%m/%d/%H/'),
           STREAM_NAME,
           SHARD_ID,
           today.strftime('%Y-%m-%d-%H-%S-%f')[:-4],
           uuid.uuid4())


def decode(payload):
    # TODO
    # e.g.) string to JSON
    return payload


def process(payload):
    # TODO
    # logic comes here
    return payload


def put_to_s3(body):
    s3_client = boto3.client('s3')
    s3_client.put_object(Body=body, Bucket=S3_BUCKET, Key=get_s3_key())


def handler(event, context):
    buff = []
    for record in event['Records']:
        #Kinesis data is base64 encoded so decode here
        payload=base64.b64decode(record['kinesis']['data'])
        print('Decoded payload: {}'.format(payload))
        buff.append(process(decode(payload)))
    put_to_s3(LINE_TERMINATOR.join(buff))
    return 'Successfully processed {} records.'.format(len(event['Records']))

それではデプロイしましょう。以下のコマンドを実行します。

# デプロイ
$ sls deploy -v
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
<snip>

Lambda関数のEvent SourceとしてKinesis Streamsがマッピングされていることを確認してみます。以下のように表示されたらOKです。

$ aws lambda list-event-source-mappings
{
    "EventSourceMappings": [
        {
            "UUID": "************************************",
            "StateTransitionReason": "User action",
            "LastModified": 1482555120.0,
            "BatchSize": 100,
            "State": "Enabled",
            "FunctionArn": "arn:aws:lambda:ap-northeast-1:************:function:s3-archive-with-kinesis-streams-dev-archive",
            "EventSourceArn": "arn:aws:kinesis:ap-northeast-1:************:stream/<_YOUR_KINESIS_STREAM_NAME_>",
            "LastProcessingResult": "No records processed"
        }
    ]
}

この時点でKinesisから取得したデータを正常にS3へputできるか軽く確認してみましょう。

# レコードのput
$ aws kinesis put-record \
  --stream-name <_YOUR_KINESIS_STREAM_NAME_> \
  --data "This is a test. final" \
  --partition-key shardId-000000000000
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49568850636304694563098914338449631619959391121469079554"
}
# Lambda関数がInvokeされていることを確認
$ sls logs -f archive -l
START RequestId: 9d4cc68a-06d7-438a-b991-8ec29b64ca63 Version: $LATEST
Decoded payload: This is a test. final
END RequestId: 9d4cc68a-06d7-438a-b991-8ec29b64ca63
REPORT RequestId: 9d4cc68a-06d7-438a-b991-8ec29b64ca63  Duration: 2353.22 ms    Billed Duration: 2400 ms        Memory Size: 128 MB     Max Memory Used: 40 MB
# S3バケットにputしたレコードが出力されていることを確認
$ aws s3 ls s3://<_YOUR_S3_BUCKET_NAME_> --recursive
2016-12-24 14:04:19         21 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-17-27-e59d2995-63bc-4c12-984d-0808341cffe0
$ aws s3 cp s3://<_YOUR_S3_BUCKET_NAME_>/prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-17-27-e59d2995-63bc-4c12-984d-0808341cffe0 -
This is a test. final%

上記のように出力されたら成功です。

2. aws-fluent-plugin-kinesisの設定

最後にプロデューサとなるEC2インスタンスのセットアップを行います。

# SSHログイン
$ ssh -i path/to/key ec2-user@<_YOUR_EC2_PUBLIC_IP_>
<snip>
# fluentdのインストール
[ec2-user@ip-172-31-14-94 ~]$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh
<snip>
# aws-fluent-plugin-kinesisのインストール
[ec2-user@ip-172-31-14-94 td-agent]$ sudo td-agent-gem install fluent-plugin-kinesis
<snip>
# 設定ファイルの作成
[ec2-user@ip-172-31-14-94 ~]$ cd /etc/td-agent
[ec2-user@ip-172-31-14-94 td-agent]$ sudo mv -iv td-agent.conf td-agent.conf.orig
‘td-agent.conf’ -> ‘td-agent.conf.orig’
[ec2-user@ip-172-31-14-94 td-agent]$ cat <<'EOT' > td-agent.conf
> <source>
>   @type syslog
>   port 514
>   bind 0.0.0.0
>   tag ghe
> </source>
> <match ghe.**>
>   @type kinesis_streams
>   stream_name <_YOUR_KINESIS_STREAM_NAME_>
>   region ap-northeast-1
>   random_partition_key true
> </match>
> EOT
# UDP:514でListenさせるためにtd-agent実行ユーザとグループをrootに変更
[ec2-user@ip-172-31-14-94 ~]$ sudo sed -E -i \
  -e 's/(TD_AGENT_USER=)td-agent/\1root/' \
  -e 's/(TD_AGENT_GROUP=)td-agent/\1root/' \
  /etc/init.d/td-agent
# td-agentの起動
[ec2-user@ip-172-31-14-94 td-agent]$ sudo service td-agent start
td-agent td-agent:                                         [  OK  ]

しばらく待った後、以下のようにS3バケットにログが出力されていれば成功です。

$ aws s3 ls s3://<_YOUR_S3_BUCKET_NAME_> --recursive
2016-12-24 14:29:02       9250 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-01-10-97eeec6c-73d7-42a4-9824-faaa04be3b9e
2016-12-24 14:04:19         21 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-17-27-e59d2995-63bc-4c12-984d-0808341cffe0
2016-12-24 14:12:24         15 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-22-97-75f7a0b5-ceaf-4626-b13b-f769e603646f
2016-12-24 14:30:01      16648 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-59-89-e31d0f6a-b4dd-481c-81d0-9c3e12f962c8
2016-12-24 14:29:01      19728 prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-59-97-3d2c18c9-fd9b-41ae-ac8f-e33e7efaefc5
$ aws s3 cp s3:/<_YOUR_S3_BUCKET_NAME_>/prefix_test2016/12/24/05/<_YOUR_KINESIS_STREAM_NAME_>-1-2016-12-24-05-59-97-3d2c18c9-fd9b-41ae-ac8f-e33e7efaefc5 -
{"host":"******************************","ident":"syslog-ng","pid":"25086","message":"Syslog connection broken; fd='9', server='AF_INET(172.31.14.94:514)', time_reopen='60'"}
{"host":"******************************","ident":"enterprise_manage_unicorn","message":"[2016-12-24T05:27:02.854249 #5654]  INFO -- : 127.0.0.1 - - [24/Dec/2016 05:27:02] \"GET / HTTP/1.0\" 302 - 0.0006"}
{"host":"******************************","ident":"enterprise_manage_access","message":"- - [24/Dec/2016:05:27:02 +0000] \"GET / HTTP/1.0\" 302 44 \"-\" \"-\" \"-\" 0.001 0.001 ."}
<snip>

まとめ

いかがだったでしょうか。

Kinesis StreamsとLambda関数を連携させたログのアーカイブ方法についてご紹介しました。やはりLambda関数を自分で実装する必要がある分一手間かかるなという印象ですね。早くKinesis Firehose東京リージョンに来て欲しい!

本エントリがみなさんの参考になったら幸いに思います。