cats-sagaとFault Injectionによるキャンセル処理のテスト

cats-sagaとPimp My Libraryパターンによる障害注入を組み合わせることでトランザクションのロールバックテストが行える設計を試してみました。
2020.11.25

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

cats-saga を使うと合成された処理に対する補償処理(キャンセルや、ロールバックなど)をシンプルに実装できます。 今回はこれに加えてトランザクションのロールバックを実装中に各処理でのキャンセル処理を意図的に発生させることで、ロールバック処理のテストをできるようにしてみました。

使ったライブラリ、言語のバージョン

今回使ったbuild.sbtは以下の通りです。

name := "cats-saga-example"
version := "0.1"

scalaVersion := "2.13.4"
libraryDependencies += "com.vladkopanev" %% "cats-saga" % "0.3.0"

cats-saga

cats-sagaではCompensable[F]によって1階の高カインド型に補償処理を追加できます。 以下のコード例はcats-saga READMEより抜粋の抜粋です。

def orderSaga[F[_]: Concurrent](): F[Unit] = {
  import com.vladkopanev.cats.saga.Saga._
    
  (for {
    _ <- collectPayments(2d, 2) compensate refundPayments(2d, 2)
    _ <- assignLoyaltyPoints(1d, 1) compensate cancelLoyaltyPoints(1d, 1)
    _ <- closeOrder(1) compensate reopenOrder(1)
  } yield ()).transact
}

FaultInjection

FaultInjectionは以下のように任意のEffect型に対してImplicit classにより例外を発生させるメソッドを拡張します。

final case class FaultInjectionContext(errorOn: Option[String])

object FaultInjectionContext {

  def apply(errorOn: String): FaultInjectionContext = apply(errorOn.some)

  //任意のEffectにFaultInjection機能を実装するヘルパー
  implicit class EffectFaultInjectionOps[F[_] : Effect, A](ops: F[A]) {
    def runWithFaultInjection(name: String)(implicit ctx: FaultInjectionContext): F[A] = {
      //コンテキストで指定されているエラーの注入点が呼び出しているポイントと同じならエラーにする
      if (ctx.errorOn.exists(_ === name)) Effect[F].raiseError(new RuntimeException(s"FaultInjected Error: $name"))
      else ops
    }
  }
}

サンプルコード(全文)

上記を組み合わせたサンプルコードが下記です。

package example

import java.util.UUID

import cats.effect.{Effect, ExitCode, IO, IOApp}
import cats.implicits._
import com.vladkopanev.cats.saga.Saga._

final case class FaultInjectionContext(errorOn: Option[String])

object FaultInjectionContext {

  def apply(errorOn: String): FaultInjectionContext = apply(errorOn.some)

  //任意のEffectにFaultInjection機能を実装するヘルパー
  implicit class EffectFaultInjectionOps[F[_] : Effect, A](ops: F[A]) {
    def runWithFaultInjection(name: String)(implicit ctx: FaultInjectionContext): F[A] = {
      //コンテキストで指定されているエラーの注入点が呼び出しているポイントと同じならエラーにする
      if (ctx.errorOn.exists(_ === name)) Effect[F].raiseError(new RuntimeException(s"FaultInjected Error: $name"))
      else ops
    }
  }

}

final case class Order(numSeeds: Long, orderCode: String)

final case class PaymentResult(amount: BigDecimal)

//支払いサービス
class PaymentService[F[_] : Effect] {


  def makePayment(amount: BigDecimal): F[PaymentResult] =
    Effect[F].delay(println(s"make payment $amount")) *> Effect[F].pure(PaymentResult(amount))

  def cancelPayment(payment: PaymentResult): F[Unit] = Effect[F].delay(println(s"payment canceled ${payment.amount}"))

}

//配達サービス
class OrderDeliveryService[F[_] : Effect] {

  def ship(numSeeds: Long): F[Order] = for {
    order <- Effect[F].pure(Order(numSeeds, UUID.randomUUID().toString))
    _ <- Effect[F].delay(println(s"Order(${order.orderCode}) is about to shipped"))
  } yield order

  def cancel(order: Order): F[Unit] = Effect[F].delay(println(s"Order(${order.orderCode}) is canceled"))


}


//シードの注文トランザクション
class OrderTransaction[F[_] : Effect](implicit paymentService: PaymentService[F], orderDeliveryService: OrderDeliveryService[F]) {

  import FaultInjectionContext._

  def orderSeeds(numSeeds: Long)(implicit ctx: FaultInjectionContext): F[Order] = (for {
    _ <- paymentService.makePayment(5 * numSeeds).runWithFaultInjection("PAYMENT").compensate { (e: Either[Throwable, PaymentResult]) =>
      //支払いのキャンセル
      e match {
        case Right(r) => paymentService.cancelPayment(r)
        case Left(_: Throwable) => Effect[F].delay(println("no payment cancel is needed."))
      }
    }
    order <- orderDeliveryService.ship(numSeeds).runWithFaultInjection("SHIPMENT").compensate { (e: Either[Throwable, Order]) =>
      //発送のキャンセル
      e match {
        case Right(o) => orderDeliveryService.cancel(o)
        case Left(_: Throwable) => Effect[F].delay(println("no order cancel is needed."))
      }
    }
  } yield order).transact

}


object App extends IOApp {
  implicit lazy val paymentService: PaymentService[IO] = new PaymentService[IO]
  implicit lazy val orderDeliveryService: OrderDeliveryService[IO] = new OrderDeliveryService[IO]
  implicit lazy val orderTransaction: OrderTransaction[IO] = new OrderTransaction[IO]

  override def run(args: List[String]): IO[ExitCode] = for {
    _ <- IO(println("**** ok ****"))
    _ <- orderTransaction.orderSeeds(10)(FaultInjectionContext(none))
    _ <- IO(println("**** fail on PAYMENT ****"))
    _ <- orderTransaction.orderSeeds(10)(FaultInjectionContext("PAYMENT")).handleErrorWith {
      case e => IO(println(s"Error on PAYMENT, ${e.getMessage}"))
    }
    _ <- IO(println("**** fail on SHIPMENT **** "))
    _ <- orderTransaction.orderSeeds(10)(FaultInjectionContext("SHIPMENT")).handleErrorWith {
      case e => IO(println(s"Error on SHIPMENT, ${e.getMessage}"))
    }
  } yield ExitCode.Success
}

実行結果

実行結果です。各ポイントでエラーを発生させてロールバック処理を実行させることができました。

**** ok ****
make payment 50
Order(d9ebe08d-a1f9-4316-811c-94b7ee30865c) is about to shipped
**** fail on PAYMENT ****
no payment cancel is needed.
Error on PAYMENT, FaultInjected Error: PAYMENT
**** fail on SHIPMENT **** 
make payment 50
no order cancel is needed.
payment canceled 50
Error on SHIPMENT, FaultInjected Error: SHIPMENT

まとめ

cats-sagaとPimp My Libraryパターンによる障害注入を組み合わせることでトランザクションのロールバックテストが行えました。 FaultInjectionContextをHTTPヘッダから生成すればデプロイしているAPIに対してもテストが行えます。