RDS for MySQLのスロークエリーログをAWS LambdaでElasticsearchに取り込む

RDS for MySQLのスロークエリーログをAWS LambdaでElasticsearchに取り込む

Clock Icon2015.11.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

藤本です。

Elasticseachに取り込むネタが続いています。 前回のELBのアクセスログをAWS LambdaでElasticsearchに取り込むに続いて、今回はRDS for MySQLのスロークエリログをElasticsearchに取り込む実装をご紹介します。

概要

MySQL Serverはスロークエリーログにより指定した秒数を超えるクエリを記録することができます。スロークエリログはパフォーマンス劣化の解析、クエリの適切性、DBのマシンパワーの適切性のチェックに役立ちます。RDS for MySQLも例外ではありません。パラメータグループを設定することにより、スロークエリログを有効にすることができます。それに加えてRDS for MySQLの場合、AWSのAPIによりデータベースに接続せずともスロークエリログを取得することができます。

スロークエリログフォーマット

# Time: <ログ出力時間>
# User@Host: <ユーザー名> @ <接続元IP> Id: <接続元識別子>
# Query_time: <処理実行時間> Lock_time: <ロック発生時間> Rows_sent: <送信された件数> Rows_examined: <処理された件数>
<SQL文>

スロークエリログサンプル

# Time: 151120 23:58:31
# User@Host: root[root] @ [10.232.10.32] Id: 10932
# Query_time: 36.467637 Lock_time: 0.000213 Rows_sent: 0 Rows_examined: 0
use db;
SET timestamp=1448063911;
LOAD DATA LOW_PRIORITY LOCAL INFILE '/tmp/test.csv'
INTO TABLE items

今回はAWS Lambdaのスケジュールイベントにより一時間に一回、RDS for MySQLからスロークエリログをダウンロードし、加工し、Amazon Elasticsearch Serviceに取り込みます。

設定は以下のような流れとなります。

  • 事前準備
  • RDS for MySQLのスロークエリログ出力設定
  • Amazon Elasticsearch Serverce作成
  • Amazon Elasticsearch Service設定
  • Index Template作成
  • AWS Lambda Funcion作成
  • Lambda Function作成
  • Event sources作成
  • 動作確認
  • ログ取り込み
  • Kibanaによる可視化

環境

RDS-ES

やってみた

事前準備

1. RDS for MySQLのスロークエリログ出力設定

DBパラメータグループの「slow_query_log」を「1」に設定してください。

2. Amazon Elasticsearch Serverce作成

Amazon Elasticsearch Serviceのドメイン作成は[新機能]Amazon Elasticsearch Serviceがリリースされました!を参照。

Amazon Elasticsearch Service設定

3. Index Template作成

RDS for MySQLのスロークエリログ用にIndex Templateを作成します。今回、MySQLのスロークエリログから取得したい情報を基にMappingのTemplateを作成します。

  • template : スロークエリログが投入されるインデックスの命名規則を指定します。今回はslowquerylog-YYYYMMDDという名前でINDEXを作成します。
  • mappings : タイプ名を指定します。今回はRDSのIDでタイプを作成します。
  • timestamp : スロークエリログが出力された日時。date型。
  • user : DBの接続したユーザー名。
  • client : DBへ接続したクライアントのIPアドレス。
  • client_id : DBへ接続したクライアントの識別子。
  • query_time : クエリの実行時間
  • lock_time : クエリによりロックが発生した時間
  • rows_sent : クエリで送信された行数
  • rows_examined : クエリの処理対象となった行数
  • sql : SQL文
curl -XPUT 'http://search-******************.ap-northeast-1.es.amazonaws.com/_template/slowquerylog' -d '{
"template": "slowquerylog-*",
"mappings": {
"": {
"properties": {
"timestamp": { "type": "date" },
"user": { "type": "string", "index": "not_analyzed" },
"client": { "type": "ip" },
"client_id": { "type": "integer" },
"query_time": { "type": "float" },
"lock_time": { "type": "float" },
"rows_examined": { "type": "integer" },
"rows_sent": { "type": "integer" },
"sql": { "type": "string", "index": "not_analyzed" }
}
}
}
}'

AWS Lambda Funcion作成

4. Lambda Function作成

PythonのLambda Functionを作成します。今回は標準ライブラリのみで実装していますので、ソースコードをベタ貼りすれば大丈夫です。ソースコードはGistにアップロードしました。実行時間をデフォルトの3秒から伸ばしてください。今回は100件以下のログ数でしたので2秒で完了しています。RoleはRDSへのリード権限、Amazon Elasticsearch ServiceのESHttpPost権限を割り当てたIAM Roleを設定してください。Memoryはスロークエリログファイルのサイズが大きい場合は合わせて増やしましょう。

現在は4つのパラメータを設定可能です。コメントアウトで括られた変数に設定してください。

  • ES_HOST
  • 接続先となるElasticsearchのホスト名、IPアドレス
  • INDEX_PREFIX
  • ログを格納するINDEX名のプレフィックス
  • INDEX名は「INDEX_PREFIX-YYYYMMDD」の形式
  • RDS_ID
  • スロークエリログを取得したいRDSのIDを指定
  • RDSからのログ取得、Elasticsearchのタイプに利用
  • TIMEZONE
  • RDSのデフォルトTIMEZONEはUTC
  • Elasticsearchでのログデータのタイムゾーンを変換したい場合、変換したいタイムゾーンを指定(ex.JSTの場合、Asia/Tokyo)
  • 変換しない場合はブランク
  • 各種ログとタイムゾーンを合わせることをオヌヌメ

##################################################
### Elasticsearch host name
ES_HOST = "search-*******************.ap-northeast-1.es.amazonaws.com"
### Elasticsearch prefix for index name
INDEX_PREFIX = "slowquerylog"
### Elasticsearch type name is rds instance id
RDS_ID = "<RDS_INSTANCE_IDENTIFY>"
### Enabled to change timezone. If you set UTC, this parameter is blank
TIMEZONE = "Asia/Tokyo"
#################################################
### Query time format regex
TIME_REGEX = "^[a-zA-Z#:_ ]+([0-9.]+)[a-zA-Z:_ ]+([0-9.]+)[a-zA-Z:_ ]+([0-9.]+).[a-zA-Z:_ ]+([0-9.]+)$"
### Exclude noise string
NOISE = [
"/rdsdbbin/mysql/bin/mysqld, Version: 5.6.21-log (MySQL Community Server (GPL)). started with:",
"Tcp port: 3306 Unix socket: /tmp/mysql.sock",
"Time Id Command Argument"
]
#################################################
import boto3
import re
import os
import json
from datetime import datetime
from dateutil import tz, zoneinfo
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from botocore.endpoint import PreserveAuthSession
from botocore.credentials import Credentials
R = re.compile(TIME_REGEX)
NOW = datetime.now()
INDEX = INDEX_PREFIX + "-" + datetime.strftime(NOW, "%Y%m%d")
TYPE = RDS_ID
SLOWQUERYLOG_PREFIX = "slowquery/mysql-slowquery.log."
def lambda_handler(event, context):
client = boto3.client("rds")
db_files = client.describe_db_log_files(DBInstanceIdentifier=RDS_ID)
log_filename = SLOWQUERYLOG_PREFIX + str(NOW.hour)
if not filter(lambda log: log["LogFileName"] == log_filename, db_files["DescribeDBLogFiles"]):
return
body = client.download_db_log_file_portion(
DBInstanceIdentifier=RDS_ID,
LogFileName=log_filename
)["LogFileData"]
data = ""
doc = {}
for line in body.split("\n"):
if not line or line in NOISE:
continue
elif line.startswith("# Time: "):
if doc:
data += '{"index":{"_index":"' + INDEX + '","_type":"' + TYPE + '"}}\n'
data += json.dumps(doc) + "\n"
if len(data) > 1000000:
_bulk(ES_HOST, data)
data = ""
timestamp = datetime.strptime(line[8:], "%y%m%d %H:%M:%S")
if TIMEZONE:
timestamp = timestamp.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz("Asia/Tokyo"))
doc = {"timestamp": timestamp.isoformat()}
elif line.startswith("# User@Host: "):
doc["user"] = line.split("[")[1].split("]")[0]
doc["client"] = line.split("[")[2].split("]")[0]
doc["client_id"] = line.split(" Id: ")[1]
elif line.startswith("# Query_time: "):
match = R.match(line).groups(0)
doc["query_time"] = match[0]
doc["lock_time"] = match[1]
doc["rows_sent"] = match[2]
doc["rows_examined"] = match[3]
else:
if doc.get("sql"):
doc["sql"] += "\n" + line
else:
doc["sql"] = line
if doc:
data += '{"index":{"_index":"' + INDEX + '","_type":"' + TYPE + '"}}\n'
data += json.dumps(doc) + "\n"
_bulk(ES_HOST, data)
def _bulk(host, doc):
credentials = _get_credentials()
url = _create_url(host, "/_bulk")
response = es_request(url, "POST", credentials, data=doc)
if not response.ok:
print response.text
def _get_credentials():
return Credentials(
os.environ["AWS_ACCESS_KEY_ID"],
os.environ["AWS_SECRET_ACCESS_KEY"],
os.environ["AWS_SESSION_TOKEN"])
def _create_url(host, path, ssl=False):
if not path.startswith("/"):
path = "/" + path
if ssl:
return "https://" + host + path
else:
return "http://" + host + path
def request(url, method, credentials, service_name, region=None, headers=None, data=None):
if not region:
region = os.environ["AWS_REGION"]
aws_request = AWSRequest(url=url, method=method, headers=headers, data=data)
SigV4Auth(credentials, service_name, region).add_auth(aws_request)
return PreserveAuthSession().send(aws_request.prepare())
def es_request(url, method, credentials, region=None, headers=None, data=None):
return request(url, method, credentials, "es", region, headers, data)

5. Event sources作成

Lambda Functionに対してEvent sourcesを作成します。RDS for MySQLは一時間おきにログファイルをローテーションします。合わせて、Event sourcesの種類は一時間に一回のスケジューリング実行とします。過去のログファイルを眺めていると、0分ジャストではなく、次の時間の5分のログも確認出来たので、スケジューリング実行もずらして実行することが良さそうです。

ログ取得にMarkerやNumberOfLinesを利用すれば、5分おきの取り込みが可能となるのかもしれません。

動作確認

ログ取り込み

今回、弊社の某WEBのバックエンドで動作するMySQLのログを収集しました。

。。。3時間ほど放置しました。 Elasticsearchの状況を確認してみましょう。

Amazon_Elasticsearch_Service_Management_Console

データの内容は後で確認するとして、データ数が172件、スキーマも適用されています。

Kibanaによる可視化

Kibanaでグラフ化してみましょう。

時間単位のスロークエリログに残されたQuery_Timeを見てみましょう。

Visualize_-_Kibana_4

定期的に7分近くのクエリが発行されていることが分かります。 マウスポインタを点に合わせるとクエリにかかった時間、そのSQLが表示されます。(画像を載せたかったのですが、キャプチャする際に消えてしまう。。)

まとめ

いかがでしょうか? ELBの時と処理フローは変わりません。 イベントによる発火はAWS Lambdaが、ETLのExtractは各種AWSサービスのAPIが、LoadはElasticsearchのAPIが用意されています。実装者はログファイルがどのようなフォーマットであるか理解し、どのようにして必要なデータを取り出し、JSON形式に変換することができるかというETL処理のTransformに注力すればいいだけです。 集中したいところに集中できる!まさにこれに尽きますね。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.