サーバーレスでAuroraの実行中クエリをCloudWatch Logsへ出力する

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

大栗です。

RDSで障害が発生する一因として、想定外に長時間実行しているクエリが邪魔になっていることがあります。RDS for MySQLやAuroraの場合はSlow Queryで長時間実行されたクエリは分かりますが、実行が完了したものしか出力されないため現在動いているものはshow processlist;を実行して実行中のクエリを確認する必要があります。監視システムで定期的にRDSにログインして実行すれば良いのですが、面倒なのでEC2を使わずに実装してみました。

全体概要

Auroraの場合は、ストアドプロシージャ経由でLambdaを実行できます。この機能とMySQLが持っているスケジュール機能を利用します。

Aurora processlist to CloudWatch Logs

  1. AuroraのEvent Schedulerでinformation_schema.processlistの内容を取得してlambda_asyncを呼ぶ
  2. LambdaでPROCESSLISTの内容をCloudWatch Logsへ出力する

実装

  1. Auroraの設定
  2. データをLambdaに送るプロシージャの作成
  3. Lambdaの作成
  4. イベントの作成

Auroraの設定

Aurora用のパラメータグループを作成します。

クラスタパラメータグループ

  1. Aurora用のIAM Role
  2. Auroraを起動してIAM Roleを設定する
  3. クラスメタパラメターグループのaws_default_lambda_roleにIAM RoleのARNを設定する

手順の詳細は以下の記事を参考にして下さい。

【新機能】Amazon AuroraからLambdaを呼べるようになりました

データをLambdaに送るプロシージャの作成

DBインスタンス名とinformation_schema.processlistのデータを引数としてmysql.lambda_asyncを呼び出すストアドプロシージャを作成します。

processlist_to_cwlogs.sql

DROP PROCEDURE IF EXISTS processlist_to_cwlogs;
DELIMITER $$
CREATE PROCEDURE processlist_to_cwlogs ()
BEGIN
  DECLARE done INT;
  DECLARE myData VARCHAR(65535);
  DECLARE list_processes CURSOR FOR SELECT 
      concat(
        'ID:',      id, '\\t',
        'USER:',    user, '\\t',
        'HOST:',    host, '\\t',
        'DB:',      ifnull(db, ''), '\\t',
        'COMMAND:', command, '\\t',
        'TIME:',    time, '\\t',
        'STATE:',   ifnull(state, ''), '\\t',
        'INFO:',    to_base64(substring(ifnull(info, ''), 1, @processlist_length))
      ) AS data
    FROM information_schema.processlist
    ORDER BY time
    LIMIT 10;

  /* Loop Event Handler */
  DECLARE EXIT HANDLER FOR NOT FOUND SET done = 0;

  SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(@@init_connect, ',', -3),',',1) INTO @processlist_lambda_arn;
  SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(@@init_connect, ',', -2),',',1) INTO @processlist_length;
  SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(@@init_connect, ',', -1),',',1) INTO @processlist_count;
  SELECT unix_timestamp(now()) INTO @datetime;

  SELECT variable_value INTO @server_name 
  FROM information_schema.global_variables 
  WHERE variable_name = 'aurora_server_id';

  SET @pos    = 0; 
  SET done = 1;
  OPEN list_processes;
  WHILE done OR @processlist_count > @pos DO

    FETCH list_processes INTO myData;
    SELECT @processlist_lambda_arn;
    SELECT concat('{\"dbInstanceName\" : \"', @server_name, '\", \"logData\" : \"', myData, '\"}');

    CALL mysql.lambda_async (
      @processlist_lambda_arn,
      concat('{\"dbInstanceName\" : \"', @server_name, '\", \"logData\" : \"', myData, '\"}')
    );

  END WHILE;
  CLOSE list_processes;
END
$$
DELIMITER ;

プロシージャで使用するLambdaのARN(@processlist_lambda_arn)、出力するクエリ内容の最大文字数(@processlist_length)、出力する最大行数(@processlist_count)はinit_connectに記載します。information_schema.processlistの内容はLTSVにしています。ストアドプロシージャ上でINFOの内容をエスケープするのが難しかったのでBASE64エンコードしています。

パラメータグループではinit_connectを以下のようにコメントとして記述し、event_schedulerを有効にしています。

パラメータ名 内容
init_connect -- ,arn:aws:lambda:ap-northeast-1:123456789012:function:aurora_logs,10,4096
event_scheduler ON

Lambdaの作成

Lambdaファンクションは、pythonで以下のように作成してみました。ltsvモジュールはLambda標準では使用できないため注意してください。なお、普段shell、ansible、CFn以外書かないので不細工なコードだと思いますが気にしないでください。。。

from __future__ import print_function

import json
import sys
import datetime
import ltsv
import boto3

if sys.version_info[0] == 3:
from io import StringIO
else:
from cStringIO import StringIO

client = None

def lambda_handler(event, context):

global client
if client is None:
client = boto3.client('logs')

log_group_name = "RDS-Processlist"

print("dbInstanceName = " + event['dbInstanceName'])
print("logData = " + event['logData'])

parse_data = dict(next(ltsv.reader(StringIO(event['logData']))))

parse_data['INFO'] = parse_data['INFO'].decode("base64")

log_group_exist = False
group = client.describe_log_groups(
logGroupNamePrefix=log_group_name)
for elem in group['logGroups']:
if elem['logGroupName'] == log_group_name:
log_group_exist = True
break

sequence_token = ''
if log_group_exist:
stream = client.describe_log_streams(
logGroupName=log_group_name,
logStreamNamePrefix=event['dbInstanceName'])
for elem in stream['logStreams']:
if elem['logStreamName'] == event['dbInstanceName']:
sequence_token = elem['uploadSequenceToken']
break
else:
response = client.create_log_group(logGroupName=log_group_name)

if sequence_token == '':
response = client.create_log_stream(
logGroupName=log_group_name,
logStreamName=event['dbInstanceName']
)
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=event['dbInstanceName'],
logEvents=[
{
'timestamp' : int(datetime.datetime.now().strftime('%s')) * 1000,
'message' : json.dumps(parse_data)
}
])
else:
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=event['dbInstanceName'],
logEvents=[
{
'timestamp' : int(datetime.datetime.now().strftime('%s')) * 1000,
'message' : json.dumps(parse_data)
}
],
sequenceToken=sequence_token)

return event['logData']

CloudWatch Logsのロググループ名はRDS-Processlist、ログストリーム名はDBインスタンス名として出力しています。

Lambdaはこの記事を参考にlambda-uploaderでデプロイしました。

ここでAuroraにログインして、processlist_to_cwlogsを実行してCloudWatch Logsに出力されることを確認しましょう。

call processlist_to_cwlogs();

CloudWatch_Management_Console

イベントの作成

以下のように1分毎にprocesslist_to_cwlogsを実行するイベントを作成します。

CREATE EVENT processlist_to_cwlogs_event
ON SCHEDULE EVERY 1 MINUTE
COMMENT 'Output processlist to CloudWatch Logs'
DO
call processlist_to_cwlogs();

毎分information_schema.processlistの内容がCloudWatch Logsに出力されます。

CloudWatch_Management_Console

注意点

AuroraではEvent Schedulerが動作するのはWrite(Master)だけです。フェイルオーバーしてReaderになるとEvent Schedulerが停止します。

さいごに

RDBMSから外部リソースを呼ぶ機能は多用するとデータ連携の流れが分かりにくくなります。そのため本記事では機能要件ではない監視用途で利用してみました。RDSは拡張モニタリングで様々な情報を監視することができますが、それに当てはまらない監視項目はLambda経由で出力する方法も検討してはいかがでしょうか。