AWS Glue・Amazon Athena・Amazon QuickSightを使ってAWS CloudTrailログをETL後に可視化する

2018.09.28

AWSビッグデータブログに、AWS CloudTrailログのインプットに対してAWS GlueでETL処理を行い、Amazon Athena・Amazon QuickSightで可視化するウォークスルーがあります。

Visualize AWS Cloudtrail Logs Using AWS Glue and Amazon QuickSight | AWS Big Data Blog

AWS CloudFormation を使ってサクッとソリューションを構築しつつ、AWS Glue を使った典型的な ETL パイプラインを学べます。

当エントリではその内容について実際に動かしてみたものを、順を追って見ていきたいと思います。

ソリューションの概要

  1. AWS の操作履歴が AWS CloudTrail Logsによって S3 に出力されます。
  2. S3への書き込みイベントが Lambda 関数を呼び出し、簡単なデータ加工を行い、 S3 に出力します。
  3. このデータを AWS Glue Crawler が AWS Glue Data Catalog にカタログ化します。
  4. AWS Glue ジョブがデータを平坦化し、Parquet 形式に変換し、 S3 に出力します。
  5. このデータを AWS Glue Crawler が AWS Glue Data Catalog にカタログ化します。
  6. このメタストアを利用して Amazon Athena から SQL を実行可能です。
  7. Amazon QuickSight から Amazon Athena 経由でデータを取得し、CloudTrail Logs を可視化します。

ソリューションの構築

提供されている AWS CloudFormation テンプレートを利用することで、 CloudTrail の作成から AWS Glue を使った ETL 処理まで構築でき、Amazon Athena 以降だけ手作業で構築します。

元ブログの大部分は CloudFormation を使わなかった場合の手順に費やされています。 CloudFormation を使わない場合は、元ブログの「Walkthrough」の先頭から、使う場合は、元ブログの「Query results with Athena」から読めば OK です。

前提

  • AWS Glue
  • Amazon Athena
  • Amazon QuickSight

が利用可能なリージョンでソリューションを構築してください。

東京(ap-northeast-1)リージョンはこの条件を満たしています。

AWS CloudFormation によるソリューションの構築

CloudFormation スタックの作成

CloudFormationのスタック作成ページに移動し、S3 のURL に次の URL を指定します。

https://s3.amazonaws.com/lfcarocomdemo/gluecloudtrail/cloudtrailglue.yaml

パラメーターの指定

  • スタック名
  • S3 バケット名
  • ETL ジョブの実行スケジュール

を指定し、スタックを作成してください。

このS3バケットは

  • CloudTrail Logs の出力
  • ETL 処理の出力

に利用します。

スタック作成後、5分程度すると、全リソースが作成されます。

以降では、データ処理パイプラインをデータ発生ポイントから順に確認します。

CloudTrail Logs -> S3 出力 -> Lambda(前処理) -> S3 出力まで

CloudTrail Logs の作成

CloudFormation により StackName-cloudtrail-XXX という CloudTrail が作成されます。

CloudTrail の証跡ログは CloudFormation のスタック作成時に指定した S3 バケットのキー「AWSLogs/AWS-ACCOUNT/CloudTrail/REGION/」以下に出力されます。

Lambda による CloudTrail Logs のクレンジング

AWS Glue ジョブのデータソースとして利用できるようにするための前処理を Lambda で行います。

CloudFormation により StackName-flatlambda-XXX という名前の Lambda 関数が作成され、CloudTrail ログが S3 に出力されると、この Lambda 関数が呼び出されます。 CloudFormation のスタック作成時に指定した S3 バケットのキー「flatfiles/」以下に出力します。

lambda-flatlambda.py

from __future__ import print_function
import json
import urllib
import boto3
import gzip
import os
import re
import time
s3 = boto3.resource('s3')
client = boto3.client('s3')
glue = boto3.client('glue')

def convertColumntoLowwerCaps(obj):
    for key in obj.keys():
        new_key = re.sub(r'[\W]+', '', key)
        v = obj[key]
        if isinstance(v, dict):
            if len(v) > 0:
                convertColumntoLowwerCaps(v)
        if new_key != key:
            obj[new_key] = obj[key]
            del obj[key]
    return obj

def lambda_handler(event, context):
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
    print(bucket)
    print(key)
    try:
        newKey = 'flatfiles/' + key.replace("/", "")
        client.download_file(bucket, key, '/tmp/file.json.gz')
        with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file:
            i = 0
            for line in file: 
                for record in json.loads(line,object_hook=convertColumntoLowwerCaps)['Records']:
                  if i != 0:
                      output.write("\n")
                  if 'responseelements' in record and record['responseelements'] != None and 'version' in record['responseelements']:
                      del record['responseelements']['version']
                  if 'requestparameters' in record and record['requestparameters'] != None and 'maxitems' in record['requestparameters']:
                      del record['requestparameters']['maxitems']               
                  output.write(json.dumps(record))
                  i += 1
        client.upload_file('/tmp/out.json.gz', bucket,newKey)
        return "success"
    except Exception as e:
        print(e)
        raise e
    return "success"

Lambda 関数名から JSON のネストされたスキーマを平坦化しているのかと思いきや、平坦化は後ほど紹介する AWS Glue のジョブ(cloudtrailtoparquet)で行っています。

ブログ当時はAWS Glue Cralwer のためにフィールド(カラム)名を小文字にすることが推奨されていたため、フィールド(カラム)名を小文字に変換していましたが、現在はそのような対応は不要です。 Lambda 内の関数名(convertColumntoLowwerCaps)に当時の名残りがあります。

AWS Glue の ETL 処理

AWS Glue の ETL ジョブの作成

CloudFormation により cloudtrailtoparquet-XXX という名前の PySpark のジョブが作成されます。

ジョブはトリガー ScheduledJobTrigger-XXX によって一定間隔でスケジュール実行されます。

このジョブは以下の順で処理します。

  1. ソースデータのクロール
  2. データ変換
  3. 変換データのクロール

1. ソースデータのクロール

Lambda で前処理を行ったデータ処理対象のデータ s3://BUCKET-NAME/flatfiles/ に対して、クローラー cloudtrailjson-XXX を呼び出して AWS Glue データカタログの flatfiles テーブルに格納します。

2. データ変換

メインのデータ変換処理では Relationalize 処理(DynamicFrame のネストされたスキーマをフラット化し、フラット化されたフレームから配列列をピボットアウト)を行い、更に、大規模データ処理に向いた Parquet 形式に変換します。

Relationalize 処理により

requestparameters(struct type)
{"requestparameters.loggroupname":"/aws-glue/jobs/output", "logstreamname":"jr_XXX", ...}

のような struct 型のデータが

requestparameters.loggroupname(string type) requestparameters.logstreamname(string type) ...
"/aws-glue/jobs/output" "jr_XXX" ...

のように変換されます。

3. 変換データのクロール

加工後のデータは s3://BUCKET-NAME/parquet/cloudtrail 以下に出力し、クローラー cloudtrailparquet-XXX を呼び出して AWS Glue データカタログの cloudtrail テーブルに格納します。

PySpark スクリプト

pyspark-cloudtrailtoparquet

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import time

glue = boto3.client('glue',region_name='us-east-1')



## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','sourcedatabase', 'sourcetable','destinationpath','resultscrawler','sourcecrawler'])


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sourcedatabase = args['sourcedatabase']
sourcetable = args['sourcetable']
destinationpath = args['destinationpath']
resutlCrawler = args['resultscrawler']
sourceCrawler = args['sourcecrawler']

try:
	glue.start_crawler(Name=sourceCrawler)
	print "started crawler"
except:
	print "crawler is not running"

response = glue.get_crawler(Name=sourceCrawler)
state = response['Crawler']['State']
while state == 'RUNNING':
	time.sleep(60)
	response = glue.get_crawler(Name=sourceCrawler)
	state = response['Crawler']['State']
print "final state " + state

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = sourcedatabase, table_name = sourcetable, transformation_ctx = "datasource0")
if datasource0.toDF().head(1):
    resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
    relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail")
    dropnullfields1 = DropNullFields.apply(frame = relationalized1, transformation_ctx = "dropnullfields1")
    datasink = glueContext.write_dynamic_frame.from_options(frame = dropnullfields1, connection_type = "s3", connection_options = {"path": destinationpath}, format = "parquet", transformation_ctx = "datasink4")
    glue.start_crawler(Name=resutlCrawler)
job.commit()

S3 データの可視化

Amazon Athena からデータを確認

S3 の CloudTrail の JSON データも、ETL ジョブ後の Parquet データも AWS Glueデータカタログに格納されています。

そのため、Amazon Athena と AWS Glue データカタログを連携している場合、これらのデータに対して Amazon Athena からシームレスに SQL を実行できます。

SELECT *
FROM cloudtrail
WHERE eventtime > '2018-09-26T00:00:00Z'
        AND 'useridentity.invokedby' = 'glue.amazonaws.com'
ORDER BY  eventtime asc

Amazon QuickSight で可視化

最後に AWS が提供するクラウド型の BI、 Amazon QuickSight から CloudTrail Logs を可視化します。

Amazon QuickSight のデータソースに Amazon Athenaを指定し、先程作成したデータベース・テーブルを指定します。

また、Amazon QuickSight の管理画面から、Amazon QuickSight が Amazon Athena/S3 にアクセスできるように、権限を付与します。

ヴィジュアルをゴニョゴニョすると、可視化の完成です。

まとめ

このウォークスルーを動かすことで、AWS Glue の主要コンポーネント

  • データカタログ
  • ETL 処理

だけでなく

  • AWS Glue で ETL 処理するための前処理
  • ETL 処理後のデータの可視化

と、データ処理パイプラインを下流から上流まで、身近な CloudTrail Logs を例に、一通り学べます。

手を動かしながら理解したい方は、一度このウォークスルーを流すことをお勧めします。

また、一度も AWS Glue を触ったことがない方は、管理コンソールに付属するチュートリアルをまずやってみるのがお勧めです。このチュートリアルについては次の過去ブログを参照ください。

AWS Glue 実践入門:サービスメニュー内で展開されている「ガイド付きチュートリアル」を試してみた

参考