Redisのsetnxを使ってマルチサーバ環境での Web API ロック機構を実現する

Redisのsetnxを使ってマルチサーバ環境での Web API ロック機構を実現する

Clock Icon2016.09.02

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

「これ、同時にリクエスト飛んできた場合って、どうなるの?」
「2つ登録されちゃいますね…」

はじめに

Web APIは、リクエストを受け、処理を終えた後レスポンスを返します。モバイルアプリやウェブアプリとセットで開発しているとつい忘れがちになってしまいますが、同時に同じリクエストが飛んできた場合にどういう挙動をするか ということは常に考慮しておかなくてはなりません。「UI上同時にリクエストが送れないから問題なし」では Web APIの設計が不十分です。別の端末から同時に同じリクエストを送ることも可能ですので、UIとは切り離して考えるべきです。

さて、この状況、GETリクエストについては大きな問題にはならないでしょう。いくらリクエストが来てもサーバ側の状態に変化はないので、負荷対策だけしておけば大丈夫そうです。問題はPUTリクエストやPOSTリクエストといった、サーバ側のリソースに変化を及ぼすリクエストです。冪等性が担保されているものについては問題ないですが、新規リソース登録など、同時にリクエストが走ると想定とは異なる結果を生むものもあるでしょう。そこで「処理をロックする」という考えが生まれます。今回は処理のロックについてマルチサーバでの実現手段を考えてみます。

モノリシックなサーバサイドアプリケーションの場合、プログラム言語の排他制御機構やデータベースのトランザクションを生かすことがロックの常套手段でした。しかし最近はそうも言ってられません。

  • 複数サーバで動作することが当たり前となり、あるサーバで排他制御したとしても、別のサーバで同時に実行される
  • ひとつの処理の中で複数のデータベースにアクセスするケースもある
  • ひとつの処理の中で別のマイクロサービスを呼び出す構成もある

こうなると、プログラム言語やデータベースの機能だけでは、ロック機構を実現することが難しくなります。そこで、ElastiCacheでも利用可能な、Redisを使ってロック機構を実装してみることにします。Redisのsetnxの特性「同一キーのデータを登録しようとすると弾かれる」を使います。登録処理開始前にロックデータを作成、処理が終わり次第ロックデータを削除するというものです。この一連の流れを、ローカルにたてたRedisとPlayframework Scalaで試してみます。ローカルで成功すれば、ElastiCacheを利用することで同一リージョンのマルチサーバ環境におけるロック機構が実現できます。

やること

  • ロック機構を導入する対象としてユーザ登録処理を例にとる
  • Redisクライアントの実装
  • Redisクライアントを使ってユーザ登録処理のロックを実装
  • ロックできているか試す
  • ロック機構をテストする

ロック機構を導入する対象の処理 - ユーザ登録処理

以下のようなシーケンスで示される会員登録処理を考えてみましょう。(正常系のみです)

00_register

  1. クライアントから登録リクエストがあったら、会員登録処理をよぶ。
  2. 登録しようとしている会員がすでにいないかチェックし、問題なければ会員IDを発行する。
  3. ユーザ登録処理をコールする。保存先はNoSQLのデータベースとする。
  4. 初回登録特典として会員登録の際にポイントを登録する。ポイント管理システムは外部のサービスを利用しているものとする。
  5. 登録処理が完了したら、クライアントへレスポンスを返す。

この処理、同時に同じ内容のリクエストが複数飛んでくるとどうなるでしょうか。ひとつめの処理が完全におわり、ユーザーデータが登録された後であれば、ふたつめの処理は「すでに登録されているよ」と判定できそうですが、ほぼ同時のタイミングだと同じユーザが違うシーケンス番号で複数登録されてしまいそうです。不要なデータができてしまいますね。ロックしましょう。以下の図のようなイメージです。

00_register

ロックを行うところで、Redisのsetnxをよびます。

Redisクライアントの実装

setnxを実行するための汎用的なRedisクライアントを実装します。

trait RedisJsonAdapter[T] {

  def pool: RedisClientPool

  implicit def format: Format[T]

  protected def insertIfNotExists(key: String, ttl: Int, value: T): Boolean = pool.withClient { redis =>
    val json = Json.toJson(value)
    redis.setnx(key, json) match {
      case false => false
      case true => {
        val setExpire = redis.expire(key, ttl)
        if (setExpire) true else throw new RuntimeException("set expire failed.")
      }
    }
  }

  protected def deleteByKey(key: String): Unit = pool.withClient { redis =>
    redis.del(key)
  }
}

これを使って、ロック処理のクライアントを実装します。

class RegisterLockDao @Inject()(
    config: RegisterLockDaoConfig,
    @Named("register_lock") override val pool: RedisClientPool
) extends RedisJsonAdapter[Unit] with RegisterLockComponent {

  override implicit def format: Format[Unit] = new Format[Unit] {
    override def writes(o: Unit): JsValue = Json.obj()
    override def reads(json: JsValue): JsResult[Unit] = JsSuccess(())
  }

  override def lock(key: String): Either[IllegalStateException, Unit] =
  insertIfNotExists(key, config.ttl, ()) match {
    case true => Right(Unit)
    case false => Left(new IllegalStateException)
  }

  override def unlock(key: String): Unit = deleteByKey(key)

}

本来はキャッシュの値にJsonを登録できるのですが、今回はキーさえ登録されれば十分ですので値は空です。ロックに失敗した場合、IllegalStateExceptionをもつFutureを返すようにします。

Redisクライアントを使ってユーザ登録処理を実装

ロック処理を使って会員登録処理を実装します。図でいうと、「会員登録サービス」に相当する処理が以下です。

class UserRegisterService @Inject()(
    lockComponent: RegisterLockComponent, // -- ①
    userRepository: UserRepository,       // -- ②
    pointRepository: PointRepository      // -- ③
) {

  def register(userName: String): Future[(String, Int)] = {
    val bonusPoint = 100
    val result = for {
      _ <- findUser(userName)
      _ <- FutureSupport.eitherToFuture(lock(userName))       // -- ④
      userId = UUID.randomUUID().toString
      sessionId <- userRepository.register(userId, userName)  
      userPoint <- pointRepository.append(userId, bonusPoint)
    } yield (sessionId, userPoint)

    result.onComplete {
      case Failure(e: RegisterConflictError) => ()
      case _ => lockComponent.unlock(userName)                // -- ⑤
    }
    result
  }

  private def findUser(userName: String): Future[Unit] =
    for {
      maybeUser <- userRepository.findByName(userName)
      _ <- maybeUser match {
        case Some(id) => Future.failed(new UserAlreadyRegisteredError)
        case None => Future.successful(())
      }
    } yield ()


  private def lock(userName: String): Either[RegisterConflictError, Unit] =
    lockComponent.lock(userName).left.map {
      _ => new RegisterConflictError()                        // -- ⑥
    }
}
  • ① RegisterLockComponent : 先ほど記載した、ロック処理を受け持つコンポーネントです。その実態はRedisのsetnxを使ったキャッシュ書き込み処理でした。
  • ② UserRepository : ユーザー登録を受け持ちます。
  • ③ PointRepository : ポイントの登録を受け持ちます。
  • ④ ロック処理です
  • ⑤ ロック解除 : ロック失敗が原因のエラーが発生しているときは解除しません。それ以外は処理が完了次第ロックを解除します。
  • ⑥ エラー変換 : 下層で発生したEllegalStateExceptionを、アプリケーションの知識としてRegisterConflictErrorに変換します。コントローラは、このエラーを受け、さらにエラーレスポンスを生成します。

ロックできているか試す

実行して試します。まずはRedisを起動しましょう。

y-wada$ redis-server /usr/local/etc/redis.conf
88229:M 02 Sep 00:28:16.628 * Increased maximum number of open files to 10032 (it was originally set to 7168).

次にPlayframeworkを起動します。

y-wada$ ./activator run
[info] Loading project definition from /Users/y-wada/intellij/play-app/project

同時にリクエストを送ってみましょう。

10_201

一方のリクエストは成功しますが…

20_422

もう一方は、ロックにひっかかってコンフリクトエラーが発生します。成功ですね。

テストする

無事、ロック処理が成功することを確認できました。ところで…ロックに弾かれた場合、以降の処理は継続されないのでしょうか?ScalaでflatMapを使って処理を書いた場合、文脈が維持されるはずですので、ロック処理に失敗してFuture.failed(error)となった場合、以降の処理は実行されないはずです。

本当にそうか?ロック解除も想定通りの動きをするのか?確かめたい。ならばテストしましょう。確認する内容を一覧にします。

# 失敗箇所 実行されるはず 実行されないはず
1 ロック失敗
(つまり処理中)
ユーザ検索
ロック
ユーザ登録
ポイント加算
ロック解除
2 ユーザ登録失敗 ユーザ検索
ロック
ユーザ登録
ロック解除
ポイント加算
3 ポイント加算失敗 ユーザ検索
ロック
ユーザ登録
ポイント加算
ロック解除
-

代表として1番目のテストについて、Specs2のコードを載せます。

"登録処理がロックされている" should {

  trait Before {
    self: CommonBefore =>
    mockLockComponent.lock(anyString) returns Left(new IllegalStateException())
  }

  class Context extends CommonContext with Before

  "RegisterConflictErrorが発生" in new Context {
    val result = service.register(mockUserName)
    await(result) must throwA[RegisterConflictError]
  }

  "登録系処理は呼び出されず、ロックも解除されない" in new Context {
    val result = service.register(mockUserName)
    await(result) must throwA[Throwable]

    there was
        one(mockUserRepository).findByName(anyString) andThen
        one(mockLockComponent).lock(anyString) andThen
        no(mockUserRepository).register(anyString, anyString) andThen
        no(mockPointRepository).append(anyString, anyInt) andThen
        no(mockLockComponent).unlock(anyString)
  }
}

テスト実行します。

30_test-result

表で示したテストケースがパスしていることを確認できました。ここまでわかると安心感がありますね!

おわりに

Redisのsetnxを利用してロック機構を実現する例を実装しました。RedisをElastiCacheで利用すれば、同一リージョン内にあるEC2からは同じRedisにアクセスすることになるため、マルチサーバ環境でも問題なく動作するはずです。更新系の処理については、保護するべきものを見極めて適切に対処するようにしたいですね。

今回実装したサンプルはGitHubに公開しています。よろしければ参考にしてください。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.