この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
小室@札幌です。札幌はすでに夜は15℃以下になる日も珍しくなくなりました。秋が高速で通り過ぎていきます!!もうちょっとゆっくりしていってくれても良いんですが。
はじめに
AmazonES(Amazon Elasticsearch Service)で膨大な数のデータの存在チェックをする必要があったので、それを確認するためのスクリプトを記述しました。
Elasticsearchには豊富なAPIが揃っているので、それを利用するだけなのですが、何にせよ対象が数万件と膨大でしたので、今回はPythonで存在が確認できなかったデータのみを抽出するスクリプトを作成しました。
ElasticsearchのAPI
存在を確認するexistsというAPIがあったのですが、以下の理由から見送りました。
- 非推奨APIになっている。Elasticsearch Reference [2.4] » Search APIs » Search Exists API
- 単に存在を確認したいだけでなく、一件だけ登録されていることを確認したい(1件以上登録されてるのはエラー)
今回は上記の理由から Elasticsearch Reference [2.4] » Search APIs » Count APIを利用しました。
データの構造
Elasticsearchのデータ構造はこんな感じになっています。基本的にサーバーアプリケーションの変更ログで出力されたJsonを格納しています。 _source
以下の内容がサーバー側から送信されてくるJson本体になります。
{
"_index": "modify-log",
"_type": "modify-log",
"_id": "e04fc495-537c-4060-a1f0-257150c20035",
"_score": null,
"_source": {
"timestamp": "2016-07-25T23:59:49.245+0900",
"request_id": "e04fc495-537c-4060-a1f0-257150c20035",
"client_id": "xxxxc",
"hash_id": "HAAAAAAAAASH_IDDDDDDDDDDD",
"event_type": "EventName",
"details": {
"username": "Deleted"
},
"host": "ip-12-123-456-999"
},
"fields": {
"_ttl": 63535207406,
"timestamp": [
1469458789245
]
},
"sort": [
1469458789245
]
}
request_id
が必ず一意になるように生成されているため、ESの _id
もこちらの値を適用し、データが一意で登録できるようにしています。
Pythonスクリプト
処理内容
- 判定対象の
request_id
のリストをテキストファイルを準備しておく - ファイルを読み込み、順番に
request_id
を取得する - AmazonESのエンドポイントを指定したCount APIのURLの検索引数に
request_id
を付与してアクセスする - レスポンスJSONをパースし、指定した
request_id
に該当するデータが何件登録されているかを調べる - 1件以外であれば、エラーとしてエラーファイルに出力する
- 以降リストファイルが終了するまでループ
以下のような環境で動作することを確認しています。
動作環境
- OS: Mac OSX 10.10.5 及び Amazon Linux AMI release 2016.03
- Python: Python 3.4.3
コード
#!/usr/bin/python
# coding: UTF-8
from __future__ import print_function # Python 2/3 compatibility
import json
import sys
import time
import requests
argvs = sys.argv
argc = len(argvs)
# 引数でテキストファイルを指定
if (argc != 2):
print('Input text file')
quit()
# 引数で指定されたrequestIdリストファイル
readFile = open(argvs[1], 'r')
# エラー書き込み用ファイル
file = open('error_requestId.txt', 'w')
for i, requestId in enumerate(readFile):
r = requests.get(
'http:///modify-log/_count?q=request_id:%s' % requestId)
print(r.text)
print('now processing %d' % i)
data = json.loads(r.text)
if (int(data['count']) != 1):
file.write("%s\t%d\n" % (requestId, int(data['count'])))
time.sleep(0.2)
file.close()
外部ライブラリとして、URLへアクセスするために Requests を利用しています。
何件目の処理をしているのかを知りたかったため、 enumerate
で実行カウントを取れるようにしています。
for i, requestId in enumerate(readFile):
さらに特に不要かと思ったのですが、念の為sleepを0.2秒付与しています。これにより、およそ38000件のデータは 38000 * (1/5) = 7600 sec = 126 min
2時間程度で全ての確認が終わる計算です。
実行結果について
上記スクリプトを実行すると以下の様なコンソール表示が出力されます。
$ python elastic_search.py exists2.txt
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 0
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 1
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 2
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 3
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 4
あとは終わるのをじーっと待つのみです。 count : 1
以外のものはエラーファイルとして出力されます。
エラーファイルについて
エラーが発生した requestId
についてはデータの件数とともに格納しました。
"4d937ff9-4ab4-4ff2-a5d4-0b29cb518ba0" 0
最終的に、このファイルに出力されるものが存在しなければ、全てのデータは正常に1件ずつ登録されているということが確認できます。
まとめ
ElasticsearchをKibana以外で初めて利用しましたが、APIが豊富でとても便利な印象です。ただ、データの構造(特に _id 体系)をうまく決めてやらないと同じデータが複数登録されてしまうなど、なかなか経験や事前の設計がモノを言うサービスだなーと感じました。
またAWSの操作にかぎらず、こういった大量データを確認する単純作業などはLL言語を一つ覚えておくと良いですね。自分は今後Python中心に実装してみたいと思います。今更ですが、Pythonはなかなか直感的に書けて楽しいです。