object TestSource {
implicit val ec = ExecutionContext.global
def main(args: Array[String]): Unit = {
def buildSource = {
println("fresh")
Source(List(() => 1,() => 2,() => 3,() => {
println("crash")
throw new RuntimeException(":(((")
}))
}
val restarting = RestartSource.onFailuresWithBackoff(
minBackoff = Duration(1, SECONDS) ,
maxBackoff = Duration(1, SECONDS),
randomFactor = 0.0,
maxRestarts = 10
)(() => {
buildSource
})
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val executionContext: ExecutionContext = actorSystem.dispatcher
restarting.runWith(Sink.foreach(e => println(e())))
}
}
Приведенный выше код выводит: 1,2,3, сбой Почему мой источник не перезагружается? Это в значительной степени копия официальной документации 1:1.
редактировать:
я тоже пробовал
val rs = RestartSink.withBackoff[() => Int](
Duration(1, SECONDS),
Duration(1, SECONDS),
0.0,
10
)(_)
val rsDone = rs(() => {
println("???")
Sink.foreach(e => println(e()))
})
restarting.runWith(rsDone)
но все еще не получить перезагрузки