akka http clientのmax-open-requestsについて

akka-httpのmax-open-requestsについて調べました。 
2020.10.22

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

akka http client APIを使っていて以下のようなエラーに遭遇したので関連する設定について調べました。

akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4].

これは何か?

エラーメッセージに記載されている説明ページには以下のように説明されています(要約)

  • ホストごとにコネクションをプーリングしている
  • コネクション数は akka.http.host-connection-pool.max-connectionで設定する
  • プールを使うAPI(Http().singleRequest()など)でリクエストを送信すると、プールが処理できるリクエスト数を超えた場合はキューに入れられる
  • キューのサイズ(max-open-requests) を超えた場合はBufferOverflowExceptionが送出される

挙動を確認

サンプルコードで動作を確認してみました。

確認したいこと

max-open-requestsとmax-connectionの数を変更しながらmax-open-requestsの数を超えてリクエストを送信した時に成功するリクエスト、失敗するリクエストの数を確認しました。

build.sbt

今回使用したScala、ライブラリのバージョンは以下の通りです。

scalaVersion := "2.13.3"

lazy val AkkaVersion = "2.6.8"
lazy val AkkaHttpVersion = "10.2.1"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "io.monix" %% "monix" % "3.2.2"
)

サーバ

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.duration._
import scala.io.StdIn

object SlowServer extends App {

  implicit val system: ActorSystem = ActorSystem("server")

  val route = path("") {
    get {
      handle {
        case _: HttpRequest => waitFor.map(_ => HttpResponse()).runToFuture
      }
    }
  }

  val bindingFuture = Http().newServerAt("0.0.0.0", 9000).bind(route)
  println(s"Server online at http://localhost:9000/\nPress RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => system.terminate())

  def waitFor: Task[Unit] = Task.sleep(5.seconds)

}

クライアント

import java.time.LocalDateTime

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest

import scala.util.{Failure, Success}

object Client extends App {
  implicit val system: ActorSystem = ActorSystem("client")
  import system.dispatcher

  for (i <- 1 to 8) {
    Http().singleRequest(HttpRequest(uri = "http://localhost:9000/")).onComplete {
      case Success(_) => println(s"[${LocalDateTime.now}] $i succeeded")
      case Failure(e) => println(s"[${LocalDateTime.now}] $i failed: $e")
    }
  }

}

結果(max-connection=1, max-open-requests=4)

[2020-10-21T18:08:50.099] 6 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(1,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@21460c97),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$160/1551945522@31228ec9),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@4517bff3,akka.event.MarkerLoggingAdapter@58530084))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:08:50.099] 8 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(1,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@21460c97),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$160/1551945522@31228ec9),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@4517bff3,akka.event.MarkerLoggingAdapter@58530084))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:08:50.099] 7 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(1,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@21460c97),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$160/1551945522@31228ec9),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@4517bff3,akka.event.MarkerLoggingAdapter@58530084))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:08:50.099] 5 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(1,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@21460c97),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$159/116734858@20111ce8,akka.util.ConstantFun$$$Lambda$160/1551945522@31228ec9),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@4517bff3,akka.event.MarkerLoggingAdapter@58530084))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:08:55.309] 1 succeeded
[2020-10-21T18:09:00.317] 2 succeeded
[2020-10-21T18:09:05.330] 3 succeeded
[2020-10-21T18:09:10.342] 4 succeeded

結果(max-connection=2, max-open-requests=4)

[2020-10-21T18:10:55.453] 5 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(2,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@6cfdcfc2),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$160/1551945522@4feaa7b3),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@7e093aec,akka.event.MarkerLoggingAdapter@1918aa6e))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:10:55.453] 8 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(2,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@6cfdcfc2),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$160/1551945522@4feaa7b3),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@7e093aec,akka.event.MarkerLoggingAdapter@1918aa6e))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:10:55.453] 7 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(2,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@6cfdcfc2),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$160/1551945522@4feaa7b3),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@7e093aec,akka.event.MarkerLoggingAdapter@1918aa6e))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:10:55.453] 6 failed: akka.stream.BufferOverflowException: Exceeded configured max-open-requests value of [4]. This means that the request queue of this pool (HostConnectionPoolSetup(localhost,9000,ConnectionPoolSetup(ConnectionPoolSettings(2,0,5,4,1,Duration.Inf,100 milliseconds,2 minutes,30 seconds,ClientConnectionSettings(Some(User-Agent: akka-http/10.2.1),10 seconds,1 minute,512,None,WebSocketSettings(<function0>,ping,Duration.Inf,akka.http.impl.settings.WebSocketSettingsImpl$$$Lambda$179/1680503330@6cfdcfc2),List(),ParserSettings(2048,16,64,64,8192,64,Some(9223372036854775807),8388608,256,1048576,Strict,RFC6265,true,Set(),Full,Error,HashMap(If-Range -> 0, If-Modified-Since -> 0, If-Unmodified-Since -> 0, default -> 12, If-None-Match -> 0, User-Agent -> 32, Content-MD5 -> 0, Date -> 0, If-Match -> 0),false,false,true,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$159/116734858@4debe213,akka.util.ConstantFun$$$Lambda$160/1551945522@4feaa7b3),100 milliseconds,None,Http2ClientSettingsImpl(256,65536,10000000,512000,1024,false,None),TCPTransport),1 second,List()),akka.http.scaladsl.HttpConnectionContext$@7e093aec,akka.event.MarkerLoggingAdapter@1918aa6e))) has completely filled up because the pool currently does not process requests fast enough to handle the incoming request load. Please retry the request later. See http://doc.akka.io/docs/akka-http/current/scala/http/client-side/pool-overflow.html for more information.
[2020-10-21T18:11:00.546] 1 succeeded
[2020-10-21T18:11:00.548] 2 succeeded
[2020-10-21T18:11:05.556] 3 succeeded
[2020-10-21T18:11:05.558] 4 succeeded

結果(max-connection=2, max-open-requests=8)

[2020-10-21T18:12:01.703] 1 succeeded
[2020-10-21T18:12:01.705] 2 succeeded
[2020-10-21T18:12:06.711] 3 succeeded
[2020-10-21T18:12:06.713] 4 succeeded
[2020-10-21T18:12:11.719] 5 succeeded
[2020-10-21T18:12:11.722] 6 succeeded
[2020-10-21T18:12:16.732] 7 succeeded
[2020-10-21T18:12:16.734] 8 succeeded

考察

上記の各結果より以下のことがわかりました。

  • max-connectionの値に関わらず、未処理および処理中のリクエスト数の合計がmax-open-requestsを超えた場合にBufferOverflowExceptionが発生する
  • max-connectionの値に応じて同時に処理されるリクエスト数が増加する

BufferOverflowExceptionのワークアラウンド

BufferOverflowExceptionの発生原因とワークアラウンドが先ほどのページで紹介されています。

以下に一部抜粋します。

  • The server is too slow (improve server performance)
  • The network is too slow (improve network performance)
  • The client issues requests too fast (slow down creation of requests if possible)
  • There’s high latency between client and server (use more concurrent connections to hide latency with parallelism)
  • There are peaks in the request rate (prevent peaks by tuning the client application or increase max-open-requests to buffer short-term peaks)

さて、今回の私のユースケースは以下のような状況です。

  • 単位時間あたりのリクエスト数の増加してからBufferOverflowExceptionが発生するようになった
  • サーバのレイテンシは十分に小さい
  • 同一サーバに繰り返しリクエストしている
  • max-connectionsがデフォルトの4のまま
  • バックオフによるリトライは既に実装済みだがリトライ上限を超えてエラーが発生している

上記より以下を検討しました。

  • サーバ側に余裕があるのであればmax-connectionを増やす
  • ピーク時に備えてmax-open-requestsを増やす

まとめ

akka-http clientではホストコネクションプールに送信されるリクエスト数がmax-open-requestsで定義されるキューのサイズを超えるとBufferOverflowExceptionが発生します。 BufferOverflowExceptionのワークアラウンドはワークロードによって異なるので利用状況に合わせて検討する必要があります。