Amazon Auroraでデータベースアクティビティストリームを有効にしてKMSで暗号された監査ログを復号してS3保存してみた

2022.03.29

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Aurora PostgreSQLで監査ログを取得する場合

  • PostgreSQL ネイティブのツールを利用 : pgAudit
  • AWS の仕組み活用 : Database Activity Stream

の2パターンがあります。

本ブログでは、後者の Database Activity Stream を利用して監査ログを取得する方法を紹介します。

主なユースケースとしてコンプライアンスや規制要件を想定しており、各アクティビティはKMSで暗号化されています。

永続化と調査の利便性から、KMS復号してS3に保存する方法を紹介します。

https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.Overview.html から引用

PgAudit 方式は次の記事を参照ください。

ポイント

  • DASを有効にすると、Amazon Kinesis Data Streamにアクティビティがストリームされる。コンシューマーはユーザーが実装
  • レコードはエンベロープ暗号+Encrption ContextでKMS暗号化。aws_encryption_sdkを使うとシンプルに複合処理を実装できる

データベースアクティビティストリームをS3保存

まずは、AuroraクラスターのDASを有効化し、アクティビティレコードを暗号化されたままS3に保存してみます。

KMS CMKの作成

DASのレコードはKMSで暗号化されます。 そのためのKMS CMKの対象鍵を作成します。

データベースアクティビティストリームを有効化

Amazon Auroraクラスターに対して、Actionメニューの「Stat activity Stream」からDASを有効化します。

設定画面では

  • 暗号処理のためのKMSをマスターキー
  • ストリームの同期・非同期の選択(Aurora PostgreSQLの場合。Aurora MySQLは非同期モードのみ利用可能)

を設定します。

正確さが大事な場合は同期モード、データベースのパフォーマンスが重要な場合は非同期モードを選択します。

また、T系インスタンスなどはDASを利用できないといった制約もあります。

利用可能なエンジンバージョン、インスタンスタイプなどは、ドキュメントを参照ください。

Overview of Database Activity Streams - Amazon Aurora

Kinesis Data Firehose経由でKinesis Data StreamからS3に保存

DASを開始すると、 「aws-rds-das-AuroraリソースID」という命名規則の Amazon Kinesis Data Streamが作成されます。

ストリームデータをS3に保存するために、Amazon Kinesis Data Firehoseを作成します。

DestinationにはS3バケットを指定します。

S3オブジェクトを確認

しばらく放置し、S3に出力された DAS レコードを確認します。

$ aws s3 cp \
  s3://BUCKET/das/2022/03/28/09/KDS-S3-jzmVf-2-2022-03-28-09-08-33-XXX - | jq .
{
  "type": "DatabaseActivityMonitoringRecords",
  "version": "1.1",
  "databaseActivityEvents": "AYABeCXmSTxeGsDJf19NaB0NFABA...",
  "key": "AQIDAHj1R6pvOIa4MXmA+mA16cdcdRgblPE00ifyTIA6c...=="
}
{
  "type": "DatabaseActivityMonitoringRecords",
  "version": "1.1",
  "databaseActivityEvents": "AYABeMwhp69t51GJLL9N+NX3Scy6y...",
  "key": "AQIDAHj1R6pvOIa4MXmA+mA16cdcdRgblPE00ifyTIA6c4...=="
}
...

各レコードは暗号化されたままのため、イベントの中身が不明です。

以降では、Amazon Kinesis Data Firehose Data Transformation で複合して S3 保存するよう改修します。

データベースアクティビティストリームの暗号処理について

上述の通り、データベースアクティビティストリームのレコードは KMS で暗号化されており、次の形をしています。

{
  "type":"DatabaseActivityMonitoringRecords",
  "version":"1.1",
  "databaseActivityEvents":"encrypted audit records",
  "key":"encrypted key"
}

細かいところを差し置くと、データベースアクティビティストリームのレコードは

  • エンベロープ暗号化
  • 暗号化コンテキスト

を使ってKMS暗号されています。

エンベロープ暗号化 では、レコードごとにユニークな暗号化キー(=データキー)を生成してこのキーで暗号化し、データキーをマスターキーで暗号化します。

データフォーマットでは

  • databaseActivityEvents がデータキーを使って暗号化されたアクティビティレコード
  • key が暗号化されたデータキー

です。

このデータキーはAuroraクラスターのリソースIDを暗号化コンテキストとして暗号化されています。

リソースIDをコンテキストに渡して、データキーを復号します。

    # エンベロープ暗号のデータキーを復号
    data_key = kms.decrypt(
      CiphertextBlob=base64.b64decode(payload['key']),
      EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})['Plaintext']

復号したデータキーを利用し、AWS Encryption SDKで、アクティビティイベントを復号します。

def decrypt_decompress(payload, data_key):
    # envelope decryption
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))

    # decode gzip
    return json.loads(zlib.decompress(plaintext, zlib.MAX_WBITS + 16))

復号用Pythonコード

AWSの公式ドキュメントには Kinesis Data Streamにポーリングするサンプルコードが掲載されています。これをベースにFirehos向けにスッキリさせたのが以下です。

lambda_function.py

import base64
import json
import zlib

import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

RESOURCE_ID = 'cluster-ABCD123456'            # cluster-ABCD123456

kms = boto3.client('kms')

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key

def decrypt_decompress(payload, data_key):
    # envelope decryption
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))

    # decode gzip
    return json.loads(zlib.decompress(plaintext, zlib.MAX_WBITS + 16))

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # pprint(record)
        # {'approximateArrivalTimestamp': 1648386972585,
        #  'data': 'eyJ0eXBlIjoiRGF0YWJhc2VBY3Rp...=',
        #  'kinesisRecordMetadata': {
        #    'approximateArrivalTimestamp': 1648386972585,
        #    'partitionKey': '8df550fe-0b31-4ccc-bf79-179155c3d692',
        #    'sequenceNumber': '49628017229903973410499797217363029301327752934790791298',
        #    'shardId': 'shardId-000000000008',
        #    'subsequenceNumber': 0},
        #  'recordId': '49628017229903973410499797217363029301327752934790791298000000'}

        payload = json.loads(base64.b64decode(record['data']).decode('utf-8'))
        # pprint(payload)
        # {'databaseActivityEvents': 'AYABeHmul64eTojOgIlQqh...',
        #  'key': 'AQIDAHj1R6pvOIa4MXmA+mA...y7ryl0dPKfUjo0vw==',
        #  'type': 'DatabaseActivityMonitoringRecords',
        #  'version': '1.1'}

        # decrypt data key with context
        data_key = kms.decrypt(
          CiphertextBlob=base64.b64decode(payload['key']),
          EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})['Plaintext']

        payload_decoded = decrypt_decompress(
          base64.b64decode(payload['databaseActivityEvents']),
          data_key)
        # pprint(payload_decoded)
        # {'clusterId': 'cluster-P74MFUJBI3PYA2N5L22YVVX3VY',
        #  'databaseActivityEventList': [{'type': 'heartbeat'}],
        #  'instanceId': 'db-KBKJGYAAF5QT253EYKEI37WBCY',
        #  'type': 'DatabaseActivityMonitoringRecord'}

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode((json.dumps(payload_decoded) + '\n').encode('utf-8')) # )
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

依存している aws_encryption_sdk とともに Lambda 関数としてパッケージ化し、Firehose の Data transformation 用 Lambda 関数として指定します。

復号されたS3オブジェクトを確認

しばらく放置し、S3に出力された 復号された DAS レコードを確認します。

{
  "type": "DatabaseActivityMonitoringRecord",
  "clusterId": "cluster-P74MFUJBI3PYA2N5L22YVVX3VY",
  "instanceId": "db-KBKJGYAAF5QT253EYKEI37WBCY",
  "databaseActivityEventList": [
    {
      "logTime": "2022-03-27 13:05:16.607273+00",
      "statementId": 2,
      "substatementId": 1,
      "objectType": null,
      "command": "ERROR",
      "objectName": null,
      "databaseName": "test",
      "dbUserName": "postgres",
      "remoteHost": "172.31.20.245",
      "remotePort": "52864",
      "sessionId": "6240610c.1dda",
      "rowCount": null,
      "commandText": null,
      "paramList": [],
      "pid": 7642,
      "clientApplication": "",
      "exitCode": "28P01",
      "class": "MISC",
      "serverVersion": "13.4.1",
      "serverType": "PostgreSQL",
      "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
      "serverHost": "172.31.34.221",
      "netProtocol": "TCP",
      "dbProtocol": "Postgres 3.0",
      "type": "record",
      "startTime": "2022-03-27 13:05:16.607269+00",
      "errorMessage": "password authentication failed for user \"postgres\""
    }
  ]
}
{
  "type": "DatabaseActivityMonitoringRecord",
  "clusterId": "cluster-P74MFUJBI3PYA2N5L22YVVX3VY",
  "instanceId": "db-KBKJGYAAF5QT253EYKEI37WBCY",
  "databaseActivityEventList": [
    {
      "logTime": "2022-03-27 13:05:16.613649+00",
      "statementId": 1,
      "substatementId": 1,
      "objectType": null,
      "command": "AUTH FAILURE",
      "objectName": null,
      "databaseName": "test",
      "dbUserName": "postgres",
      "remoteHost": "172.31.20.245",
      "remotePort": "52866",
      "sessionId": "6240610c.1ddb",
      "rowCount": null,
      "commandText": null,
      "paramList": [],
      "pid": 7643,
      "clientApplication": "",
      "exitCode": -1,
      "class": "MISC",
      "serverVersion": "13.4.1",
      "serverType": "PostgreSQL",
      "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
      "serverHost": "172.31.34.221",
      "netProtocol": "TCP",
      "dbProtocol": "Postgres 3.0",
      "type": "record",
      "startTime": "2022-03-27 13:05:16.613637+00",
      "errorMessage": null
    }
  ]
}
...

期待通り、復号されています。

最後に

Amazon Aurora のデータベースアクティビティストリーム(DAS)を使って監査ログを S3 に永続化する方法を紹介しました。

ストリームデータはKMSで暗号化されているため、レコードを調査するには、どこかで復号処理を入れる必要があります。

今回はS3に保存する Firehose フェーズで復号するサンプルを紹介しました。 復号ロジックを移植すれば、S3 PUT後のオブジェクトを加工することも、かんたんにできるかと思います。

DAS は pgAudit を使った監査ログに比べ、AWSや暗号処理の知識や実装力が求められる一方で、MySQL/PostgreSQLに対して一貫した手法でログを取得でき、Kinesis Data Streamから先はユーザーがデータ処理を自由に実装できるのがメリットです。

それでは。

参考