fs2で遅延つき繰り返し処理

fs2で遅延付きの繰り返し実行を行うにはrepeatとfixedRateまたはfixedDelayを使います。
2021.02.05

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

この記事はfs2でn秒ごとにXXXを実行する処理を実装したのでそのメモです。

(具体的には5秒ごとにキューに問合せて取得してメッセージを処理するのを繰り返すというプログラムを作ったのです)

fixedDelayとfixedRate

n秒ごとにストリームから要素を取り出すには、以下の2つのAPIが使えます。

  • Stream#fixedDelay 要素の取り出しごとに指定された間隔を待機してから次の要素を取り出す
  • Stream#fixedRate 指定された間隔ごとに要素を取り出す

ドキュメントではこれらのAPIの違いはScheduledExecutorServiceのscheduleWithFixedDelayとscheduleAtFixedRateの違いと同じだと説明されています。

repeat

上記のAPIだけだと有限なストリームや副作用によって要素を生成するストリームからは有限個の要素しか取り出せません。繰り返し要素を取り出すには#repeatを使います。また繰り返しの個数を指定するrepeatNもあります。

fixedDelayもrepeatを使って実装されています。

def fixedDelay[F[_]](d: FiniteDuration)(implicit timer: Timer[F]): Stream[F, Unit] =
    sleep(d).repeat

実行例

以下のコードはfixedDelayとrepeatを組み合わせた実行例です。

fixedDelayの場合、repeatによって繰り返し要素が取り出されるたびにgreetingのIOが評価されていること、ストリームの途中に入っているsleepの影響でfixedDelayで指定した間隔に加えて+1秒おきに挨拶が表示されていることがわかります。

その後のfixedRateの実行結果ではgreetingは一定間隔で実行されるため、sleep後のコンソール出力とgreetingのタイミングはほぼ同時になっています。

package example

import cats.Show
import cats.effect.{Effect, ExitCode, IO, IOApp}
import fs2.Stream

import java.time.ZonedDateTime
import scala.concurrent.duration._
import cats.implicits._

object RepeatExample extends IOApp {

  implicit val show:Show[ZonedDateTime] = Show.show(t =>s"${t.getHour}:${t.getMinute}:${t.getSecond}")
  def greeting[F[_]: Effect]: F[String] = console("#greeting") *> Effect[F].pure("Hello")
  def console[F[_]: Effect](msg:String): F[Unit] = Effect[F].delay(println(s"${ZonedDateTime.now().show} $msg"))

  override def run(args: List[String]): IO[ExitCode] =
    Stream
      .fixedDelay(1.seconds)
      .zipRight(Stream.eval(greeting[IO]).repeat)
      .map(greeting => s"$greeting, birdie !")
      .evalMap(s => timer.sleep(1.seconds).map(_ => s))
      .evalMap(console[IO])
      .compile
      .drain
      .map(_ => ExitCode.Success)
}

/*
fixedDelayの場合

22:14:13 #greeting
22:14:14 Hello, birdie !
22:14:15 #greeting
22:14:16 Hello, birdie !
22:14:17 #greeting
22:14:18 Hello, birdie !
22:14:19 #greeting
22:14:20 Hello, birdie !
*/

/*
fixedRateの場合
2:17:22 #greeting
22:17:23 Hello, birdie !
22:17:23 #greeting
22:17:24 Hello, birdie !
22:17:24 #greeting
22:17:25 Hello, birdie !
22:17:25 #greeting
22:17:26 Hello, birdie !
22:17:26 #greeting
22:17:27 Hello, birdie !
22:17:27 #greeting
22:17:28 Hello, birdie !
22:17:28 #greeting
*/

まとめ

fs2で遅延付きの繰り返し実行を行うにはrepeatとfixedRateまたはfixedDelayを使います。

外部リソースの負荷を伺いながら繰り返しデータを読み書きする処理を実装する場面は少なくないのでこれくらいの簡潔さで実装できるAPIは便利だと思いました。