[Boto3 Adv-Cal DAY13]CloudTrailのログ転送設定を行ってみた。

boto3 で楽しむ AWS PythonLife 一人AdventCalendarです。13日目はCloudTrailにてS3へのログ出力設定をやってみました。
2018.12.13

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

boto3 で楽しむ AWS PythonLife 一人AdventCalendarです。

boto3のドキュメントを通して、サービス別にどういった事が出来るのかを理解したり、管理コンソールを通さずにTerminalだけで完結できるように検証していくことが目的になります。

13日目はCloudTrailのS3へのログ転送設定を行ってみます。

目次

boto3を通してCloudTrailでできること

ドキュメントは下記リンク先です。

ざっくりと以下のことができます。

  • 指定条件でのログをS3へ転送するTrail設定の操作(作成・削除・更新)
  • Trailへのタグ操作(追加・削除)
  • Trailのロギング操作(開始・中断)
  • Trailが動作するイベント設定操作

今回の手順

以下の内容を実施します。

  1. Trailの作成
  2. Trailへのイベント設定追加
  3. ロギング開始

必要な転送先バケットの作成とバケットポリシーについては、バケットポリシーの入力がTerminal上ではし難いこともあるため、管理コンソール上から新規バケットを作成した上で「アクセス権限 > バケットポリシー」にて行ってください。

S3へのBucket作成

ログ転送用のBucketです。特に名称に指定はありません。リージョンも特に制約はありませんが、念の為「アジアパシフィック(東京)」にしておきます。

BucketPolicyの設定

AWS Policy Generatorを利用して生成可能です。

入力すべき値が不明な場合は、AWSドキュメントのバケットのポリシーより、S3 バケットポリシーのサンプルを弄って利用します。

サンプルを使う場合、変更すべき点は以下の箇所です。

  • Sid
  • Resource

Sid

サンプルの指定値前にバケット名を含める等して、重複を防いでください。

Resource

Actionがs3:PutObjectのResourceについては、とりあえず以下の内容にしておきます。

arn:aws:s3:::<BucketName>/*

実行手順

% python main.py
Input Profile name [default]>>

Input trail name >> test

Select bucket
[0] XXXXXXXXXXXXXXXXXXXXXXXx
>> 0

main.py

import boto3
import datetime
import re
from pprint import pprint

class CloudTrailWizard:
    _client_name = 'cloudtrail'
    _session = None
    _trail_name = None
    _bucket_name = None
    def __init__(self, profile_name):
        self._session = boto3.Session(profile_name=profile_name)

    @property
    def session(self):
        return self._session

    def get_client(self, client_name=None):
        if not client_name:
            client_name = self._client_name
        return self.session.client(client_name)

    @property
    def client_name(self):
        return self._client_name

    @property
    def trail_name(self):
        return self._trail_name

    @property
    def bucket_name(self):
        return self._bucket_name

    def get_bucket_list(self):
        buckets = self.get_client('s3').list_buckets()
        bucket_names = list()
        for bucket in buckets['Buckets']:
            bucket_names.append(bucket['Name'])
        return bucket_names

    def create_trail(self):
        params = {
            'Name': self.trail_name,
            'S3BucketName': self.bucket_name
        }
        return self.get_client().create_trail(**params)
    
    def put_event_selectors(self):
        params = {
            'TrailName': self.trail_name,
            'EventSelectors': [
                {
                    'ReadWriteType': 'All',
                    'IncludeManagementEvents': False,
                    'DataResources': [
                        {
                            'Type': 'AWS::S3::Object',
                            'Values': [
                                "arn:aws:s3:::{}/Trail".format(self.bucket_name),
                            ]
                        },
                    ]
                }
            ]
        }
        return self.get_client().put_event_selectors(**params)

    def start_logging(self):
        params = {
            'Name': self.trail_name
        }
        self.get_client().start_logging(**params)

    def prompt_bucket(self):
        name = None
        names = self.get_bucket_list()
        while True:
            print('\nSelect bucket')
            for name in names:
                print('[{}] {}'.format(names.index(name), name))
            index  = input('>> ')
            if len(index) != 0 and int(index) < len(names):
                name = names[int(index)]
                break
        self._bucket_name = name


    def prompt_base(self, message):
        value = None
        while True:
            value = input("\n{} >> ".format(message))
            if value and len(value) != 0:
                break
        return value

    def prompt_trail_name(self):
        self._trail_name = self.prompt_base('Input trail name')

    def prompt_resource(self):
        self._resource = self.prompt_base('Input resource')

    @staticmethod
    def prompt():
        default_profile_name = 'default'
        profile_name = input('Input Profile name [{}]>> '.format(default_profile_name))
        if len(profile_name) == 0:
            profile_name = default_profile_name
        wizard = CloudTrailWizard(profile_name)

        wizard.prompt_trail_name()
        wizard.prompt_bucket()
        wizard.create_trail()
        wizard.put_event_selectors()
        wizard.start_logging()

if __name__ == '__main__':
    CloudTrailWizard.prompt()

まとめ

CloudTrailそのものについてはそこまで手間も掛からないと思います。

S3のBucketPolicyについては、状況によって設定項目が変わってきます。迷ったらAWSドキュメント 「バケットのポリシー」の参照をお勧めします。