この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
Aurora PostgreSQLで監査ログを取得する場合
- PostgreSQL ネイティブのツールを利用 : pgAudit
- AWS の仕組み活用 : Database Activity Stream
の2パターンがあります。
本ブログでは、後者の Database Activity Stream を利用して監査ログを取得する方法を紹介します。
主なユースケースとしてコンプライアンスや規制要件を想定しており、各アクティビティはKMSで暗号化されています。
永続化と調査の利便性から、KMS復号してS3に保存する方法を紹介します。
※ https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.Overview.html から引用
PgAudit 方式は次の記事を参照ください。
ポイント
- DASを有効にすると、Amazon Kinesis Data Streamにアクティビティがストリームされる。コンシューマーはユーザーが実装
- レコードはエンベロープ暗号+Encrption ContextでKMS暗号化。aws_encryption_sdkを使うとシンプルに複合処理を実装できる
データベースアクティビティストリームをS3保存
まずは、AuroraクラスターのDASを有効化し、アクティビティレコードを暗号化されたままS3に保存してみます。
KMS CMKの作成
DASのレコードはKMSで暗号化されます。 そのためのKMS CMKの対象鍵を作成します。
データベースアクティビティストリームを有効化
Amazon Auroraクラスターに対して、Actionメニューの「Stat activity Stream」からDASを有効化します。
設定画面では
- 暗号処理のためのKMSをマスターキー
- ストリームの同期・非同期の選択(Aurora PostgreSQLの場合。Aurora MySQLは非同期モードのみ利用可能)
を設定します。
正確さが大事な場合は同期モード、データベースのパフォーマンスが重要な場合は非同期モードを選択します。
また、T系インスタンスなどはDASを利用できないといった制約もあります。
利用可能なエンジンバージョン、インスタンスタイプなどは、ドキュメントを参照ください。
Overview of Database Activity Streams - Amazon Aurora
Kinesis Data Firehose経由でKinesis Data StreamからS3に保存
DASを開始すると、 「aws-rds-das-AuroraリソースID」という命名規則の Amazon Kinesis Data Streamが作成されます。
ストリームデータをS3に保存するために、Amazon Kinesis Data Firehoseを作成します。
DestinationにはS3バケットを指定します。
S3オブジェクトを確認
しばらく放置し、S3に出力された DAS レコードを確認します。
$ aws s3 cp \
s3://BUCKET/das/2022/03/28/09/KDS-S3-jzmVf-2-2022-03-28-09-08-33-XXX - | jq .
{
"type": "DatabaseActivityMonitoringRecords",
"version": "1.1",
"databaseActivityEvents": "AYABeCXmSTxeGsDJf19NaB0NFABA...",
"key": "AQIDAHj1R6pvOIa4MXmA+mA16cdcdRgblPE00ifyTIA6c...=="
}
{
"type": "DatabaseActivityMonitoringRecords",
"version": "1.1",
"databaseActivityEvents": "AYABeMwhp69t51GJLL9N+NX3Scy6y...",
"key": "AQIDAHj1R6pvOIa4MXmA+mA16cdcdRgblPE00ifyTIA6c4...=="
}
...
各レコードは暗号化されたままのため、イベントの中身が不明です。
以降では、Amazon Kinesis Data Firehose Data Transformation で複合して S3 保存するよう改修します。
データベースアクティビティストリームの暗号処理について
上述の通り、データベースアクティビティストリームのレコードは KMS で暗号化されており、次の形をしています。
{
"type":"DatabaseActivityMonitoringRecords",
"version":"1.1",
"databaseActivityEvents":"encrypted audit records",
"key":"encrypted key"
}
細かいところを差し置くと、データベースアクティビティストリームのレコードは
- エンベロープ暗号化
- 暗号化コンテキスト
を使ってKMS暗号されています。
エンベロープ暗号化 では、レコードごとにユニークな暗号化キー(=データキー)を生成してこのキーで暗号化し、データキーをマスターキーで暗号化します。
データフォーマットでは
databaseActivityEvents
がデータキーを使って暗号化されたアクティビティレコードkey
が暗号化されたデータキー
です。
このデータキーはAuroraクラスターのリソースIDを暗号化コンテキストとして暗号化されています。
リソースIDをコンテキストに渡して、データキーを復号します。
# エンベロープ暗号のデータキーを復号
data_key = kms.decrypt(
CiphertextBlob=base64.b64decode(payload['key']),
EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})['Plaintext']
復号したデータキーを利用し、AWS Encryption SDKで、アクティビティイベントを復号します。
def decrypt_decompress(payload, data_key):
# envelope decryption
my_key_provider = MyRawMasterKeyProvider(data_key)
my_key_provider.add_master_key("DataKey")
plaintext, header = enc_client.decrypt(
source=payload,
materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
# decode gzip
return json.loads(zlib.decompress(plaintext, zlib.MAX_WBITS + 16))
復号用Pythonコード
AWSの公式ドキュメントには Kinesis Data Streamにポーリングするサンプルコードが掲載されています。これをベースにFirehos向けにスッキリさせたのが以下です。
lambda_function.py
import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3
RESOURCE_ID = 'cluster-ABCD123456' # cluster-ABCD123456
kms = boto3.client('kms')
enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT)
class MyRawMasterKeyProvider(RawMasterKeyProvider):
provider_id = "BC"
def __new__(cls, *args, **kwargs):
obj = super(RawMasterKeyProvider, cls).__new__(cls)
return obj
def __init__(self, plain_key):
RawMasterKeyProvider.__init__(self)
self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)
def _get_raw_key(self, key_id):
return self.wrapping_key
def decrypt_decompress(payload, data_key):
# envelope decryption
my_key_provider = MyRawMasterKeyProvider(data_key)
my_key_provider.add_master_key("DataKey")
plaintext, header = enc_client.decrypt(
source=payload,
materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
# decode gzip
return json.loads(zlib.decompress(plaintext, zlib.MAX_WBITS + 16))
def lambda_handler(event, context):
output = []
for record in event['records']:
# pprint(record)
# {'approximateArrivalTimestamp': 1648386972585,
# 'data': 'eyJ0eXBlIjoiRGF0YWJhc2VBY3Rp...=',
# 'kinesisRecordMetadata': {
# 'approximateArrivalTimestamp': 1648386972585,
# 'partitionKey': '8df550fe-0b31-4ccc-bf79-179155c3d692',
# 'sequenceNumber': '49628017229903973410499797217363029301327752934790791298',
# 'shardId': 'shardId-000000000008',
# 'subsequenceNumber': 0},
# 'recordId': '49628017229903973410499797217363029301327752934790791298000000'}
payload = json.loads(base64.b64decode(record['data']).decode('utf-8'))
# pprint(payload)
# {'databaseActivityEvents': 'AYABeHmul64eTojOgIlQqh...',
# 'key': 'AQIDAHj1R6pvOIa4MXmA+mA...y7ryl0dPKfUjo0vw==',
# 'type': 'DatabaseActivityMonitoringRecords',
# 'version': '1.1'}
# decrypt data key with context
data_key = kms.decrypt(
CiphertextBlob=base64.b64decode(payload['key']),
EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})['Plaintext']
payload_decoded = decrypt_decompress(
base64.b64decode(payload['databaseActivityEvents']),
data_key)
# pprint(payload_decoded)
# {'clusterId': 'cluster-P74MFUJBI3PYA2N5L22YVVX3VY',
# 'databaseActivityEventList': [{'type': 'heartbeat'}],
# 'instanceId': 'db-KBKJGYAAF5QT253EYKEI37WBCY',
# 'type': 'DatabaseActivityMonitoringRecord'}
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode((json.dumps(payload_decoded) + '\n').encode('utf-8')) # )
}
output.append(output_record)
print('Successfully processed {} records.'.format(len(event['records'])))
return {'records': output}
依存している aws_encryption_sdk
とともに Lambda 関数としてパッケージ化し、Firehose の Data transformation 用 Lambda 関数として指定します。
復号されたS3オブジェクトを確認
しばらく放置し、S3に出力された 復号された DAS レコードを確認します。
{
"type": "DatabaseActivityMonitoringRecord",
"clusterId": "cluster-P74MFUJBI3PYA2N5L22YVVX3VY",
"instanceId": "db-KBKJGYAAF5QT253EYKEI37WBCY",
"databaseActivityEventList": [
{
"logTime": "2022-03-27 13:05:16.607273+00",
"statementId": 2,
"substatementId": 1,
"objectType": null,
"command": "ERROR",
"objectName": null,
"databaseName": "test",
"dbUserName": "postgres",
"remoteHost": "172.31.20.245",
"remotePort": "52864",
"sessionId": "6240610c.1dda",
"rowCount": null,
"commandText": null,
"paramList": [],
"pid": 7642,
"clientApplication": "",
"exitCode": "28P01",
"class": "MISC",
"serverVersion": "13.4.1",
"serverType": "PostgreSQL",
"serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
"serverHost": "172.31.34.221",
"netProtocol": "TCP",
"dbProtocol": "Postgres 3.0",
"type": "record",
"startTime": "2022-03-27 13:05:16.607269+00",
"errorMessage": "password authentication failed for user \"postgres\""
}
]
}
{
"type": "DatabaseActivityMonitoringRecord",
"clusterId": "cluster-P74MFUJBI3PYA2N5L22YVVX3VY",
"instanceId": "db-KBKJGYAAF5QT253EYKEI37WBCY",
"databaseActivityEventList": [
{
"logTime": "2022-03-27 13:05:16.613649+00",
"statementId": 1,
"substatementId": 1,
"objectType": null,
"command": "AUTH FAILURE",
"objectName": null,
"databaseName": "test",
"dbUserName": "postgres",
"remoteHost": "172.31.20.245",
"remotePort": "52866",
"sessionId": "6240610c.1ddb",
"rowCount": null,
"commandText": null,
"paramList": [],
"pid": 7643,
"clientApplication": "",
"exitCode": -1,
"class": "MISC",
"serverVersion": "13.4.1",
"serverType": "PostgreSQL",
"serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
"serverHost": "172.31.34.221",
"netProtocol": "TCP",
"dbProtocol": "Postgres 3.0",
"type": "record",
"startTime": "2022-03-27 13:05:16.613637+00",
"errorMessage": null
}
]
}
...
期待通り、復号されています。
最後に
Amazon Aurora のデータベースアクティビティストリーム(DAS)を使って監査ログを S3 に永続化する方法を紹介しました。
ストリームデータはKMSで暗号化されているため、レコードを調査するには、どこかで復号処理を入れる必要があります。
今回はS3に保存する Firehose フェーズで復号するサンプルを紹介しました。 復号ロジックを移植すれば、S3 PUT後のオブジェクトを加工することも、かんたんにできるかと思います。
DAS は pgAudit
を使った監査ログに比べ、AWSや暗号処理の知識や実装力が求められる一方で、MySQL/PostgreSQLに対して一貫した手法でログを取得でき、Kinesis Data Streamから先はユーザーがデータ処理を自由に実装できるのがメリットです。
それでは。