AWS Glue・Amazon Athena・Amazon QuickSightを使ってAWS CloudTrailログをETL後に可視化する
AWSビッグデータブログに、AWS CloudTrailログのインプットに対してAWS GlueでETL処理を行い、Amazon Athena・Amazon QuickSightで可視化するウォークスルーがあります。
Visualize AWS Cloudtrail Logs Using AWS Glue and Amazon QuickSight | AWS Big Data Blog
AWS CloudFormation を使ってサクッとソリューションを構築しつつ、AWS Glue を使った典型的な ETL パイプラインを学べます。
当エントリではその内容について実際に動かしてみたものを、順を追って見ていきたいと思います。
ソリューションの概要
- AWS の操作履歴が AWS CloudTrail Logsによって S3 に出力されます。
- S3への書き込みイベントが Lambda 関数を呼び出し、簡単なデータ加工を行い、 S3 に出力します。
- このデータを AWS Glue Crawler が AWS Glue Data Catalog にカタログ化します。
- AWS Glue ジョブがデータを平坦化し、Parquet 形式に変換し、 S3 に出力します。
- このデータを AWS Glue Crawler が AWS Glue Data Catalog にカタログ化します。
- このメタストアを利用して Amazon Athena から SQL を実行可能です。
- Amazon QuickSight から Amazon Athena 経由でデータを取得し、CloudTrail Logs を可視化します。
ソリューションの構築
提供されている AWS CloudFormation テンプレートを利用することで、 CloudTrail の作成から AWS Glue を使った ETL 処理まで構築でき、Amazon Athena 以降だけ手作業で構築します。
元ブログの大部分は CloudFormation を使わなかった場合の手順に費やされています。 CloudFormation を使わない場合は、元ブログの「Walkthrough」の先頭から、使う場合は、元ブログの「Query results with Athena」から読めば OK です。
前提
- AWS Glue
- Amazon Athena
- Amazon QuickSight
が利用可能なリージョンでソリューションを構築してください。
東京(ap-northeast-1)リージョンはこの条件を満たしています。
AWS CloudFormation によるソリューションの構築
CloudFormation スタックの作成
CloudFormationのスタック作成ページに移動し、S3 のURL に次の URL を指定します。
https://s3.amazonaws.com/lfcarocomdemo/gluecloudtrail/cloudtrailglue.yaml
パラメーターの指定
- スタック名
- S3 バケット名
- ETL ジョブの実行スケジュール
を指定し、スタックを作成してください。
このS3バケットは
- CloudTrail Logs の出力
- ETL 処理の出力
に利用します。
スタック作成後、5分程度すると、全リソースが作成されます。
以降では、データ処理パイプラインをデータ発生ポイントから順に確認します。
CloudTrail Logs -> S3 出力 -> Lambda(前処理) -> S3 出力まで
CloudTrail Logs の作成
CloudFormation により StackName-cloudtrail-XXX という CloudTrail が作成されます。
CloudTrail の証跡ログは CloudFormation のスタック作成時に指定した S3 バケットのキー「AWSLogs/AWS-ACCOUNT/CloudTrail/REGION/」以下に出力されます。
Lambda による CloudTrail Logs のクレンジング
AWS Glue ジョブのデータソースとして利用できるようにするための前処理を Lambda で行います。
CloudFormation により StackName-flatlambda-XXX という名前の Lambda 関数が作成され、CloudTrail ログが S3 に出力されると、この Lambda 関数が呼び出されます。 CloudFormation のスタック作成時に指定した S3 バケットのキー「flatfiles/」以下に出力します。
from __future__ import print_function import json import urllib import boto3 import gzip import os import re import time s3 = boto3.resource('s3') client = boto3.client('s3') glue = boto3.client('glue') def convertColumntoLowwerCaps(obj): for key in obj.keys(): new_key = re.sub(r'[\W]+', '', key) v = obj[key] if isinstance(v, dict): if len(v) > 0: convertColumntoLowwerCaps(v) if new_key != key: obj[new_key] = obj[key] del obj[key] return obj def lambda_handler(event, context): bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8')) print(bucket) print(key) try: newKey = 'flatfiles/' + key.replace("/", "") client.download_file(bucket, key, '/tmp/file.json.gz') with gzip.open('/tmp/out.json.gz', 'w') as output, gzip.open('/tmp/file.json.gz', 'rb') as file: i = 0 for line in file: for record in json.loads(line,object_hook=convertColumntoLowwerCaps)['Records']: if i != 0: output.write("\n") if 'responseelements' in record and record['responseelements'] != None and 'version' in record['responseelements']: del record['responseelements']['version'] if 'requestparameters' in record and record['requestparameters'] != None and 'maxitems' in record['requestparameters']: del record['requestparameters']['maxitems'] output.write(json.dumps(record)) i += 1 client.upload_file('/tmp/out.json.gz', bucket,newKey) return "success" except Exception as e: print(e) raise e return "success"
Lambda 関数名から JSON のネストされたスキーマを平坦化しているのかと思いきや、平坦化は後ほど紹介する AWS Glue のジョブ(cloudtrailtoparquet)で行っています。
ブログ当時はAWS Glue Cralwer のためにフィールド(カラム)名を小文字にすることが推奨されていたため、フィールド(カラム)名を小文字に変換していましたが、現在はそのような対応は不要です。
Lambda 内の関数名(convertColumntoLowwerCaps
)に当時の名残りがあります。
AWS Glue の ETL 処理
AWS Glue の ETL ジョブの作成
CloudFormation により cloudtrailtoparquet-XXX という名前の PySpark のジョブが作成されます。
ジョブはトリガー ScheduledJobTrigger-XXX によって一定間隔でスケジュール実行されます。
このジョブは以下の順で処理します。
- ソースデータのクロール
- データ変換
- 変換データのクロール
1. ソースデータのクロール
Lambda で前処理を行ったデータ処理対象のデータ s3://BUCKET-NAME/flatfiles/ に対して、クローラー cloudtrailjson-XXX を呼び出して AWS Glue データカタログの flatfiles テーブルに格納します。
2. データ変換
メインのデータ変換処理では Relationalize 処理(DynamicFrame のネストされたスキーマをフラット化し、フラット化されたフレームから配列列をピボットアウト)を行い、更に、大規模データ処理に向いた Parquet 形式に変換します。
Relationalize 処理により
requestparameters(struct type) |
---|
{"requestparameters.loggroupname":"/aws-glue/jobs/output", "logstreamname":"jr_XXX", ...} |
のような struct 型のデータが
requestparameters.loggroupname(string type) | requestparameters.logstreamname(string type) | ... |
---|---|---|
"/aws-glue/jobs/output" | "jr_XXX" | ... |
のように変換されます。
3. 変換データのクロール
加工後のデータは s3://BUCKET-NAME/parquet/cloudtrail 以下に出力し、クローラー cloudtrailparquet-XXX を呼び出して AWS Glue データカタログの cloudtrail テーブルに格納します。
PySpark スクリプト
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job import boto3 import time glue = boto3.client('glue',region_name='us-east-1') ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME','sourcedatabase', 'sourcetable','destinationpath','resultscrawler','sourcecrawler']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) sourcedatabase = args['sourcedatabase'] sourcetable = args['sourcetable'] destinationpath = args['destinationpath'] resutlCrawler = args['resultscrawler'] sourceCrawler = args['sourcecrawler'] try: glue.start_crawler(Name=sourceCrawler) print "started crawler" except: print "crawler is not running" response = glue.get_crawler(Name=sourceCrawler) state = response['Crawler']['State'] while state == 'RUNNING': time.sleep(60) response = glue.get_crawler(Name=sourceCrawler) state = response['Crawler']['State'] print "final state " + state datasource0 = glueContext.create_dynamic_frame.from_catalog(database = sourcedatabase, table_name = sourcetable, transformation_ctx = "datasource0") if datasource0.toDF().head(1): resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1") relationalized1 = resolvechoice1.relationalize("trail", args["TempDir"]).select("trail") dropnullfields1 = DropNullFields.apply(frame = relationalized1, transformation_ctx = "dropnullfields1") datasink = glueContext.write_dynamic_frame.from_options(frame = dropnullfields1, connection_type = "s3", connection_options = {"path": destinationpath}, format = "parquet", transformation_ctx = "datasink4") glue.start_crawler(Name=resutlCrawler) job.commit()
S3 データの可視化
Amazon Athena からデータを確認
S3 の CloudTrail の JSON データも、ETL ジョブ後の Parquet データも AWS Glueデータカタログに格納されています。
そのため、Amazon Athena と AWS Glue データカタログを連携している場合、これらのデータに対して Amazon Athena からシームレスに SQL を実行できます。
SELECT * FROM cloudtrail WHERE eventtime > '2018-09-26T00:00:00Z' AND 'useridentity.invokedby' = 'glue.amazonaws.com' ORDER BY eventtime asc
Amazon QuickSight で可視化
最後に AWS が提供するクラウド型の BI、 Amazon QuickSight から CloudTrail Logs を可視化します。
Amazon QuickSight のデータソースに Amazon Athenaを指定し、先程作成したデータベース・テーブルを指定します。
また、Amazon QuickSight の管理画面から、Amazon QuickSight が Amazon Athena/S3 にアクセスできるように、権限を付与します。
ヴィジュアルをゴニョゴニョすると、可視化の完成です。
まとめ
このウォークスルーを動かすことで、AWS Glue の主要コンポーネント
- データカタログ
- ETL 処理
だけでなく
- AWS Glue で ETL 処理するための前処理
- ETL 処理後のデータの可視化
と、データ処理パイプラインを下流から上流まで、身近な CloudTrail Logs を例に、一通り学べます。
手を動かしながら理解したい方は、一度このウォークスルーを流すことをお勧めします。
また、一度も AWS Glue を触ったことがない方は、管理コンソールに付属するチュートリアルをまずやってみるのがお勧めです。このチュートリアルについては次の過去ブログを参照ください。