Amazon Auroraでデータベースアクティビティストリームを有効にしてKMSで暗号された監査ログを復号してS3保存してみた
Aurora PostgreSQLで監査ログを取得する場合
- PostgreSQL ネイティブのツールを利用 : pgAudit
- AWS の仕組み活用 : Database Activity Stream
の2パターンがあります。
本ブログでは、後者の Database Activity Stream を利用して監査ログを取得する方法を紹介します。
主なユースケースとしてコンプライアンスや規制要件を想定しており、各アクティビティはKMSで暗号化されています。
永続化と調査の利便性から、KMS復号してS3に保存する方法を紹介します。
※ https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.Overview.html から引用
PgAudit 方式は次の記事を参照ください。
ポイント
- DASを有効にすると、Amazon Kinesis Data Streamにアクティビティがストリームされる。コンシューマーはユーザーが実装
- レコードはエンベロープ暗号+Encrption ContextでKMS暗号化。aws_encryption_sdkを使うとシンプルに複合処理を実装できる
データベースアクティビティストリームをS3保存
まずは、AuroraクラスターのDASを有効化し、アクティビティレコードを暗号化されたままS3に保存してみます。
KMS CMKの作成
DASのレコードはKMSで暗号化されます。 そのためのKMS CMKの対象鍵を作成します。
データベースアクティビティストリームを有効化
Amazon Auroraクラスターに対して、Actionメニューの「Stat activity Stream」からDASを有効化します。
設定画面では
- 暗号処理のためのKMSをマスターキー
- ストリームの同期・非同期の選択(Aurora PostgreSQLの場合。Aurora MySQLは非同期モードのみ利用可能)
を設定します。
正確さが大事な場合は同期モード、データベースのパフォーマンスが重要な場合は非同期モードを選択します。
また、T系インスタンスなどはDASを利用できないといった制約もあります。
利用可能なエンジンバージョン、インスタンスタイプなどは、ドキュメントを参照ください。
Overview of Database Activity Streams - Amazon Aurora
Kinesis Data Firehose経由でKinesis Data StreamからS3に保存
DASを開始すると、 「aws-rds-das-AuroraリソースID」という命名規則の Amazon Kinesis Data Streamが作成されます。
ストリームデータをS3に保存するために、Amazon Kinesis Data Firehoseを作成します。
DestinationにはS3バケットを指定します。
S3オブジェクトを確認
しばらく放置し、S3に出力された DAS レコードを確認します。
$ aws s3 cp \ s3://BUCKET/das/2022/03/28/09/KDS-S3-jzmVf-2-2022-03-28-09-08-33-XXX - | jq . { "type": "DatabaseActivityMonitoringRecords", "version": "1.1", "databaseActivityEvents": "AYABeCXmSTxeGsDJf19NaB0NFABA...", "key": "AQIDAHj1R6pvOIa4MXmA+mA16cdcdRgblPE00ifyTIA6c...==" } { "type": "DatabaseActivityMonitoringRecords", "version": "1.1", "databaseActivityEvents": "AYABeMwhp69t51GJLL9N+NX3Scy6y...", "key": "AQIDAHj1R6pvOIa4MXmA+mA16cdcdRgblPE00ifyTIA6c4...==" } ...
各レコードは暗号化されたままのため、イベントの中身が不明です。
以降では、Amazon Kinesis Data Firehose Data Transformation で複合して S3 保存するよう改修します。
データベースアクティビティストリームの暗号処理について
上述の通り、データベースアクティビティストリームのレコードは KMS で暗号化されており、次の形をしています。
{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }
細かいところを差し置くと、データベースアクティビティストリームのレコードは
- エンベロープ暗号化
- 暗号化コンテキスト
を使ってKMS暗号されています。
エンベロープ暗号化 では、レコードごとにユニークな暗号化キー(=データキー)を生成してこのキーで暗号化し、データキーをマスターキーで暗号化します。
データフォーマットでは
databaseActivityEvents
がデータキーを使って暗号化されたアクティビティレコードkey
が暗号化されたデータキー
です。
このデータキーはAuroraクラスターのリソースIDを暗号化コンテキストとして暗号化されています。
リソースIDをコンテキストに渡して、データキーを復号します。
# エンベロープ暗号のデータキーを復号 data_key = kms.decrypt( CiphertextBlob=base64.b64decode(payload['key']), EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})['Plaintext']
復号したデータキーを利用し、AWS Encryption SDKで、アクティビティイベントを復号します。
def decrypt_decompress(payload, data_key): # envelope decryption my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) # decode gzip return json.loads(zlib.decompress(plaintext, zlib.MAX_WBITS + 16))
復号用Pythonコード
AWSの公式ドキュメントには Kinesis Data Streamにポーリングするサンプルコードが掲載されています。これをベースにFirehos向けにスッキリさせたのが以下です。
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 RESOURCE_ID = 'cluster-ABCD123456' # cluster-ABCD123456 kms = boto3.client('kms') enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_decompress(payload, data_key): # envelope decryption my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) # decode gzip return json.loads(zlib.decompress(plaintext, zlib.MAX_WBITS + 16)) def lambda_handler(event, context): output = [] for record in event['records']: # pprint(record) # {'approximateArrivalTimestamp': 1648386972585, # 'data': 'eyJ0eXBlIjoiRGF0YWJhc2VBY3Rp...=', # 'kinesisRecordMetadata': { # 'approximateArrivalTimestamp': 1648386972585, # 'partitionKey': '8df550fe-0b31-4ccc-bf79-179155c3d692', # 'sequenceNumber': '49628017229903973410499797217363029301327752934790791298', # 'shardId': 'shardId-000000000008', # 'subsequenceNumber': 0}, # 'recordId': '49628017229903973410499797217363029301327752934790791298000000'} payload = json.loads(base64.b64decode(record['data']).decode('utf-8')) # pprint(payload) # {'databaseActivityEvents': 'AYABeHmul64eTojOgIlQqh...', # 'key': 'AQIDAHj1R6pvOIa4MXmA+mA...y7ryl0dPKfUjo0vw==', # 'type': 'DatabaseActivityMonitoringRecords', # 'version': '1.1'} # decrypt data key with context data_key = kms.decrypt( CiphertextBlob=base64.b64decode(payload['key']), EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})['Plaintext'] payload_decoded = decrypt_decompress( base64.b64decode(payload['databaseActivityEvents']), data_key) # pprint(payload_decoded) # {'clusterId': 'cluster-P74MFUJBI3PYA2N5L22YVVX3VY', # 'databaseActivityEventList': [{'type': 'heartbeat'}], # 'instanceId': 'db-KBKJGYAAF5QT253EYKEI37WBCY', # 'type': 'DatabaseActivityMonitoringRecord'} output_record = { 'recordId': record['recordId'], 'result': 'Ok', 'data': base64.b64encode((json.dumps(payload_decoded) + '\n').encode('utf-8')) # ) } output.append(output_record) print('Successfully processed {} records.'.format(len(event['records']))) return {'records': output}
依存している aws_encryption_sdk
とともに Lambda 関数としてパッケージ化し、Firehose の Data transformation 用 Lambda 関数として指定します。
復号されたS3オブジェクトを確認
しばらく放置し、S3に出力された 復号された DAS レコードを確認します。
{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "cluster-P74MFUJBI3PYA2N5L22YVVX3VY", "instanceId": "db-KBKJGYAAF5QT253EYKEI37WBCY", "databaseActivityEventList": [ { "logTime": "2022-03-27 13:05:16.607273+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "ERROR", "objectName": null, "databaseName": "test", "dbUserName": "postgres", "remoteHost": "172.31.20.245", "remotePort": "52864", "sessionId": "6240610c.1dda", "rowCount": null, "commandText": null, "paramList": [], "pid": 7642, "clientApplication": "", "exitCode": "28P01", "class": "MISC", "serverVersion": "13.4.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.34.221", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "startTime": "2022-03-27 13:05:16.607269+00", "errorMessage": "password authentication failed for user \"postgres\"" } ] } { "type": "DatabaseActivityMonitoringRecord", "clusterId": "cluster-P74MFUJBI3PYA2N5L22YVVX3VY", "instanceId": "db-KBKJGYAAF5QT253EYKEI37WBCY", "databaseActivityEventList": [ { "logTime": "2022-03-27 13:05:16.613649+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "AUTH FAILURE", "objectName": null, "databaseName": "test", "dbUserName": "postgres", "remoteHost": "172.31.20.245", "remotePort": "52866", "sessionId": "6240610c.1ddb", "rowCount": null, "commandText": null, "paramList": [], "pid": 7643, "clientApplication": "", "exitCode": -1, "class": "MISC", "serverVersion": "13.4.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.34.221", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "startTime": "2022-03-27 13:05:16.613637+00", "errorMessage": null } ] } ...
期待通り、復号されています。
最後に
Amazon Aurora のデータベースアクティビティストリーム(DAS)を使って監査ログを S3 に永続化する方法を紹介しました。
ストリームデータはKMSで暗号化されているため、レコードを調査するには、どこかで復号処理を入れる必要があります。
今回はS3に保存する Firehose フェーズで復号するサンプルを紹介しました。 復号ロジックを移植すれば、S3 PUT後のオブジェクトを加工することも、かんたんにできるかと思います。
DAS は pgAudit
を使った監査ログに比べ、AWSや暗号処理の知識や実装力が求められる一方で、MySQL/PostgreSQLに対して一貫した手法でログを取得でき、Kinesis Data Streamから先はユーザーがデータ処理を自由に実装できるのがメリットです。
それでは。