Redshift Serverlessの監査ログをCloudWatch Logs→S3バケットに転送してRedshift Spectrumで分析してみた

2022.11.21

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

どーも、データアナリティクス事業本部コンサルティングチームのsutoです。

Redshift Serverlessの監査ログの出力先はCloudWatch Logsのみとなっていますが、そこに分析をかけたいとなると、

  • AthenaのFederated Query(CloudWatch Connector)を利用して直接CloudWatchロググループにアクセスして分析
  • CloudWatchロググループ→S3バケットに転送してRedshift Spectrumで分析

のどちらかを検討することになると思います。

しかし、上記に記載したとおりAthena Federated Queryではログのスキャン量や処理の遅さからコストパフォーマンスが良くないことと、せっかくRedshiftを稼働しているのだからそのパフォーマンスを利用できるRedshift Spectrumに寄せたいですね。

ただ、CloudWatch LogsをS3バケットに定期的に転送させるには、Lambda or Kinesisの仕組みを構築する必要があります。

そこで今回は、CloudWatchロググループに出力されているRedshift監査ログをKinesis Firehose→S3バケット出力の仕組みを作ってRedshift Spectrumでクエリできるまでの手順をまとめました。

ポイントとしては、監査ログ内で膨大なレコード数となっている「Redshiftシステム側のヘルスチェックのため自動実行されているポーリングやクエリ」をCloudWatchサブスクリプションフィルターで除外しながらログを収集する点です。

これによって、Cloudwatch→Kinesis firehose→S3というデータ転送にかかるログの量を削減し、AWS利用コストを抑えることにつなげます。

今回作成するリソース

構成

おおまかな構成は以下の図のとおりです

構築にあたっての前提

  • Redshift Serverlessクラスターの監査ログが有効化されており、エクスポートするログとして「Connection log」と「user activity log」にチェックがあること
  • RedshiftクラスターにアタッチしているIAMロールに、関連する以下ポリシーがあること
    • AWSGlueConsoleFullAccess
      • Redshift Spectrum使用による外部テーブル(Glueデータカタログ)の操作を行う
    • AmazonS3FullAccess
      • 監査ログを保存バケットへの出力とRedshift Spectrum使用による読み取りを許可する
  • 監査ログを出力するS3バケットを作成されていること
    • S3バケットのライフサイクルは監査ログを保持したい期間に沿って設定しておく

実際にやってみた

必要なIAMロール作成

  • Kinesis Data Firehose 用のIAMロールを作成する
    • 以下の信頼されたエンティティを反映したIAMロールを作成
        {
          "Version": "2012-10-17",
          "Statement": {
            "Effect": "Allow",
            "Principal": { "Service": "firehose.amazonaws.com" },
            "Action": "sts:AssumeRole"
            } 
        }
    • 作成したIAMロールに以下マネージドポリシーをアタッチ
      • AmazonKinesisFullAccess
    • 作成したIAMロールに以下カスタムポリシーをアタッチ (bucket_name、を変更してお使いください)
        {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": [ 
                  "s3:AbortMultipartUpload", 
                  "s3:GetBucketLocation", 
                  "s3:GetObject", 
                  "s3:ListBucket", 
                  "s3:ListBucketMultipartUploads", 
                  "s3:PutObject" ],
              "Resource": [ 
                  "arn:aws:s3:::bucket_name", 
                  "arn:aws:s3:::bucket_name/*" ]
            },
            {
              "Action": [
                  "lambda:InvokeFunction",
                  "lambda:GetFunctionConfiguration"
              ],
              "Effect": "Allow",
              "Resource": [
                  "*"
              ]
            }
          ]
        }

  • CloudWatch Logsサブスクリプションフィルター用のIAMロールを作成する

    • 以下の信頼されたエンティティを反映したIAMロールを作成(カスタム信頼ポリシーを選んで以下を貼り付けるとよい) (your_account_id、を変更してお使いください)
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "logs.ap-northeast-1.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole",
                    "Condition": {
                        "StringLike": {
                            "aws:SourceArn": "arn:aws:logs:ap-northeast-1:your_account_id:*"
                        }
                    }
                }
            ]
        }
    • 作成したIAMロールに以下カスタムポリシーをアタッチ (your_account_idを変更してお使いください)
        {
            "Version": "2012-10-17",
            "Statement":[
              {
                "Effect":"Allow",
                "Action":["firehose:*"],
                "Resource":["arn:aws:firehose:ap-northeast-1:your_account_id:*"]
              }
            ]
        }

Kinesis Firehoseで呼び出すLambda関数の作成

  • Lambdaの画面に移動し、「関数の作成」を選択
  • 「設計図の使用」を選択し、ブループリントの検索から「Process CloudWatch logs sent to Kinesis Firehose(Python)」を選択して「設定」

  • 関数名(redshiftlog-kinesis-transform)を入力、IAMロールは「基本的な Lambda アクセス権限で新しいロールを作成」

  • 関数が作成されたら「設定」タブの「一般設定」を選択し、タイムアウト値を5分に設定する

Useractivitylog用のS3転送設定を構築

  • Kinesis Data Firehoseの画面に移動し、「配信ストリームを作成」を選択
  • パラメータは以下のように設定
    • ソース:Direct Put
    • 送信先 :Amazon S3
    • 配信ストリーム名:RedshiftUserActivityLogStream
    • データ変換: 有効
    • Lambda関数:「参照」をクリックし、作成したLambda関数「redshiftlog-kinesis-transform」を選択
    • Lambda 関数のバージョン:$LATEST
    • Lambdaバッファサイズ:3MB
    • Lambdaバッファ間隔:300秒
    • レコード形式の変換: 無効
    • S3 bucket:作成済みのS3バケットを選択
    • 動的パーティショニング: 無効
    • S3 bucket prefix - optional: RedshiftUseractivityLog
    • S3 bucket error output prefix - optional:RedshiftUseractivityLog-error/
    • S3 バッファのヒントのバッファサイズ:5MB(デフォルト)
    • S3 バッファのヒントのバッファ間隔:300秒(デフォルト)
    • データレコードの圧縮: 無効 (CloudWatch Logs から Kinesis Data Firehose に送信されたデータは、すでに gzip(レベル6)で圧縮されているため、Kinesis Data Firehose 送信ストリーム内で圧縮する必要はありません。)
    • データレコードの暗号化: 無効
    • サーバー側の暗号化: 無効
    • Amazon CloudWatch エラーのログ記録: 有効
    • 許可:既存のロール使用する→作成済のKinesis Data Firehose 用のIAMロールを選択

  • IAM画面に移動し、前項で自動作成されたLambda関数用IAMロール(redshiftlog-kinesis-transform-role-xxxxxxx)を選択
  • 「許可」タブからカスタマーポリシーを選択し、以下のように firehose:PutRecordBatch 許可の部分を追記してください (your_account_id、作成したkinesis firehoseのarnを変更してお使いください)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:ap-northeast-1:your_account_id:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-1:your_account_id:log-group:/aws/lambda/redshiftlog-kinesis-transform:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecordBatch"
            ],
            "Resource": [
                "前項で作成したkinesis firehoseのarn"
            ]
        }
    ]
}
  • Cloudwatch画面に移動し、サブスクリプションフィルターを設定する
  • ロググループ「/aws/redshift/<Redshiftクラスター名>/useractivitylog」を選択し、「サブスクリプションフィルター」タブから「作成」→「Kinesis Firehoseサブスクリプションフィルターを作成」をクリック
    • 以下パラメータのように設定
    • Destination account:現在のアカウント
    • Kinesis Firehose delivery stream:作成した「RedshiftUserActivityLogStream」を選択
    • アクセス許可:作成したCloudwatchLogs用IAMロールを選択
    • ログの形式:その他
    • サブスクリプションフィルターのパターン:-"SELECT 1" -"SET statement_timeout TO" -"SET QUERY_GROUP TO 'health'” -”health_check” -”ConnectionCheckQuery” -”select pg_backend_pid()” ↑ハイフンからダブルクォーテーションまでを含めて正確に入力してください (Redshiftシステム側がrdsdbユーザーで一定周期で自動実行しているヘルスチェックのログレコードを収集対象から除外するために設定します)
    • サブスクリプションフィルター名:RedshiftUserActivityLogFilter

  • 約5分後以降、S3バケットにログストリームファイルが出力されていきます。

  • 最後にCloudwatchロググループの保持期間を「1ヶ月」に変更
    • ログ保管はS3バケットに寄せるため、Cloudwatchロググループ側はキャッシュデータ扱いとして保持期間を短く設定してコストを抑える

Connectionlog用のS3転送設定を構築

基本的な手順はUserActivitylog用の手順とまったく同じであるため割愛させていただきます。

以下に違いが出る設定パラメータのみを抜粋して記載します。

  • Kinesis Firehose
    • 配信ストリーム名:RedshiftConnectionLogStream
    • S3 bucket prefix - optional: RedshiftConnectionLog/
    • S3 bucket error output prefix - optional:RedshiftConnectionLog-error/
  • CloudWatchサブスクリプションフィルター
    • Kinesis Firehose delivery stream:作成した「RedshiftConnectionLogStream」を選択
    • サブスクリプションフィルターのパターン:-rdsdb
      • (Redshiftシステム側がrdsdbユーザーで一定周期で自動実行しているヘルスチェックのログレコードを収集対象から除外するために設定します)
    • サブスクリプションフィルター名:RedshiftConnectionLogFilter

監査ログ用の外部スキーマ、テーブル作成

S3バケットに収集されるようになったので、Redshift Spectrumで分析できるように外部テーブルを作成します。

  • Redshiftにログインする(CREATE DATABASEやCREATE SCHEMAが可能なユーザーでログイン)
  • Redshift監査ログ用の外部データベース(audit_logs)と外部スキーマ(auditLogSchema)を作成
    • (AmazonRedshiftRole-arnにはRedshift ServerlessクラスターにアタッチしているIAMロールを入力)
CREATE external SCHEMA auditlogschema
FROM data catalog
DATABASE audit_logs
iam_role 'AmazonRedshiftRole-arn'
REGION 'ap-northeast-1'
CREATE external DATABASE if not exists;
  • Connectionログ用の外部テーブルを作成
    • (bucket_nameをそれぞれ変更してお使いください)
        CREATE EXTERNAL TABLE auditlogschema.connections_log(
          event varchar(60),  recordtime varchar(60), 
          remotehost varchar(60),  remoteport varchar(60), 
          pid int,  dbname varchar(60), 
          username varchar(60),  authmethod varchar(60), 
          duration int,  sslversion varchar(60), 
          sslcipher varchar(150),  mtu int, 
          sslcompression varchar(70),  sslexpansion varchar(70), 
          iamauthguid varchar(50),  application_name varchar(300))
        ROW FORMAT DELIMITED 
          FIELDS TERMINATED BY '|' 
        STORED AS INPUTFORMAT 
          'org.apache.hadoop.mapred.TextInputFormat' 
        OUTPUTFORMAT 
          'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION 's3://bucket_name/RedshiftUseractivityLog/'
        ;

  • Useractivityログ用の外部テーブルを作成

    • (bucket_nameをそれぞれ変更してお使いください)
        create external table auditlogschema.user_activity_log(
            logrecord varchar(max)
        )
         ROW FORMAT DELIMITED 
          FIELDS TERMINATED BY '|' 
        STORED AS INPUTFORMAT 
          'org.apache.hadoop.mapred.TextInputFormat' 
        OUTPUTFORMAT 
          'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
        LOCATION 's3://bucket_name/RedshiftConnectionLog/'
        ;

最後に

以上、Redshift Serverlessの監査ログをRedshift Spectrumで分析したくてにいろいろがんばってみた話でした。

Redshift Serverlessの監査ログもS3バケットに直接出力できないのかなーと思いましたが、今後のRedshift監査ログの出力先はCloudWatch Logsを推奨されているようですね。

私はSQLでクエリ打って分析したいと思って今回の構成を作ってみましたが、ざっと閲覧するだけならログストリームにフィルタかけながら確認すればいいし、分析したいなら「Cloudwatch Insightでクエリ打って」ってことなのでしょうかねぇ。

参考