話題の記事

DynamoDBにおけるスループット超過対策 〜 Fallback-Queueingパターン

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

amazon-dynamodb

よく訓練されたアップル信者、都元です。今日はAmazon DynamoDBのちょっと厄介な特性とその対処方法について。

DynamoDBの課金モデル

今さら説明することでもないかもしれませんが、DynamoDBはプロビジョニングされたスループットに対して課金されます。 意味わかりますか? さっぱり伝えられてる気がしませんw 「provisioned throughput」ってどう訳せばいいんですかねぇ。

おっと、いきなり脱線してしまいましたが。つまり「どのくらいの頻度でどのくらいのデータ量を読み書きをする予定なのか」というユーザからの事前申告に基づいて、Amazonがその処理能力を確保し、その確保分に対して料金が発生します。具体的には1秒あたり1KBのデータを100回書き込みたければ、100ユニットの書き込みスループットを設定し、この100ユニットに対する課金を支払います。実際にどの程度読み書きされたのかは関係ありません *1

で、この「処理能力の上限」を超えたらどうなるのか。リクエストスループットは、プロビジョニングされた容量を超えると調整されます。 意味わかりますか? さっぱり伝えられてる気がしませんw 要するにリクエストが失敗します。Java SDKにおいてはProvisionedThroughputExceededExceptionという例外が発生します。

失敗を発生させないために出来る対策としては、まずは簡単にお金で解決ということになりますが、バッファを見込んで必要以上に大きなスループットを確保するというのも、今ひとつスマートさに欠ける気がしてなりません。また、どれだけ大きなスループットを確保したところで、一瞬のスパイクによってリクエストが失敗する可能性をゼロにはできません。つまり、DynamoDBに対するI/Oのリクエストは常に「失敗するかもしれない」という考慮が必要です。

データの読み出しに関しては、失敗したとしても「データのロスト」という最悪の事態は発生しません。時間を置いて処理をやり直せば済むことが多いです。しかしデータの書き込みに失敗すると、書き込むべきデータは行き場を失ってしまいます。このデータロストを許容するのかしないのか。場合によっては許容するのも一つの選択肢だとは思います。しかし、許容しないのであれば、失敗する前提で何らかのプログラマティックな対処が必要となります。

そこで、本エントリーでは、DynamoDBの書き込み失敗をAmazon SQSでフォローするアーキテクチャを紹介します。

Fallback-Queueingパターン

fallbacked-queue

調子に乗ってパターン名など命名してみましたが、要するに失敗したらキューに突っ込んでおいて、落ち着いてからリトライする、というパターンです。非常に分かりやすいアーキテクチャですね。これを実現するのに必要な仕組みは2つ。

  • DynamoDBへのputについて、失敗した場合はSQSに転送するようなAPIプロキシ
  • SQSを監視し、メッセージを見つけ次第DynamoDBに再putを試みるワーカー

これらを実装して行きます。

AWS SDK for Java v1.4

ところで先日、AWS SDK for Javaのv1.4がリリースされました。JavaのSDKでこの機能がキレイに実装できるのは、実はこのバージョンのお陰です。

v1.4の変更点に「Model classes now implement java.io.Serializable」というものがあります。要するに、APIに対するリクエストやレスポンスオブジェクトがシリアライズ可能であることが明示的に保証された、ということです。シリアライズ可能、ということはすなわち、これらのオブジェクトをbyte[](バイト列)やString(文字列)に変換してディスク等に保存したり、他システムに転送をしても構わない、ということです。

この性質を利用すると、com.amazonaws.services.dynamodb.model.PutItemRequestをシリアライズにより文字列に変換してSQSに突っ込む、ということが非常に楽になります。v1.3までのシリアライズ可能性が保証されていない状態では、格納したいデータを適切に文字列化し、その文字列を元に再びデータを復元するという処理を、データの形式に基づいて個別に実装する必要がありました。しかし、PutItemRequestSerializableであるということは、このリクエストオブジェクトを文字列化してしまえば全てが済む、ということです。

検証においては、ObjectOutputStream等を使ってbyte[]に変換した後、16進エンコードによって文字列化したものをSQSに突っ込む、ということをしました。参考までに下にシリアライズ処理のコードを貼っておきます。例外処理とかがアンマリですが、本質ではないので、さくっと次に進みましょう。ちなみに、16進エンコードのロジックはSpring FrameworkのHexクラスから拝借したしやした。

class Serializer<T extends Serializable> {
  
  private static final char[] HEX = {
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
  };
  
  static byte[] decode(CharSequence s) {
    int nChars = s.length();
    
    if (nChars % 2 != 0) {
      throw new IllegalArgumentException("Hex-encoded string must have an even number of characters");
    }
    
    byte[] result = new byte[nChars / 2];
    
    for (int i = 0; i < nChars; i += 2) {
      int msb = Character.digit(s.charAt(i), 16);
      int lsb = Character.digit(s.charAt(i + 1), 16);
      
      if (msb < 0 || lsb < 0) {
        throw new IllegalArgumentException("Non-hex character in input: " + s);
      }
      result[i / 2] = (byte) ((msb << 4) | lsb);
    }
    return result;
  }
  
  static char[] encode(byte[] bytes) {
    final int nBytes = bytes.length;
    char[] result = new char[2 * nBytes];
    
    int j = 0;
    for (int i = 0; i < nBytes; i++) {
      // Char for top 4 bits
      result[j++] = HEX[(0xF0 & bytes[i]) >>> 4];
      // Bottom 4
      result[j++] = HEX[(0x0F & bytes[i])];
    }
    
    return result;
  }
  
  public String serialize(T serializable) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
      out.writeObject(serializable);
    } catch (Exception e) {
      throw new RuntimeException("unexpected exception", e);
    }
    return new String(encode(baos.toByteArray()));
  }
  
  @SuppressWarnings("unchecked")
  public T deserialize(String serialized) {
    byte[] decoded = decode(serialized);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(decoded))) {
      return (T) in.readObject();
    } catch (Exception e) {
      throw new RuntimeException("unexpected exception", e);
    }
  }
}

テストも一応。

public class SerializerTest {
  
  @Test
  public void testSerializeAndDeserialize() throws IOException {
    Serializer<Hello> serializer = new Serializer<>();
    String serialized = serializer.serialize(new Hello("classmethod rocks"));
    Hello deserialized = serializer.deserialize(serialized);
    assertThat(deserialized.data, is("classmethod rocks"));
  }
  
  @SuppressWarnings("serial")
  private static class Hello implements Serializable {
    
    private final String data;
    
    public Hello(String data) {
      this.data = data;
    }
  }
}

fallbackを実装したAmazonDynamoDBのラッパーを作る

fallbackの機能は、AmazonDynamoDBのラッパーとして実装してみました。まぁ厳密に見ればリスコフ則を破る形になっていますが、便利さとのトレードオフで便利さが勝利した結果です。

まず、形の大枠はこんな感じ。先ほどのSerializerは静的に持っちゃいます。コンストラクタで、移譲先のAmazonDynamoDBとフォールバック先のAmazonSQS及びキュー名を受け取ります。AmazonDynamoDBインターフェイスにより実装を求められるメソッドは、基本的にそのままdelegateに転送します。

package jp.classmethod.aws.fallbackqueueing;

import java.io.IOException;

import com.amazonaws.*;
import com.amazonaws.regions.*;
import com.amazonaws.services.dynamodb.*;
import com.amazonaws.services.dynamodb.model.*;
import com.amazonaws.services.sqs.*;
import com.amazonaws.services.sqs.model.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FallbackedAmazonDynamoDBClient implements AmazonDynamoDB {
  
  private static Logger logger = LoggerFactory.getLogger(FallbackedAmazonDynamoDBClient.class);
  
  private static final Serializer<PutItemRequest> SERIALIZER = new Serializer<PutItemRequest>();
  
  private final AmazonDynamoDB delegate;
  
  private final AmazonSQS sqs;
	
  private final String queueName;
  
  public FallbackedAmazonDynamoDBClient(AmazonDynamoDB delegate, AmazonSQS sqs, String queueName) {
    this.delegate = delegate;
    this.sqs = sqs;
    this.queueName = queueName;
  }
  
  @Override
  public void setEndpoint(String endpoint) throws IllegalArgumentException {
    delegate.setEndpoint(endpoint);
  }

  // その他、数多くの委譲メソッドは省略
}

これらの委譲メソッドの中で、putItemにだけ細工をします。ProvisionedThroughputExceededExceptionが発生したら、SQSに転送する、という処理ですね。下記の例では、目当てのキューが無ければ作ったりもします。この辺りは、組み込む先のシステムのポリシーに合わせて上手く実装してください。

ちなみに、フォールバックが動いた時は戻り値がnullになるってところがアレです。まぁ、そういうモンです。

  @Override
  public PutItemResult putItem(PutItemRequest putItemRequest) throws AmazonClientException {
    try {
      PutItemResult putItemResult = delegate.putItem(putItemRequest);
      logger.trace("putItem completed successfully: {} -> {}", putItemRequest, putItemResult);
      return putItemResult;
    } catch (ProvisionedThroughputExceededException e) {
      logger.info("putItem failed because provisioned throughput exceeded: {}", putItemRequest);
      enqueue(putItemRequest);
    }
    return null;
  }

  private void enqueue(PutItemRequest putItemRequest) {
    String serializedRequest = SERIALIZER.serialize(putItemRequest);
    String queueUrl;
    try {
      queueUrl = sqs.getQueueUrl(new GetQueueUrlRequest(queueName)).getQueueUrl();
    } catch (AmazonServiceException e) {
      CreateQueueResult result = sqs.createQueue(new CreateQueueRequest().withQueueName(queueName));
      queueUrl = result.getQueueUrl();
      logger.info("New queue [{}] created", queueName);
    }
    sqs.sendMessage(new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(serializedRequest));
    logger.debug("putItemRequest enqueued: {}", putItemRequest);
  }

SQSワーカーの実装

さて、fallbackが実装されたクライアントは出来上がりましたので、今度はワーカーを実装していきます。こちらは外のスレッドで延々とポーリングを続けるような実装です。SQS APIのロングポーリング機能を活用し、リクエスト頻度は最長で約20秒間隔まで抑えました。この辺りの頻度もシステムのポリシーに合わせてどうぞ。

package jp.classmethod.aws.fallbackqueueing;

import java.util.List;

import com.amazonaws.*;
import com.amazonaws.services.dynamodb.*;
import com.amazonaws.services.dynamodb.model.*;
import com.amazonaws.services.sqs.*;
import com.amazonaws.services.sqs.model.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FallbackQueueWorker implements Runnable {
  
  private static Logger logger = LoggerFactory.getLogger(FallbackQueueWorker.class);
  
  private static final Serializer<PutItemRequest> SERIALIZER = new Serializer<PutItemRequest>();
  
  private final AmazonDynamoDB dynamoDB;
  
  private final AmazonSQS sqs;
  
  private final String queueName;
  
  
  public FallbackQueueWorker(AmazonDynamoDB dynamoDB, AmazonSQS sqs, String queueName) {
    this.dynamoDB = dynamoDB;
    this.sqs = sqs;
    this.queueName = queueName;
  }
  
  @Override
  public void run() {
    logger.trace("worker#run()");
    String queueUrl = getQueueUrl();
    if (queueUrl == null) {
      return;
    }
    
    ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(new ReceiveMessageRequest()
      .withQueueUrl(queueUrl)
      .withWaitTimeSeconds(20));
    List<Message> messages = receiveMessageResult.getMessages();
    logger.debug("{} messages recieved", messages.size());
    for (Message message : messages) {
      PutItemRequest putItemRequest = SERIALIZER.deserialize(message.getBody());
      try {
        PutItemResult putItemResult = dynamoDB.putItem(putItemRequest);
        logger.debug("fallbacked-putItem completed successfully: {} -> {}", putItemRequest, putItemResult);
        sqs.deleteMessage(new DeleteMessageRequest()
          .withQueueUrl(queueUrl).withReceiptHandle(message.getReceiptHandle()));
      } catch (ProvisionedThroughputExceededException e) {
        logger.info("fallbacked-puItem failed: {}", putItemRequest);
      }
    }
  }
  
  private String getQueueUrl() {
    try {
      return sqs.getQueueUrl(new GetQueueUrlRequest().withQueueName(queueName)).getQueueUrl();
    } catch (AmazonServiceException e) {
      // ignore
    }
    return null;
  }
}

つかいかた

役者が揃いました。FallbackedAmazonDynamoDBClientを生成して、普通に使えます。ただし、前述の通り、フォールバックが発生した場合、putItemの戻りがnullになることだけはくれぐれもご注意を。

AmazonDynamoDBClient d = new AmazonDynamoDBClient();
AmazonSQSClient q = new AmazonSQSClient();
AmazonDynamoDB dynamoDB = new FallbackedAmazonDynamoDBClient(d, q, QUEUE_NAME);

一方、ワーカーの方は、executorに繰り返し実行させます。

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(new FallbackQueueWorker(d, q, QUEUE_NAME), 0, 10, TimeUnit.MILLISECONDS);

検証

さて、以上で実装は完了。あとは検証ですが、わざとProvisionedThroughputExceededExceptionを発生させなければならないので、これが意外とめんどくさいのです。

まず、AmazonDynamoDBClientにはデフォルトで、APIリクエストの失敗時に再試行を行うようになっています。この仕組みは以下のようにして無効にします。

AmazonDynamoDBClient dynamoDB = new AmazonDynamoDBClient(new ClasspathPropertiesFileCredentialsProvider());
dynamoDB.setConfiguration(new ClientConfiguration().withMaxErrorRetry(0));

続いて、書き込みスループットを1(最小値)に設定したテーブルを作り *2…。

CreateTableRequest createTableRequest = new CreateTableRequest()
  .withTableName(tableName)
  .withKeySchema(new KeySchema(new KeySchemaElement()
    .withAttributeName("name")
    .withAttributeType(ScalarAttributeType.S)))
  .withProvisionedThroughput(new ProvisionedThroughput()
    .withReadCapacityUnits(1L).withWriteCapacityUnits(1L));
dynamoDB.createTable(createTableRequest);

ループで延々とitemをputしてみました。筆者の環境では300台後半でProvisionedThroughputExceededExceptionが発生しました。そのためITEM_SIZE400としましたが、状況に応じて適当に調整してください。

for (int i = 0; i < ITEM_SIZE; i++) {
  Map<String, AttributeValue> item = new HashMap<>();
  item.put("name", new AttributeValue(Integer.toString(i)));
  PutItemRequest req = new PutItemRequest(tableName, item);
  PutItemResult res = dynamoDB.putItem(req);
  logger.info("PutItemResult {} = {}", i, res);
}

FallbackedAmazonDynamoDBClientについてもFallbackQueueWorkerを回しながら同じことをしてみましょう。

ClasspathPropertiesFileCredentialsProvider credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
AmazonDynamoDBClient d = new AmazonDynamoDBClient(credentialsProvider);
d.setConfiguration(new ClientConfiguration().withMaxErrorRetry(0));
AmazonSQSClient q = new AmazonSQSClient(credentialsProvider);
AmazonDynamoDB dynamoDB = new FallbackedAmazonDynamoDBClient(d, q, QUEUE_NAME);

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(new FallbackQueueWorker(d, q, QUEUE_NAME), 0, 10, TimeUnit.MILLISECONDS);

この結果、やはり300台後半からDynamoDBへのput操作は失敗し始め、queueにエントリが溜まるようになりました。全てのitemをputした後、queue-workerもおとなしくなった頃合いを見計らって、全itemをscanで取得し、数を数えてみた結果、全てのitemがDynamoDBに格納できていることを確認できました。

ScanResult scanResult = dynamoDB.scan(new ScanRequest().withTableName(TABLE_NAME));
assertThat(scanResult.getCount(), is(ITEM_SIZE));
logger.info("all items are available!!");

まとめ

というわけで、DynamoDBを使う場合は避けて通れない、スループット超過への対処方法について、実装を交えて解説してみました。ちなみに、本エントリの考え方の道筋を示してくれたのは「Using SQS to throttle DynamoDB throughput」でした。やってみたら思いのほかキレイに実装できて、よかったよかった。

脚注

  1. 厳密には、データ転送料や保存料がありますが、今回のテーマの範囲外なのでひとまず考えないこととします。
  2. そして、テーブルの準備ができるまで数分待機し…。