
RDS for MySQLのスロークエリーログをAWS LambdaでElasticsearchに取り込む
はじめに
藤本です。
Elasticseachに取り込むネタが続いています。 前回のELBのアクセスログをAWS LambdaでElasticsearchに取り込むに続いて、今回はRDS for MySQLのスロークエリログをElasticsearchに取り込む実装をご紹介します。
概要
MySQL Serverはスロークエリーログにより指定した秒数を超えるクエリを記録することができます。スロークエリログはパフォーマンス劣化の解析、クエリの適切性、DBのマシンパワーの適切性のチェックに役立ちます。RDS for MySQLも例外ではありません。パラメータグループを設定することにより、スロークエリログを有効にすることができます。それに加えてRDS for MySQLの場合、AWSのAPIによりデータベースに接続せずともスロークエリログを取得することができます。
スロークエリログフォーマット
# Time: <ログ出力時間> # User@Host: <ユーザー名> @ <接続元IP> Id: <接続元識別子> # Query_time: <処理実行時間> Lock_time: <ロック発生時間> Rows_sent: <送信された件数> Rows_examined: <処理された件数> <SQL文>
スロークエリログサンプル
# Time: 151120 23:58:31 # User@Host: root[root] @ [10.232.10.32] Id: 10932 # Query_time: 36.467637 Lock_time: 0.000213 Rows_sent: 0 Rows_examined: 0 use db; SET timestamp=1448063911; LOAD DATA LOW_PRIORITY LOCAL INFILE '/tmp/test.csv' INTO TABLE items
今回はAWS Lambdaのスケジュールイベントにより一時間に一回、RDS for MySQLからスロークエリログをダウンロードし、加工し、Amazon Elasticsearch Serviceに取り込みます。
設定は以下のような流れとなります。
- 事前準備
- RDS for MySQLのスロークエリログ出力設定
- Amazon Elasticsearch Serverce作成
- Amazon Elasticsearch Service設定
- Index Template作成
- AWS Lambda Funcion作成
- Lambda Function作成
- Event sources作成
- 動作確認
- ログ取り込み
- Kibanaによる可視化
環境
やってみた
事前準備
1. RDS for MySQLのスロークエリログ出力設定
DBパラメータグループの「slow_query_log」を「1」に設定してください。
2. Amazon Elasticsearch Serverce作成
Amazon Elasticsearch Serviceのドメイン作成は[新機能]Amazon Elasticsearch Serviceがリリースされました!を参照。
Amazon Elasticsearch Service設定
3. Index Template作成
RDS for MySQLのスロークエリログ用にIndex Templateを作成します。今回、MySQLのスロークエリログから取得したい情報を基にMappingのTemplateを作成します。
- template : スロークエリログが投入されるインデックスの命名規則を指定します。今回はslowquerylog-YYYYMMDDという名前でINDEXを作成します。
- mappings : タイプ名を指定します。今回はRDSのIDでタイプを作成します。
- timestamp : スロークエリログが出力された日時。date型。
- user : DBの接続したユーザー名。
- client : DBへ接続したクライアントのIPアドレス。
- client_id : DBへ接続したクライアントの識別子。
- query_time : クエリの実行時間
- lock_time : クエリによりロックが発生した時間
- rows_sent : クエリで送信された行数
- rows_examined : クエリの処理対象となった行数
- sql : SQL文
curl -XPUT 'http://search-******************.ap-northeast-1.es.amazonaws.com/_template/slowquerylog' -d '{ "template": "slowquerylog-*", "mappings": { "": { "properties": { "timestamp": { "type": "date" }, "user": { "type": "string", "index": "not_analyzed" }, "client": { "type": "ip" }, "client_id": { "type": "integer" }, "query_time": { "type": "float" }, "lock_time": { "type": "float" }, "rows_examined": { "type": "integer" }, "rows_sent": { "type": "integer" }, "sql": { "type": "string", "index": "not_analyzed" } } } } }'
AWS Lambda Funcion作成
4. Lambda Function作成
PythonのLambda Functionを作成します。今回は標準ライブラリのみで実装していますので、ソースコードをベタ貼りすれば大丈夫です。ソースコードはGistにアップロードしました。実行時間をデフォルトの3秒から伸ばしてください。今回は100件以下のログ数でしたので2秒で完了しています。RoleはRDSへのリード権限、Amazon Elasticsearch ServiceのESHttpPost権限を割り当てたIAM Roleを設定してください。Memoryはスロークエリログファイルのサイズが大きい場合は合わせて増やしましょう。
現在は4つのパラメータを設定可能です。コメントアウトで括られた変数に設定してください。
- ES_HOST
- 接続先となるElasticsearchのホスト名、IPアドレス
- INDEX_PREFIX
- ログを格納するINDEX名のプレフィックス
- INDEX名は「INDEX_PREFIX-YYYYMMDD」の形式
- RDS_ID
- スロークエリログを取得したいRDSのIDを指定
- RDSからのログ取得、Elasticsearchのタイプに利用
- TIMEZONE
- RDSのデフォルトTIMEZONEはUTC
- Elasticsearchでのログデータのタイムゾーンを変換したい場合、変換したいタイムゾーンを指定(ex.JSTの場合、Asia/Tokyo)
- 変換しない場合はブランク
- 各種ログとタイムゾーンを合わせることをオヌヌメ
################################################## | |
### Elasticsearch host name | |
ES_HOST = "search-*******************.ap-northeast-1.es.amazonaws.com" | |
### Elasticsearch prefix for index name | |
INDEX_PREFIX = "slowquerylog" | |
### Elasticsearch type name is rds instance id | |
RDS_ID = "<RDS_INSTANCE_IDENTIFY>" | |
### Enabled to change timezone. If you set UTC, this parameter is blank | |
TIMEZONE = "Asia/Tokyo" | |
################################################# | |
### Query time format regex | |
TIME_REGEX = "^[a-zA-Z#:_ ]+([0-9.]+)[a-zA-Z:_ ]+([0-9.]+)[a-zA-Z:_ ]+([0-9.]+).[a-zA-Z:_ ]+([0-9.]+)$" | |
### Exclude noise string | |
NOISE = [ | |
"/rdsdbbin/mysql/bin/mysqld, Version: 5.6.21-log (MySQL Community Server (GPL)). started with:", | |
"Tcp port: 3306 Unix socket: /tmp/mysql.sock", | |
"Time Id Command Argument" | |
] | |
################################################# | |
import boto3 | |
import re | |
import os | |
import json | |
from datetime import datetime | |
from dateutil import tz, zoneinfo | |
from botocore.awsrequest import AWSRequest | |
from botocore.auth import SigV4Auth | |
from botocore.endpoint import PreserveAuthSession | |
from botocore.credentials import Credentials | |
R = re.compile(TIME_REGEX) | |
NOW = datetime.now() | |
INDEX = INDEX_PREFIX + "-" + datetime.strftime(NOW, "%Y%m%d") | |
TYPE = RDS_ID | |
SLOWQUERYLOG_PREFIX = "slowquery/mysql-slowquery.log." | |
def lambda_handler(event, context): | |
client = boto3.client("rds") | |
db_files = client.describe_db_log_files(DBInstanceIdentifier=RDS_ID) | |
log_filename = SLOWQUERYLOG_PREFIX + str(NOW.hour) | |
if not filter(lambda log: log["LogFileName"] == log_filename, db_files["DescribeDBLogFiles"]): | |
return | |
body = client.download_db_log_file_portion( | |
DBInstanceIdentifier=RDS_ID, | |
LogFileName=log_filename | |
)["LogFileData"] | |
data = "" | |
doc = {} | |
for line in body.split("\n"): | |
if not line or line in NOISE: | |
continue | |
elif line.startswith("# Time: "): | |
if doc: | |
data += '{"index":{"_index":"' + INDEX + '","_type":"' + TYPE + '"}}\n' | |
data += json.dumps(doc) + "\n" | |
if len(data) > 1000000: | |
_bulk(ES_HOST, data) | |
data = "" | |
timestamp = datetime.strptime(line[8:], "%y%m%d %H:%M:%S") | |
if TIMEZONE: | |
timestamp = timestamp.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz("Asia/Tokyo")) | |
doc = {"timestamp": timestamp.isoformat()} | |
elif line.startswith("# User@Host: "): | |
doc["user"] = line.split("[")[1].split("]")[0] | |
doc["client"] = line.split("[")[2].split("]")[0] | |
doc["client_id"] = line.split(" Id: ")[1] | |
elif line.startswith("# Query_time: "): | |
match = R.match(line).groups(0) | |
doc["query_time"] = match[0] | |
doc["lock_time"] = match[1] | |
doc["rows_sent"] = match[2] | |
doc["rows_examined"] = match[3] | |
else: | |
if doc.get("sql"): | |
doc["sql"] += "\n" + line | |
else: | |
doc["sql"] = line | |
if doc: | |
data += '{"index":{"_index":"' + INDEX + '","_type":"' + TYPE + '"}}\n' | |
data += json.dumps(doc) + "\n" | |
_bulk(ES_HOST, data) | |
def _bulk(host, doc): | |
credentials = _get_credentials() | |
url = _create_url(host, "/_bulk") | |
response = es_request(url, "POST", credentials, data=doc) | |
if not response.ok: | |
print response.text | |
def _get_credentials(): | |
return Credentials( | |
os.environ["AWS_ACCESS_KEY_ID"], | |
os.environ["AWS_SECRET_ACCESS_KEY"], | |
os.environ["AWS_SESSION_TOKEN"]) | |
def _create_url(host, path, ssl=False): | |
if not path.startswith("/"): | |
path = "/" + path | |
if ssl: | |
return "https://" + host + path | |
else: | |
return "http://" + host + path | |
def request(url, method, credentials, service_name, region=None, headers=None, data=None): | |
if not region: | |
region = os.environ["AWS_REGION"] | |
aws_request = AWSRequest(url=url, method=method, headers=headers, data=data) | |
SigV4Auth(credentials, service_name, region).add_auth(aws_request) | |
return PreserveAuthSession().send(aws_request.prepare()) | |
def es_request(url, method, credentials, region=None, headers=None, data=None): | |
return request(url, method, credentials, "es", region, headers, data) |
5. Event sources作成
Lambda Functionに対してEvent sourcesを作成します。RDS for MySQLは一時間おきにログファイルをローテーションします。合わせて、Event sourcesの種類は一時間に一回のスケジューリング実行とします。過去のログファイルを眺めていると、0分ジャストではなく、次の時間の5分のログも確認出来たので、スケジューリング実行もずらして実行することが良さそうです。
ログ取得にMarkerやNumberOfLinesを利用すれば、5分おきの取り込みが可能となるのかもしれません。
動作確認
ログ取り込み
今回、弊社の某WEBのバックエンドで動作するMySQLのログを収集しました。
。。。3時間ほど放置しました。 Elasticsearchの状況を確認してみましょう。
データの内容は後で確認するとして、データ数が172件、スキーマも適用されています。
Kibanaによる可視化
Kibanaでグラフ化してみましょう。
時間単位のスロークエリログに残されたQuery_Timeを見てみましょう。
定期的に7分近くのクエリが発行されていることが分かります。 マウスポインタを点に合わせるとクエリにかかった時間、そのSQLが表示されます。(画像を載せたかったのですが、キャプチャする際に消えてしまう。。)
まとめ
いかがでしょうか? ELBの時と処理フローは変わりません。 イベントによる発火はAWS Lambdaが、ETLのExtractは各種AWSサービスのAPIが、LoadはElasticsearchのAPIが用意されています。実装者はログファイルがどのようなフォーマットであるか理解し、どのようにして必要なデータを取り出し、JSON形式に変換することができるかというETL処理のTransformに注力すればいいだけです。 集中したいところに集中できる!まさにこれに尽きますね。