DynamoDBでElasticsearch Service, CloudSearch, SDK Scan に頼らず検索機能を実現する方法

2018.05.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

今回技術ブログ初投稿になります。サーバーレス開発部の新井です。 サーバーレス開発にまつわる技術ネタを投稿していきますので、よろしく願いします。

今回やりたいこと

タイトルにもある通り、 DynamoDBでElasticsearch Searvice, CloudSearch, SDK Scan に頼らず検索機能を実現するちょっとしたテクニックを紹介したいと思います。 何をするかというと、転置インデックスというテーブルを作成してDynamoDBで部分検索、あいまい検索をする方法を紹介していきます。

背景

前職でサーバーレス開発をしている際に、DynamoDBの各項目に対して検索を行いたいという要件がありました。 実際には、次のような5000レコード程度の社員情報テーブルをDynamoDBで管理しており、名前やメールアドレスの一部などハッシュキー(ID)以外の項目で社員を検索したいという場合です。

ID 名前 ふりがな 性別 メールアドレス 電話番号
01 新井 成一 あらい せいいち 03-0303-0303 seiichi@example.jp

普通に考えると、CloudSearchやElasticsearch Serviceなどが候補としてあげられるかと思いますが、当時は利用料にピンを止めて開発していたので、時間課金のインスタンスを利用すると高くなってしまうし、導入する程の規模感ではなかったので却下しました。

それならば、DynamoDB SDKのScanを利用する方法もありましたが、こちらはテーブルに対してフルスキャンを行うため、DynamoDBのRCU(Read Capacity Unit)を大量に消費するので、これも却下となりました。

そこで一旦は、すべてのカラム対してGSI(Global Secondary Indexe)を作成するという方法を採用しました。この方法では、検索エンジンやScanを使う方法と比べて安く実装はできましたが、一方で部分検索やあいまい検索ができないという欠点がありました。また、DynamoDBの特性といてインデックスを作成した数だけRCU/WCUが発生するので、安いとはいえあまり効率的な使い方ではありませんでした。

そんな中、KVSでSQLライクな部分検索やあいまい検索をするため転置インデックスなるものを作成する方法があるとのことでしたので、こちらを試してみました。以下に、それぞれの手法の簡単なメリットデメリットをまとめてみます。

費用 検索速度 部分検索 あいまい検索 備考
CloudSearch 導入する程の規模、要件ではない
ElasticSearch 導入する程の規模、要件ではない
SDK Scan 1度のScanでRCUを大量消費
GSI テーブル数分のRCU/WCUが必要
転置インデックス テーブルサイズに対する課金がメインになるため安い

実装方法

今回は転置インデックスに関する説明は省きます。概念を説明するよりも、具体的な実装方法を紹介したほうが早いからです。 というわけで、以下のアーキテクチャ図にしたがって、Lambda関数とDynamoDBのテーブルを作成していきます。

DynamoDBのテーブル作成

AWSのマネージメントコンソールから以下のテーブルを作成していきます。 設定はすべてデフォルトにしています。

  1. 社員情報テーブル (Partition key: id, Sort key: なし)
  2. 転置インデックステーブル (Partition key: key, Sort key: employeeId)
  3. 転置インデックステーブルのインデックス (Partition key: employeeId, Sort key: なし)

社員情報テーブルの作成

転置インデックステーブルの作成

転置インデックステーブルのインデックス作成

Lambda関数の作成

AWSのマネージメントコンソールから以下のLambda関数を作成していきます。 言語はPython3.6を使用します。

  1. 転置インデックス作成用のLambda関数
  2. 検索用のLambda関数

転置インデックス作成用のLambda関数の作成

このLambda関数では、以下の処理を行っています。

  1. 決められたルールに従ってキーワードを分割
  2. 分割したキーを正規化
  3. 正規化したキーを転置インデックスとしてそれぞれ転置インデックステーブルに保存

この処理を、社員情報が保存されるたびに走らせることで、各社員情報に対応した転置インデックスが生成されます。

Lambda関数へのトリガーには、社員情報テーブル(employee)のDynamoDB Streamsを指定します。 設定値はStarting PositionのTrim horizonを指定する以外は、デフォルトにします。

import boto3
import json
import re
from boto3.dynamodb.types import TypeDeserializer
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')

deser = TypeDeserializer()

def lambda_handler(event, context):
    print(event)    

    for record in event['Records']:
        print('eventName: ', record['eventName'])
        new = record['dynamodb'].get('NewImage')
        newImg = {}
        if new:
            for key in new:
                newImg[key] = deser.deserialize(new[key])
                
        if (record['eventName'] == 'INSERT'):
            insert_new_item(newImg)
            
        elif (record['eventName'] == 'REMOVE'):
            remove_item(record['dynamodb']['Keys']['id']['S'])
            
        elif (record['eventName'] == 'MODIFY'):
            remove_item(record['dynamodb']['Keys']['id']['S'])
            insert_new_item(newImg)
    return 'Done'
        
def insert_new_item(newImg):
    newRecordList = []
    split_key(newRecordList, newImg)
    normalize_key(newRecordList)

    table = dynamodb.Table('invertedIndex');
    for item in newRecordList:
        table.put_item(
                Item = {'key': item, 'employeeId': newImg['id']}
        )
        
def remove_item(id):
    table = dynamodb.Table('invertedIndex')
    res = table.query(
        IndexName='employeeId-index',
        KeyConditionExpression=Key('employeeId').eq(id)
    )
    print('res: ', res)
    for item in res['Items']:
        print('item: ', item)
        res = table.delete_item(
            Key = {
                'key': item['key'],
                'employeeId': item['employeeId']
            }
        )


def split_key(newRecordList, newImg):
    for newItem in newImg:
        list = re.split('[-@ ]', newImg[newItem])
        for key in list:
            newRecordList.append(key)

def normalize_key(newRecordList):
    print('hoge')
    for s in newRecordList:
        if (re.match('斎|齊|齋', s)):
            newRecordList.append(re.sub('斎|齊|齋', '斉', s))
        if (re.match('shi', s)):
            newRecordList.append(re.sub('shi', 'si', s))
        if (re.match('chi', s)):
            newRecordList.append(re.sub('chi', 'ti', s))

検索用のLambda関数の作成

このLambda関数では、以下の処理を行っています。

  1. 任意の検索ワードをハッシュキーとして転置インデックステーブルからデータ取得
  2. 得られたidをハッシュキーとして社員情報テーブルから社員情報を取得

後にデータ登録する「斎藤 茂」という名前に対して、部分検索とあいまい検索が出来ることを検証するために、今回は検索ワードに「斉藤」を代入しています。

import boto3
dynamodb = boto3.resource('dynamodb')
from boto3.dynamodb.conditions import Key, Attr

def lambda_handler(event, context):
    query = '斉藤'
    result = []
    
    invertedTable = dynamodb.Table('invertedIndex')
    invertedRes = invertedTable.query(
        KeyConditionExpression=Key('key').eq(query)
    )

    for item in invertedRes['Items']:
        employeeTable = dynamodb.Table('employee')
        res = employeeTable.query(
            KeyConditionExpression=Key('id').eq(item['employeeId'])
        )
        result.extend(res['Items'])

    return {'query': query, 'result': result}

検証

先ほどDynamodbの社員情報テーブルに実際のデータを登録して、転置インデックステーブルが非同期で更新されていること、検索が行えることの2点を確認していきます。

データ登録

先ほど作成した社員情報テーブルへデータを追加していきます。 試しに、次のJSONデータをテーブルに登録して確認してみます。 サンプルデータはこちらを参考にさせてもらいました。

{
  "id": "001",
  "name": "斎藤 茂",
  "ruby": "サイトウ シゲル",
  "sex": "男",
  "mail": "shigeru@example.ne.jp",
  "phone": "03-3461-0373",
  "birthday": "19780223"
}

転置インデックステーブル

マネージメントコンソールからテーブルに情報を追加するごとに、Dynamo Streamsをトリガーとして起動するLambdaが非同期で転置インデックステーブルが更新されているのが確認できます。

検索実行

先ほど作成した検索用のLambda関数から指定したキーワードで検索できていることを確認します。 部分検索やあいまい検索ができることがわかります。 つまり検索ワードをハッシュキーに持たせ、元のテーブルのidをソートキーに持たせれば、逆引きで検索できるという仕組みです。

考察

転置インデックスの有効性を、費用、検索速度、検索精度の3つの視点から見ていきたいと思います。

費用

転置インデックスによる検索を実装するにあたり以下の追加費用が発生します。

  • 転置インデックス作成用のLambda関数の実行時間および実行回数に応じた課金
  • 転置インデックステーブルのRCU/WCUおよびディスク容量に応じた課金

Lambda関数の料金に関しては、仮に1秒間に一度テーブルへの更新があり、実行時間が1秒だとしても、月額5.39USD(=0.000000208×10×60×60×24×30)とほぼ無視してよい金額になります。

転置インデックス用に新たに追加されたDynamoDBのテーブルに関しては、RCU/WCUが主な課金となります。Queryによるデータ取得を行っているので、Scanを使う場合と比べ1回分のRCUの消費を大きく抑えられます。また、オートスケーリングなどを使ってうまくチューニングすることができればかなり低コストに抑えることができます。テーブルが消費するディスク容量に対する課金は、分解して元テーブルのデータを挿入しているだけなので、ディスク容量はほぼ変わらず無視してよい金額です。

※2018年4月時点での東京リージョンの料金をベースに試算しています。

検索速度

検索では、まず検索ワードをハッシュキーとして転置インデックステーブルからデータ取得し、得られたidをハッシュキーとして社員情報テーブルから社員情報を取得します。Scanを使う場合と比べ、2つのテーブルへのリクエストが発生しますが、Queryによるデータ取得を行っているので、1回あたりの検索速度は抑えられます。検索用のLambda関数の実行時間はホットスタンバイで、50~80ms程です。

検索性能

今回は、区切り文字(空白、ハイフン、カンマ)で分割したものと、「斉藤」と「斎藤」の両方をテーブルのキーに持たせることで、部分検索およびあいまい検索もできるように実装しました。不格好ではありますが、規模と要件によってはこれくらいで事足りるケースも結構あると思います。もっと柔軟に検索したい場合は、追加で区切り文字を増やしたり、形態素解析ツールやシノニム(同義語)を利用するなどの方法があります。あくまで、規模と要件に合わせて検索機能をカスタマイズすることが重要になります。

まとめ

今回はDynamoDBでSQLライクな部分検索やあいまい検索をするためのテクニックを紹介しました。泥臭いやり方ではありますが、コスト面でのメリットは大きいので規模と要件によっては使えるテクニックかと思います。