Cats Effect Semaphoreによるインメモリキャッシュストレージの実装

キャッシュスタンピードは高負荷のワークロード下の並列コンピューティングシステムにおけるカスケード障害の1つとして知られています。この記事ではキャッシュスタンピードの対策としてCats Effect Semaphoreを用いた非同期ノンブロッキングロックによるキャッシュストレージを実装しました。
2022.02.24

はじめに

キャッシュスタンピード は高負荷のワークロード下の並列コンピューティングシステムにおけるカスケード障害の1つとして知られています。この記事ではキャッシュスタンピードの対策としてCats Effect Semaphoreを用いた非同期ノンブロッキングロックによるキャッシュストレージを実装しました。

参考: 既知の対策

スタンピードに対するよく知られた対策についてはWikipediaのCache Stampede Mitigationで簡潔に紹介されています。

また複数ノード間でのスタンピード対策として「RedisConf17 - Internet Archive - Preventing Cache Stampede with Redis and XFetch」ではRedisを使ったProbabilistic early expirationの実装の詳細が紹介されています。

前提

今回はロックを使った対策を実装します。想定しているユースケースは以下の通りです。

  • ワークスティーリングスケジューラー上で稼働しているWEB API上で使用する
  • キャッシュはノード単位で保持する
  • キャッシュ対象はアクセストークンのような有効期限付きのデータで保持するデータは高々1個である

今回の実装の方針

上記の前提においてロックを使ったキャッシュストレージを実装します。主たる方針は下記の通りです。

  • キャッシュの更新はリソースを使用するタスクが(ロックを取得した上で)行う
  • ロック待ちタスクがスレッドをブロックしないように非同期ノンブロッキングロックが可能なプリミティブ(cats.effect.concurrent.MVarやSemaphore)を選択する
  • 更新時にはロックの取得前後でキャッシュの有効性を確認して有効な場合には更新を行わない(スタンピード抑制)

実装

上記の方針を元にした実装が下記になります。

package example.pool

import cats.effect.Concurrent
import cats.effect.concurrent.{Ref, Semaphore}
import cats.implicits._

trait ResourcePool[F[_]] {
  def use[A](op: ResourcePool.Resource => F[A]): F[A]
  def empty: F[Unit]
}

object ResourcePool {
  def create[F[_]: Concurrent](implicit fetch: Fetch[F]): F[ResourcePool[F]] =
    for {
      sem <- Semaphore[F](1)(implicitly[Concurrent[F]])
      ref <- Ref.of[F, Option[Resource]](None)
    } yield SemaphoreResourcePool[F](fetch)(sem, ref)

  sealed trait InvalidResourcePoolState

  trait Fetch[F[_]] {
    def fetch: F[Resource]
  }

  trait Resource {
    def expires: Boolean
  }

  private final case class SemaphoreResourcePool[F[_]: Concurrent](
      fetch: Fetch[F]
  )(sem: Semaphore[F], ref: Ref[F, Option[Resource]])
      extends ResourcePool[F] {

    override def use[A](op: Resource => F[A]): F[A] = ifUnavailable(update) *>
      ref.get >>= (_.fold[F[A]](
      Concurrent[F].raiseError(ResourceDidNotFetched)
    )(op))

    override def empty: F[Unit] =
      sem.withPermit(ref.set(None))

    private def update: F[Unit] = sem.withPermit(fetchIfUnavailable)

    private def fetchIfUnavailable =
      ifUnavailable(fetch.fetch >>= (r => ref.set(Some(r))))

    private def ifUnavailable[A](f: => F[A]): F[Unit] =
      expired >>= (Concurrent[F].whenA(_)(f))

    private def expired = ref.get map (_.fold(true)(_.expires))

  }

  final case object ResourceDidNotFetched
      extends RuntimeException
      with InvalidResourcePoolState

  object Fetch {
    def instance[F[_]](_fetch: => F[Resource]) = new Fetch[F] {
      override def fetch: F[Resource] = _fetch
    }
  }

}

動作確認

これを以下のようなドライバーで動作確認してみます。

ここでは次の点を確認しています。

  • 同じスケジューラー上で動作する別のキャッシュを使わないタスクの実行を妨げない
  • キャッシュの更新を同時に行うのが1タスクだけ
package example.pool

import cats.effect.{ContextShift, IO, Resource, Timer}
import cats.implicits._

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object UpdateCache extends App {

  val resourceTTL = 20.millis
  val taskDuration = 50.millis
  val numThreads = 10
  val numTasks = 5000
  val fetchDuration = 100.millis

  implicit def fetch(implicit timer: Timer[IO]): ResourcePool.Fetch[IO] =
    ResourcePool.Fetch.instance[IO](
      IO(println(s"*** fetching on ${Thread.currentThread().getName}***")) *>
        timer.sleep(fetchDuration) *>
        IO.pure(ExpiringResource.create(resourceTTL))
        <* IO(println("** fetch done **"))
    )

  implicit def timer(implicit ec: ExecutionContext): Timer[IO] = IO.timer(ec)
  implicit def contextShift(implicit ec: ExecutionContext): ContextShift[IO] =
    IO.contextShift(ec)
  val threadPool = Resource
    .make(IO(sc))(es => IO(es.shutdown()))
    .map(es => ExecutionContext.fromExecutor(es))
  val run = threadPool.use { implicit ec =>
    for {
      pool <- ResourcePool.create[IO]
      _ <- otherTask.foreverM.start
      _ <- pool.use(task).foreverM.start
      _ <- timer.sleep(3.seconds)
    } yield ()
  }

  def sc = Executors.newWorkStealingPool()

  def task(_r: ResourcePool.Resource)(implicit timer: Timer[IO]): IO[Unit] =
    IO(println("executing task")) *> timer.sleep(taskDuration)

  def otherTask(implicit timer: Timer[IO]) =
    IO(println("executing other task")) *> timer.sleep(taskDuration)
  run.unsafeRunSync()
}

実行時の標準出力は下記の通りです。上記の2点が実現できていることがわかります。

executing other task
executing other task
*** fetching on ForkJoinPool-1-worker-1***
executing other task
executing other task
** fetch done **
executing task
executing other task
*** fetching on ForkJoinPool-1-worker-1***
executing other task
executing other task
** fetch done **
executing task
executing other task
(略)

まとめ

キャッシュスタンピード対策としてcats.effect.concurrent.Semaphoreを使った非同期ノンブロッキングロックによるキャッシュストレージを実装しました。