[小ネタ][Python] AmazonESでデータの存在確認をする
小室@札幌です。札幌はすでに夜は15℃以下になる日も珍しくなくなりました。秋が高速で通り過ぎていきます!!もうちょっとゆっくりしていってくれても良いんですが。
はじめに
AmazonES(Amazon Elasticsearch Service)で膨大な数のデータの存在チェックをする必要があったので、それを確認するためのスクリプトを記述しました。
Elasticsearchには豊富なAPIが揃っているので、それを利用するだけなのですが、何にせよ対象が数万件と膨大でしたので、今回はPythonで存在が確認できなかったデータのみを抽出するスクリプトを作成しました。
ElasticsearchのAPI
存在を確認するexistsというAPIがあったのですが、以下の理由から見送りました。
- 非推奨APIになっている。Elasticsearch Reference [2.4] » Search APIs » Search Exists API
- 単に存在を確認したいだけでなく、一件だけ登録されていることを確認したい(1件以上登録されてるのはエラー)
今回は上記の理由から Elasticsearch Reference [2.4] » Search APIs » Count APIを利用しました。
データの構造
Elasticsearchのデータ構造はこんな感じになっています。基本的にサーバーアプリケーションの変更ログで出力されたJsonを格納しています。 _source
以下の内容がサーバー側から送信されてくるJson本体になります。
{ "_index": "modify-log", "_type": "modify-log", "_id": "e04fc495-537c-4060-a1f0-257150c20035", "_score": null, "_source": { "timestamp": "2016-07-25T23:59:49.245+0900", "request_id": "e04fc495-537c-4060-a1f0-257150c20035", "client_id": "xxxxc", "hash_id": "HAAAAAAAAASH_IDDDDDDDDDDD", "event_type": "EventName", "details": { "username": "Deleted" }, "host": "ip-12-123-456-999" }, "fields": { "_ttl": 63535207406, "timestamp": [ 1469458789245 ] }, "sort": [ 1469458789245 ] }
request_id
が必ず一意になるように生成されているため、ESの _id
もこちらの値を適用し、データが一意で登録できるようにしています。
Pythonスクリプト
処理内容
- 判定対象の
request_id
のリストをテキストファイルを準備しておく - ファイルを読み込み、順番に
request_id
を取得する - AmazonESのエンドポイントを指定したCount APIのURLの検索引数に
request_id
を付与してアクセスする - レスポンスJSONをパースし、指定した
request_id
に該当するデータが何件登録されているかを調べる - 1件以外であれば、エラーとしてエラーファイルに出力する
- 以降リストファイルが終了するまでループ
以下のような環境で動作することを確認しています。
動作環境
- OS: Mac OSX 10.10.5 及び Amazon Linux AMI release 2016.03
- Python: Python 3.4.3
コード
#!/usr/bin/python # coding: UTF-8 from __future__ import print_function # Python 2/3 compatibility import json import sys import time import requests argvs = sys.argv argc = len(argvs) # 引数でテキストファイルを指定 if (argc != 2): print('Input text file') quit() # 引数で指定されたrequestIdリストファイル readFile = open(argvs[1], 'r') # エラー書き込み用ファイル file = open('error_requestId.txt', 'w') for i, requestId in enumerate(readFile): r = requests.get( 'http:///modify-log/_count?q=request_id:%s' % requestId) print(r.text) print('now processing %d' % i) data = json.loads(r.text) if (int(data['count']) != 1): file.write("%s\t%d\n" % (requestId, int(data['count']))) time.sleep(0.2) file.close()
外部ライブラリとして、URLへアクセスするために Requests を利用しています。
何件目の処理をしているのかを知りたかったため、 enumerate
で実行カウントを取れるようにしています。
for i, requestId in enumerate(readFile):
さらに特に不要かと思ったのですが、念の為sleepを0.2秒付与しています。これにより、およそ38000件のデータは 38000 * (1/5) = 7600 sec = 126 min
2時間程度で全ての確認が終わる計算です。
実行結果について
上記スクリプトを実行すると以下の様なコンソール表示が出力されます。
$ python elastic_search.py exists2.txt {"count":1,"_shards":{"total":5,"successful":5,"failed":0}} now processing 0 {"count":1,"_shards":{"total":5,"successful":5,"failed":0}} now processing 1 {"count":1,"_shards":{"total":5,"successful":5,"failed":0}} now processing 2 {"count":1,"_shards":{"total":5,"successful":5,"failed":0}} now processing 3 {"count":1,"_shards":{"total":5,"successful":5,"failed":0}} now processing 4
あとは終わるのをじーっと待つのみです。 count : 1
以外のものはエラーファイルとして出力されます。
エラーファイルについて
エラーが発生した requestId
についてはデータの件数とともに格納しました。
"4d937ff9-4ab4-4ff2-a5d4-0b29cb518ba0" 0
最終的に、このファイルに出力されるものが存在しなければ、全てのデータは正常に1件ずつ登録されているということが確認できます。
まとめ
ElasticsearchをKibana以外で初めて利用しましたが、APIが豊富でとても便利な印象です。ただ、データの構造(特に _id 体系)をうまく決めてやらないと同じデータが複数登録されてしまうなど、なかなか経験や事前の設計がモノを言うサービスだなーと感じました。
またAWSの操作にかぎらず、こういった大量データを確認する単純作業などはLL言語を一つ覚えておくと良いですね。自分は今後Python中心に実装してみたいと思います。今更ですが、Pythonはなかなか直感的に書けて楽しいです。