[小ネタ][Python] AmazonESでデータの存在確認をする

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

小室@札幌です。札幌はすでに夜は15℃以下になる日も珍しくなくなりました。秋が高速で通り過ぎていきます!!もうちょっとゆっくりしていってくれても良いんですが。

はじめに

AmazonES(Amazon Elasticsearch Service)で膨大な数のデータの存在チェックをする必要があったので、それを確認するためのスクリプトを記述しました。

Elasticsearchには豊富なAPIが揃っているので、それを利用するだけなのですが、何にせよ対象が数万件と膨大でしたので、今回はPythonで存在が確認できなかったデータのみを抽出するスクリプトを作成しました。

ElasticsearchのAPI

存在を確認するexistsというAPIがあったのですが、以下の理由から見送りました。

今回は上記の理由から Elasticsearch Reference [2.4] » Search APIs » Count APIを利用しました。

データの構造

Elasticsearchのデータ構造はこんな感じになっています。基本的にサーバーアプリケーションの変更ログで出力されたJsonを格納しています。 _source 以下の内容がサーバー側から送信されてくるJson本体になります。

{
  "_index": "modify-log",
  "_type": "modify-log",
  "_id": "e04fc495-537c-4060-a1f0-257150c20035",
  "_score": null,
  "_source": {
    "timestamp": "2016-07-25T23:59:49.245+0900",
    "request_id": "e04fc495-537c-4060-a1f0-257150c20035",
    "client_id": "xxxxc",
    "hash_id": "HAAAAAAAAASH_IDDDDDDDDDDD",
    "event_type": "EventName",
    "details": {
      "username": "Deleted"
    },
    "host": "ip-12-123-456-999"
  },
  "fields": {
    "_ttl": 63535207406,
    "timestamp": [
      1469458789245
    ]
  },
  "sort": [
    1469458789245
  ]
}

request_id が必ず一意になるように生成されているため、ESの _id もこちらの値を適用し、データが一意で登録できるようにしています。

Pythonスクリプト

処理内容

  1. 判定対象の request_id のリストをテキストファイルを準備しておく
  2. ファイルを読み込み、順番に request_id を取得する
  3. AmazonESのエンドポイントを指定したCount APIのURLの検索引数に request_id を付与してアクセスする
  4. レスポンスJSONをパースし、指定した request_id に該当するデータが何件登録されているかを調べる
  5. 1件以外であれば、エラーとしてエラーファイルに出力する
  6. 以降リストファイルが終了するまでループ

以下のような環境で動作することを確認しています。

動作環境

  • OS: Mac OSX 10.10.5 及び Amazon Linux AMI release 2016.03
  • Python: Python 3.4.3

コード

#!/usr/bin/python
# coding: UTF-8

from __future__ import print_function  # Python 2/3 compatibility

import json
import sys
import time

import requests

argvs = sys.argv
argc = len(argvs)

# 引数でテキストファイルを指定
if (argc != 2):
    print('Input text file')
    quit()

# 引数で指定されたrequestIdリストファイル
readFile = open(argvs[1], 'r')

# エラー書き込み用ファイル
file = open('error_requestId.txt', 'w')

for i, requestId in enumerate(readFile):

    r = requests.get(
        'http://<elasticsearch endpoint>/modify-log/_count?q=request_id:%s' % requestId)

    print(r.text)
    print('now processing %d' % i)
    data = json.loads(r.text)
    if (int(data['count']) != 1):
        file.write("%s\t%d\n" % (requestId, int(data['count'])))
    time.sleep(0.2)
file.close()

外部ライブラリとして、URLへアクセスするために Requests を利用しています。

何件目の処理をしているのかを知りたかったため、 enumerate で実行カウントを取れるようにしています。

for i, requestId in enumerate(readFile):

さらに特に不要かと思ったのですが、念の為sleepを0.2秒付与しています。これにより、およそ38000件のデータは 38000 * (1/5) = 7600 sec = 126 min

2時間程度で全ての確認が終わる計算です。

実行結果について

上記スクリプトを実行すると以下の様なコンソール表示が出力されます。

$ python elastic_search.py exists2.txt
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 0
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 1
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 2
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 3
{"count":1,"_shards":{"total":5,"successful":5,"failed":0}}
now processing 4

あとは終わるのをじーっと待つのみです。 count : 1 以外のものはエラーファイルとして出力されます。

エラーファイルについて

エラーが発生した requestId についてはデータの件数とともに格納しました。

"4d937ff9-4ab4-4ff2-a5d4-0b29cb518ba0"        0

最終的に、このファイルに出力されるものが存在しなければ、全てのデータは正常に1件ずつ登録されているということが確認できます。

まとめ

ElasticsearchをKibana以外で初めて利用しましたが、APIが豊富でとても便利な印象です。ただ、データの構造(特に _id 体系)をうまく決めてやらないと同じデータが複数登録されてしまうなど、なかなか経験や事前の設計がモノを言うサービスだなーと感じました。

またAWSの操作にかぎらず、こういった大量データを確認する単純作業などはLL言語を一つ覚えておくと良いですね。自分は今後Python中心に実装してみたいと思います。今更ですが、Pythonはなかなか直感的に書けて楽しいです。

参照