IAM認証付きAmazon Elasticsearch Serviceをelasticsearch-pyで操作する

Elasticsearch Service

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

概要

Elasticsearchの操作はREST APIが提供されており、ツールレベルのプログラムの場合、容易に組み込むことができます。AWS Lambdaにより実装する場合もPythonの標準パッケージで簡単に実装することができます。

ただし、アプリケーション開発となると、度重なるREST APIの処理実装、ネットワーク通信エラーやAPI例外処理を独自実装することは大きなコストを要します。そこでElasticsearchは各種言語のSDK(Client)をElastic社が提供しており、利用することができます。出来ることならSDK・Clientを利用して、エラー処理や例外処理などの実装を減らしたいですよね?

現在、提供されているSDKは以下のようなものがあります。

一方で、Amazon Elasticsearch ServiceというAWSマネージドサービスのElasticsearchを利用可能なサービスがあります。Amazon ESの特徴の一つとして、IAM Policyによる権限制御ができ、認証・認可された相手のみがElasticsearchを利用することができ、セキュリティを高めることができます。
IAM PolicyはAWSが定める署名プロセスを踏んだWeb Requestが認証され、アクセス可否を決定します。

ElasticsearchのSDKは当たり前ですが、IAMの認証を考慮していません。そのため、Amazon Elasticsearch ServiceをSDKにより操作したい場合、IPアドレスによるアクセス制御ぐらいしかできません。AWS Lambdaから操作する場合、IPアドレスを固定できないため、不特定多数に向けて公開する必要があり、SDKの利用を敬遠したくなります。

そこで今回はAmazon Elasticsearch ServiceのIAM認証を有効とした上で、ElasticsearchのSDKを利用したいという欲張りな方に向けて、IAM認証ありのAmazon ESをelasticsearcy-pyの操作メソッドをそのまま利用して操作する方法をご紹介します。

elasticsearch-py

elasticsearch-pyはElastic社が提供するPython向けOfficial low-level client for Elasticsearchです。PythonでElasticsearchを操作する場合にurllib2、urllib3、requestsなどのHTTP Requestパッケージを使わずとも直感的なプログラミングでElasticsearchを操作することが可能です。

elasticsearch-pyは各機能をコンポーネントに分離しており、コンポーネントレベルで実装クラスを変更することが可能です。疎結合の教科書のようなソースコードでした。ソースコードはシンプルですごく読みやすかったです。こんなソースコードを書ける人間になりたかったです。

と、話がそれました。

AWS ElasticSearch service #280

ハンズラボさんのブログ「AWS ElasticSearch Service の認証にIAM Roleを使う [Python編]」で紹介されていました。elasticsearch-pyのissueで実装方法を紹介されていました。これを使えば、簡単に実装できます!嬉しいです。

ただ気になったことが、、、これって、Session Token対応していないのでは??

# python

>>> from elasticsearch import Elasticsearch, RequestsHttpConnection
>>> from requests_aws4auth import AWS4Auth
>>> host="search-*************.ap-northeast-1.es.amazonaws.com"
>>> awsauth = AWS4Auth("**********","**************","ap-northeast-1","es",session_token="*************")
>>> es = Elasticsearch(
...     hosts=[{'host': host, 'port': 443}],
...     http_auth=awsauth,
...     use_ssl=True,
...     verify_certs=True,
...     connection_class=RequestsHttpConnection
... )
>>> print(es.info())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "elasticsearch/client/__init__.py", line 201, in info
    _, data = self.transport.perform_request('GET', '/', params=params)
  File "elasticsearch/transport.py", line 329, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "elasticsearch/connection/http_requests.py", line 79, in perform_request
    self._raise_error(response.status_code, raw_data)
  File "elasticsearch/connection/base.py", line 105, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.AuthorizationException: TransportError(403, u'{"message":"The security token included in the request is invalid."}')

・・・・・

おぅ、Lambdaで実装する時、どうしよう。。。Lambda Functionにアクセスキーは埋め込みたくない。

署名プロセスv4で生成される「X-Amz-Security-Token」ヘッダが渡されていないことが原因なのかな。。。

ちなみに日本語の利用もできなかったです。こっちの方が問題か。(もしくは私の使い方が悪いのかな。。)

elasticsearch-pyのソースコードを読むと、ConnectionのWrapperクラスを実装することでできそうだったので実装しました。

AwsRequestsHttpConnectionクラス作成

elasticsearch-pyはHTTP Requestを担う共通クラスとして、標準ではurllib3で実装したUrllib3HttpConnectionクラスを利用します。今回はurllib3とは別に用意されているrequestsで実装したRequestsHttpConnectionクラスを継承し、[小ネタ] botocoreのAWS APIリクエストの署名プロセスのみを利用するで紹介したbotocoreの署名プロセスを追加したクラスを作成するだけです。

ソースコードをGistにアップロードしました。
基本的に処理の追加だけで継承元クラスのメソッドをそのまま呼び出しているので、本家ソースコードで変更があってもほとんどのケースで影響を受けず、与えません。

AWS Lambda限定用途で実装しようと思ったけど、実装している内に楽しくなってきて汎用的に作ってしまって、コードが膨らんでしまいました。
利用方法は3パターンです。優先度は上から順です。

  • 引数でクレデンシャル情報を指定する。クレデンシャル情報を指定した場合、指定したキーを利用して署名プロセスを実施します。こちらの指定方法は2パターンに分かれます。
    • アクセスキー、シークレットキーを渡す。この利用方法はAWS ElasticSearch service #280でも利用可能です。
    • アクセスキー、シークレットキー、セッショントークンを渡す。こちらはあまりユースケースとしてはないでしょう。あくまでオプションレベルです。
  • 環境変数でクレデンシャル情報を指定します(AWS Lambdaによる実行を想定)
    • 環境変数のAWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKENから取得した値をクレデンシャル情報とします。
  • メタデータから取得できる場合、こちらからクレデンシャル情報を取得します(IAM Role付きEC2インスタンスによる実行を想定)
    • http://169.254.169.254/latest/meta-data/iam/security-credentials/<role-name>から取得した値をクレデンシャル情報とします。

ソースコードの説明は最後にします。興味がある方はご覧ください。

動作確認

まず必要なパッケージをインストールします。

# pip install elasticsearch requests botocore
Collecting elasticsearch
  Using cached elasticsearch-2.2.0-py2.py3-none-any.whl
Collecting requests
  Using cached requests-2.9.1-py2.py3-none-any.whl
Collecting botocore
  Using cached botocore-1.3.23-py2.py3-none-any.whl
Collecting urllib3<2.0,>=1.8 (from elasticsearch)
  Using cached urllib3-1.14-py2.py3-none-any.whl
Collecting jmespath<1.0.0,>=0.7.1 (from botocore)
  Using cached jmespath-0.9.0-py2.py3-none-any.whl
Collecting python-dateutil<3.0.0,>=2.1 (from botocore)
  Using cached python_dateutil-2.4.2-py2.py3-none-any.whl
Collecting docutils>=0.10 (from botocore)
Collecting six>=1.5 (from python-dateutil<3.0.0,>=2.1->botocore)
  Using cached six-1.10.0-py2.py3-none-any.whl
Installing collected packages: urllib3, elasticsearch, requests, jmespath, six, python-dateutil, docutils, botocore
Successfully installed botocore-1.3.23 docutils-0.12 elasticsearch-2.2.0 jmespath-0.9.0 python-dateutil-2.4.2 requests-2.9.1 six-1.10.0 urllib3-1.14

デフォルトelasticsearch-pyによるアクセス

まずは標準のelasticsearch-pyでIAM認証付きのAmazon Elasticsearch Serviceへアクセスしてみます。
Elasticsearchクラスをインスタンス化する際は一つ目の引数にホスト名(もしくはIPアドレス)を指定します。
またデフォルトのアクセスポート番号が9200ですので、Amazon Elasticsearch Serviceがリッスンするport番号443を指定します。

# python
>>> from elasticsearch import Elasticsearch
>>> host="search-***********.ap-northeast-1.es.amazonaws.com"
>>> es = Elasticsearch([{"host":host,"port":443}],use_ssl=True,verify_certs=True)
>>> es.info()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "elasticsearch/client/__init__.py", line 201, in info
    _, data = self.transport.perform_request('GET', '/', params=params)
  File "elasticsearch/transport.py", line 329, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "elasticsearch/connection/http_urllib3.py", line 106, in perform_request
    self._raise_error(response.status, raw_data)
  File "elasticsearch/connection/base.py", line 105, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.AuthorizationException: TransportError(403, u'{"Message":"User: anonymous is not authorized to perform: es:ESHttpGet on resource: arn:aws:es:ap-northeast-1:********:domain/*******/"}')
>>>

ユーザー認証できず、エラーとなりました。

AwsRequestsHttpConnectionクラスを利用したアクセス(Session Tokenなし)

続いて、実装したAwsRequestsHttpConnectionクラスを利用します。
上で作成したpythonファイルをaws_http_requests.pyという名前で保存します。
Elasticsearchクラスをインスタンス化する際に上記に加えて、connection_classに作成したAwsRequestsHttpConnectionクラス、およびIAM認証に必要なAWSのクレデンシャル情報を引数で渡します。
まずはSession Tokenなしのアクセスキーで実施します。

>>> from aws_http_requests import AwsRequestsHttpConnection
>>> from elasticsearch import Elasticsearch
>>> host="search-***********.ap-northeast-1.es.amazonaws.com"
>>> es = Elasticsearch(
...     hosts=[{"host":host, "port":443}],
...     connection_class=AwsRequestsHttpConnection,
...     aws_access_key_id="********",
...     aws_secret_access_key="***********",
...     region="ap-northeast-1"
... )
>>> es.info()
{u'status': 200, u'cluster_name': u'**********:*******', u'version': {u'lucene_version': u'4.10.4', u'build_hash': u'62ff9868b4c8a0c45860bebb259e21980778ab1c', u'number': u'1.5.2', u'build_timestamp': u'2015-04-27T09:21:06Z', u'build_snapshot': False}, u'name': u'Mosha', u'tagline': u'You Know, for Search'}

IAMによるアクセス制限を通過し、Amazon Elasticsearch ServiceからResponseが返ってきました。

続けて、簡単にいくつかのAPIを確認してみます。

検索
>>> es.search()
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [], u'max_score': None, u'total': 0},
 u'timed_out': False,
 u'took': 4}

何もデータはありませんが、Responseが返ってきました。

データ登録
>>> es.index("index", "type", '{"name":"Shinji Fujimoto"}')
Out[6]:
{u'_id': u'AVKdYK-uM6TKBnvQUSyv',
 u'_index': u'index',
 u'_type': u'type',
 u'_version': 1,
 u'created': True}

>>> es.search()
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'AVKdYK-uM6TKBnvQUSyv',
    u'_index': u'index',
    u'_score': 1.0,
    u'_source': {u'name': u'Shinji Fujimoto'},
    u'_type': u'type'}],
  u'max_score': 1.0,
  u'total': 1},
 u'timed_out': False,
 u'took': 3}

データを入れて、検索で入っているのが確認できます。

といった感じです。

AwsRequestsHttpConnectionクラスを利用したアクセス(Session Tokenあり)

続いて、Session Tokenも利用した操作です。

>>> from aws_http_requests import AwsRequestsHttpConnection
>>> from elasticsearch import Elasticsearch
>>> host="search-**************.ap-northeast-1.es.amazonaws.com"
>>> es = Elasticsearch(
...     hosts=[{"host":host, "port":443}],
...     connection_class=AwsRequestsHttpConnection,
...     aws_access_key_id="***************",
...     aws_secret_access_key="*******************",
...     session_token="***************************",
...     region="ap-northeast-1"
... )
>>> es.info()
{u'status': 200, u'cluster_name': u'*********:*******', u'version': {u'lucene_version': u'4.10.4', u'build_hash': u'62ff9868b4c8a0c45860bebb259e21980778ab1c', u'number': u'1.5.2', u'build_timestamp': u'2015-04-27T09:21:06Z', u'build_snapshot': False}, u'name': u'Mosha', u'tagline': u'You Know, for Search'}

アクセスできていますね。

AWS Lambdaによるアクセス

AWS Lambdaでも実行してみましょう。

以下のようなLambda Functionを用意します。

# cat lambda_function.py
from elasticsearch import Elasticsearch
from aws_http_requests import AwsRequestsHttpConnection

host = "search-****************.ap-northeast-1.es.amazonaws.com"

def lambda_handler(event, context):
    es = Elasticsearch(
        hosts=[{"host":host,"port":443}],
        use_ssl=True,
        verify_certs=True,
        connection_class=AwsRequestsHttpConnection
    )
    print es.info()

elasticsearch、requestsパッケージと一緒にアップロードし、実行します。

CloudWatch Logsを確認すると、

CloudWatch_Management_Console

アクセスできていますね。

まとめ

いかがでしたでしょうか?
Amazon Elasticsearch Serviceを利用する上でSDKが利用できず、敬遠されていた方もいらっしゃるのではないでしょうか?今回はPython ClintにWrapperクラスを一枚実装するだけでAWS認証プロセスを実現できました。

ソースコード説明

メソッド単位に区切って説明します。

コンストラクタ

まずはコンストラクタです。コンストラクタの引数にAWSクレデンシャル情報のアクセスキー、シークレットキー、リージョン名、セッショントークンを追加しました。AWS Lambda、IAM Role付きEC2インスタンスの場合、リクエスト毎にクレデンシャル情報を取得するため、これらの引数はオプションとしています。

    def __init__(self, access_key=None, secret_key=None, region=None, token=None, **kwargs):
        super(AwsRequestsHttpConnection, self).__init__(**kwargs)

        self.base_url = ":".join(self.base_url.split(":")[:-1]) ①
        self.access_key = access_key
        self.secret_key = secret_key
        self.token = token
        self.region = self.get_region(region)
HTTP Requestメソッド

続いて、HTTP Requestメソッドです。elasticsearch-pyのHTTP Request処理は全てperform_requestを利用します。HTTP Request処理の前にbotocoreの署名プロセスにより、必要なHeaderを生成します。生成されたHeaderをHTTP RequestのHeaderにセットします。セキュリティトークンを利用する場合、「X-Amz-Security-Token」が必要となりますので、こちらを利用します。

    def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
        aws_url = self.base_url + url
        if params:
            aws_url = '%s?%s' % (aws_url, urlencode(params or {}))
        if body is not None:
            body = body.encode("utf-8")

        credentials = self.get_credentials(self.access_key, self.secret_key, self.token)
        request = AWSRequest(method=method, url=aws_url, data=body)
        SigV4Auth(credentials, "es", self.region).add_auth(request)
        self.session.headers["X-Amz-Date"] = request.headers.get("X-Amz-Date")
        self.session.headers["Authorization"] = request.headers.get("Authorization")
        if request.headers.get("X-Amz-Security-Token"):
            self.session.headers["X-Amz-Security-Token"] = request.headers.get("X-Amz-Security-Token")

        return super(AwsRequestsHttpConnection, self).perform_request(method, url, params, body, timeout, ignore)
Credential取得メソッド

Credential取得は3パターンです。

  • 引数でアクセスキー、シークレットキー、(トークンはオプション)を渡した場合、こちらを優先します。
  • 環境変数にセットしている場合、こちらからクレデンシャル情報を取得します(AWS Lambdaによる実行を想定)
  • メタデータから取得できる場合、こちらからクレデンシャル情報を取得します(IAM Role付きEC2インスタンスによる実行を想定)
    def get_credentials(self, access_key=None, secret_key=None, token=None):
        if access_key is None and os.environ.get("AWS_ACCESS_KEY_ID"):
            access_key = os.environ["AWS_ACCESS_KEY_ID"]
            secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]
            token = os.environ["AWS_SESSION_TOKEN"]
        elif access_key is None and self.is_ec2instance():
            c = InstanceMetadataFetcher().retrieve_iam_role_credentials()
            access_key = c["access_key"]
            secret_key = c["secret_key"]
            token = c["token"]
        return Credentials(access_key, secret_key, token)
リージョン取得メソッド

リージョン取得も3パターンです。

  • 引数でリージョンを渡した場合、こちらを優先します。
  • 環境変数にセットしている場合、こちらから取得します(AWS Lambdaによる実行を想定)
  • メタデータから取得できる場合、こちらから取得します(IAM Role付きEC2インスタンスによる実行を想定)
    def get_region(self, region=None):
        if region is None and os.environ.get("AWS_REGION"):
            region = os.environ["AWS_REGION"]
        elif region is None and self.is_ec2instance():
            region = requests.get(META_URL + "latest/meta-data/placement/availability-zone").text[:-1]
        return region