Nano Hash - криптовалюты, майнинг, программирование

Akka RestartSource не перезагружается

object TestSource {
  implicit val ec = ExecutionContext.global

  def main(args: Array[String]): Unit = {
    def buildSource = {
      println("fresh")
      Source(List(() => 1,() => 2,() => 3,() => {
        println("crash")
      throw new RuntimeException(":(((")
      }))
    }
    val restarting = RestartSource.onFailuresWithBackoff(
      minBackoff = Duration(1, SECONDS) ,
      maxBackoff = Duration(1, SECONDS),
      randomFactor = 0.0,
      maxRestarts = 10
    )(() => {
      buildSource
    })

     implicit val actorSystem: ActorSystem           = ActorSystem()
     implicit val executionContext: ExecutionContext = actorSystem.dispatcher

    restarting.runWith(Sink.foreach(e => println(e())))

  }
}

Приведенный выше код выводит: 1,2,3, сбой Почему мой источник не перезагружается? Это в значительной степени копия официальной документации 1:1.

редактировать:

я тоже пробовал

    val rs = RestartSink.withBackoff[() => Int](
      Duration(1, SECONDS),
      Duration(1, SECONDS),
      0.0,
      10
    )(_)
    val rsDone = rs(() => {
      println("???")
      Sink.foreach(e => println(e()))
    })
    restarting.runWith(rsDone)

но все еще не получить перезагрузки

30.11.2020

Ответы:


1

Это связано с тем, что исключение запускается за пределами buildSource Source в Sink.foreach, когда вы вызываете функции, испускаемые из Source.

Попробуй это:

    val restarting = RestartSource.onFailuresWithBackoff(
      minBackoff = Duration(1, SECONDS) ,
      maxBackoff = Duration(1, SECONDS),
      randomFactor = 0.0,
      maxRestarts = 10
      )(() => {
        buildSource
         .map(e => e()) //call the functions inside the RestartSource
      })

Таким образом, ваше исключение произойдет внутри внутреннего Source, обернутого RestartSource, и сработает механизм перезапуска.

30.11.2020

2

Источник не перезагружается, потому что ваш источник никогда не выходит из строя, поэтому никогда не нуждается в перезапуске.

Исключение возникает, когда Sink.foreach оценивает полученную функцию.

Как заметил Артур, если вы можете переместить неисправный бит в источник, вы можете обернуть все до приемника в RestartSource.

Хотя это не поможет в этом надуманном примере (поскольку перезапуск приемника не приводит к повторной отправке ранее отправленных сообщений), обертывание приемника в RestartSink может быть полезно в реальных случаях, когда такое может произойти (вне сети). на мой взгляд, потоки из Kafka взрываются, потому что коммит смещения в приемнике не удался (например, после перебалансировки) должен быть примером такого случая).

В качестве альтернативы, если вы хотите перезапустить весь поток в случае сбоя какой-либо части, и поток материализуется как Future, вы можете реализовать повторную попытку с отсрочкой для неудачного будущего.

30.11.2020
  • добавил этот случай к вопросу 30.11.2020
  • Что происходит, когда вы добавляете элемент в источник после элемента сбоя? В противном случае вполне может случиться так, что, поскольку источник завершил работу (он излучает все, что может излучать), нет необходимости перезапускаться. 30.11.2020
  • Из документов для RestartSource: обернутый приемник может... быть завершен путем подачи завершения в этот приемник... [который] завершится и не будет перезапущен. Это может быть вызвано просто завершением восходящего потока. 30.11.2020

  • 3

    Исходник просто никогда не вылетает, как тут уже говорили. Вы на самом деле крашитесь, вы сток, а не источник с этим утверждением e => e()

    это происходит при применении лямбды выше к последнему элементу источника:

    java.lang.RuntimeException: :(((
    

    Вот тот же поток без необработанного исключения в приемнике:

    ... RestartSource.withBackoff( ...

    restarting.runWith(
      Sink.foreach(e => {
        def i: Int = try{ e() } catch {
          case t: Throwable =>
            println(t)
            -1
        }
        println(i)
      })
    )
    

    Работает отлично.

    30.11.2020
    Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..