この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
こんにちは、中山です。
今回DynamoDBへの検索をElastiCache Redisを使ってキャッシュさせてみました。
なぜキャッシュさせるのか
DynamoDBの料金はプロビジョンドスループットの容量に大きく依存しています。一秒間にどの程度のデータを読み書きするのか事前に指定する値です。処理する必要のあるデータ量に応じてRCU(ReadCapacityUnits)とWCU(WriteCapacityUnits)をそれぞれ調整する必要があります。つまり、大量のデータの読み書きをするには、それだけ徳を積む必要があるということになります。
しかし、RCU/WCUの値をなるべく上げないようにかつコストを下げる方法はいくつかあります。それが、今回ご紹介するElastiCacheにキャッシュさせる方法です。DynamoDBから読み込んだデータをElastiCacheにキャッシュさせることで、次回以降はDynamoDBからの読み込み処理をスキップすることができます。結果として、RCUを低く抑えることができるという訳です。
今回は動作確認の検証環境を作ってみました。普通にEC2で構築してもよかったのですが、LambdaのVPC機能を使っています。私はLambdaを使いたかった。
コード
GitHubに上げておきました。いつものようにApexとTerraformで作っています。ご自由にお使いください。
ディレクトリ構造
以下の通りです。
cache-dynamodb-with-elasticache-redis/
├── README.md
├── functions
│ └── cache_dynamodb_with_elasticache_redis
│ └── main.py
├── infrastructure
│ ├── dev
│ │ ├── main.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ └── modules
│ ├── cloudwatch
│ │ ├── cloudwatch.tf
│ │ └── variables.tf
│ ├── dynamodb
│ │ ├── dynamodb.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ ├── elasticache
│ │ ├── elasticache.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ ├── iam
│ │ ├── iam.tf
│ │ ├── outputs.tf
│ │ ├── policy
│ │ │ └── lambda_assume_role_policy.json
│ │ └── variables.tf
│ ├── network
│ │ ├── network.tf
│ │ ├── outputs.tf
│ │ └── variables.tf
│ └── security_group
│ ├── outputs.tf
│ ├── security_group.tf
│ └── variables.tf
├── project.json
└── requirements.txt
コードの解説
project.json
Lambda関数の定義ファイルです。
{
"name": "cache-dynamodb-with-elasticache-redis",
"description": "cache dynamodb with elasticache redis",
"nameTemplate": "{{.Function.Name}}",
"memory": 128,
"timeout": 5,
"runtime": "python",
"defaultEnvironment": "dev",
"vpc": {
"securityGroups": ["<sg-id>"],
"subnets": ["<subnet-id>"]
},
"environment": {
"ElastiCacheEndpoint": "<elasticache_endpoint>",
"DynamoDBTableName": "cache_dynamodb_with_elasticache_redis"
},
"hooks": {
"deploy": "[[ -d redis ]] || pip install -r ../../requirements.txt -t ./"
}
}
vpc
の中でLambda関数に紐付けるセキュリティグループ、サブネットを指定しています。 <sg-id>
と <subnet-id>
は任意のものを指定してください。 environment
の中でLambda関数から参照する環境変数を指定します。それぞれElastiCacheのエンドポイントと、DynamoDBのテーブル名です。 <elasticache_endpoint>
は環境毎に変わります。
今回ElastiCacheのエンジンはRedisを利用しています。PythonからRedisを操作するためにこちらのパッケージを利用しているので hooks
でデプロイ前にパッケージをインストールしています。
functions/cache_dynamodb_with_elasticache_redis/main.py
Lambda関数のコードがこちらです。DynamoDBへのデータ登録、登録したデータをキャッシュにのせる処理を行っています。
from __future__ import print_function
import os
import time
import boto3
import redis
import random
import string
ELASTI_CACHE_ENDPOINT = os.environ["ElastiCacheEndpoint"]
DYNAMO_DB_TABLE_NAME = os.environ["DynamoDBTableName"]
WORD = "".join([random.choice(string.ascii_letters + string.digits) for n in xrange(16)])
TIME = str(time.time()).split(".")[0]
def handle(event, context):
client = boto3.client("dynamodb")
r = redis.StrictRedis(
host=ELASTI_CACHE_ENDPOINT,
port=6379,
db=0
)
client.put_item(
TableName=DYNAMO_DB_TABLE_NAME,
Item={
"Word": {"S": WORD},
"CreatedAt": {"N": TIME}
}
)
resp = client.scan(
TableName=DYNAMO_DB_TABLE_NAME,
ConsistentRead=True
)
last_range_key = sorted([x["CreatedAt"]["N"] for x in resp["Items"]])[0]
last_hash_key = [x for x in resp["Items"] if last_range_key == x["CreatedAt"]["N"]][0]["Word"]["S"]
if (int(TIME) - int(last_range_key)) > 30:
client.delete_item(
TableName=DYNAMO_DB_TABLE_NAME,
Key={
"Word": {"S": last_hash_key},
"CreatedAt": {"N": last_range_key}
}
)
if r.exists(last_hash_key):
r.delete(last_hash_key)
print("Deleted from cache: {}, {}".format(last_hash_key, last_range_key))
else:
print("{} does not exist".format(last_hash_key))
for item in resp["Items"]:
hash_key = item["Word"]["S"]
range_key = item["CreatedAt"]["N"]
redis_keys = r.scan_iter()
if hash_key in redis_keys:
print("Cache hit: {}, {}".format(hash_key, r.get(hash_key)))
else:
print("Cache miss: {}, {}".format(hash_key, range_key))
r.set(hash_key, range_key)
print("Added to cache: {}, {}".format(hash_key, range_key))
処理の概要は以下の通りです。
- DynamoDBにputItem
適当な文字列と作成日をそれぞれ Word
と CreatedAt
属性にputItemして、Lambda関数の起動毎にデータを追加しています。
- DynamoDBからスキャン
全データを取得したいので scan
メソッドでスキャンします。
DynamoDBには結果整合性と強い整合性がありますが、今回は強い整合性読み込みを行っています。 scan
メソッドの ConsistentRead=True
がそれです。結果整合性の場合 put_item
メソッドでアイテムを追加した直後にスキャンすると、結果がまだ反映されてないことがあるためです。ただし、強い整合性の読み込みは結果整合性の読み込みに比べて2倍の費用がかかるので、ご利用する際はご注意ください。
実際に使う場合はまずキャッシュにのっているか確認した後、載っていなかったらDynamoDBからデータを取得するという処理になります。手抜きです。
- 取得したデータから一番古いアイテムを取得
リスト内包表記でごちゃごちゃやっています。もっとうまいやり方がある気がする。
- 古いデータの削除
現在時刻と比較して古いデータは、DynamoDBには delete_item
で、ElastiCacheには delete
でそれぞれデータとキャッシュを削除しています。
- データの表示
DynamoDBとElastiCacheに入っているデータを表示しています。キャッシュにのっていないデータは set
メソッドで登録しています。
infrastructure/modules/dynamodb/dynamodb.tf
DynamoDBのテーブル定義をしているファイルです。パーティションキーに Word
を、ソートキーに CreatedAt
を指定しています。RCU/WCUは適当に5にしています。
resource "aws_dynamodb_table" "dynamodb" {
name = "${var.name}"
read_capacity = 5
write_capacity = 5
hash_key = "Word"
range_key = "CreatedAt"
attribute {
name = "Word"
type = "S"
}
attribute {
name = "CreatedAt"
type = "N"
}
}
infrastructure/modules/elasticache/elasticache.tf
ElastiCacheを構築しているコードです。特に凝ったことはやってないです。サブネットグループとパラメータグループを作成して、クラスタを作っているだけになります。
resource "aws_elasticache_subnet_group" "redis" {
name = "${replace(var.name, "_", "-")}"
subnet_ids = ["${var.private_subnet_id}"]
description = "${replace(var.name, "_", " ")}"
}
resource "aws_elasticache_parameter_group" "redis" {
name = "${replace(var.name, "_", "-")}"
family = "redis2.8"
description = "${replace(var.name, "_", " ")}"
}
resource "aws_elasticache_cluster" "redis" {
cluster_id = "${element(split("_", var.name), 0)}"
engine = "redis"
engine_version = "2.8.24"
maintenance_window = "sun:05:00-sun:06:00"
node_type = "cache.t2.micro"
num_cache_nodes = 1
parameter_group_name = "${aws_elasticache_parameter_group.redis.id}"
port = 6379
subnet_group_name = "${aws_elasticache_subnet_group.redis.name}"
security_group_ids = ["${var.security_group_id_redis}"]
tags {
Name = "${var.name}"
}
}
実行方法
Apexの実行方法は以前のエントリに書きました。詳細はそちらを参照してください。
- ApexとTerraformでCloudWatch EventsによりInvokeされるLambda関数をデプロイする
- ApexとTerraformでCloudWatch Events Schedule x Lambda x SNS を設定する
動作確認
今回のLambda関数はCloudWatch Eventsで1分毎に実行させています。勝手に実行してくれるのでログを垂れ流して動作確認しましょう。
まず何もない状態で起動させると以下のようにキャッシュミスの発生と、キャッシュへの登録を実行した旨表示されると思います。
$ apex logs cache_dynamodb_with_elasticache_redis --follow
/aws/lambda/cache_dynamodb_with_elasticache_redis Cache miss: XXIBTpkVITbXrF5Y, 1470370027
/aws/lambda/cache_dynamodb_with_elasticache_redis Added to cache: XXIBTpkVITbXrF5Y, 1470370027
/aws/lambda/cache_dynamodb_with_elasticache_redis END RequestId: 1345de42-5ac2-11e6-b72a-bf963f993f51
/aws/lambda/cache_dynamodb_with_elasticache_redis REPORT RequestId: 1345de42-5ac2-11e6-b72a-bf963f993f51 Duration: 745.80 ms Billed Duration: 800 ms Memory Size: 128 MB Max Memory Used: 27 MB
しばらく経つと以下のようにキャッシュヒットの発生と、古いデータの削除をした旨表示されると思います。
/aws/lambda/cache_dynamodb_with_elasticache_redis START RequestId: e8509839-5ac2-11e6-902b-9b7fda174ec6 Version: $LATEST
/aws/lambda/cache_dynamodb_with_elasticache_redis Deleted from cache: 1aFX1dnR5RS751aL, 1470370025
/aws/lambda/cache_dynamodb_with_elasticache_redis Cache hit: XXIBTpkVITbXrF5Y, 1470370027
/aws/lambda/cache_dynamodb_with_elasticache_redis Cache miss: 4dEITtD1l9Mj1Z6b, 1470370385
/aws/lambda/cache_dynamodb_with_elasticache_redis Added to cache: 4dEITtD1l9Mj1Z6b, 1470370385
/aws/lambda/cache_dynamodb_with_elasticache_redis Cache miss: 1aFX1dnR5RS751aL, 1470370025
/aws/lambda/cache_dynamodb_with_elasticache_redis Added to cache: 1aFX1dnR5RS751aL, 1470370025
/aws/lambda/cache_dynamodb_with_elasticache_redis END RequestId: e8509839-5ac2-11e6-902b-9b7fda174ec6
/aws/lambda/cache_dynamodb_with_elasticache_redis REPORT RequestId: e8509839-5ac2-11e6-902b-9b7fda174ec6 Duration: 776.68 ms Billed Duration: 800 ms Memory Size: 128 MB Max Memory Used: 27 MB
まとめ
いかがでしょうか。
DynamoDBとElastiCacheを組み合わせることでRCUを下げられることを確認しました。
本エントリがみなさんの参考になれば幸いです。