Nano Hash - криптовалюты, майнинг, программирование

Идиоматический способ превратить источник Akka в Spark InputDStream

По сути, я пытаюсь сделать противоположное тому, что задают в этот вопрос; то есть используйте Source[A] для вставки элементов в InputDStream[A].

До сих пор мне удавалось собрать воедино реализацию, использующую актор Feeder и актор Receiver, похожий на ActorWordCount example, но это кажется немного сложным, поэтому мне любопытно, есть ли более простой способ.


Ответы:


1

РЕДАКТИРОВАТЬ: самопринятие через 5 дней, так как хороших ответов не было.

Я извлек реализацию на основе Актера в библиотеку, Sparkka-streams, и она работает. для меня пока. Когда появится лучшее решение этого вопроса, я либо обновлю, либо откажусь от библиотеки.

Его использование заключается в следующем:

// InputDStream can then be used to build elements of the graph that require integration with Spark
val (inputDStream, feedDInput) = Streaming.connection[Int]()
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>

  import GraphDSL.Implicits._

  val source = Source(1 to 10)

  val bCast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val add1 = Flow[Int].map(_ + 1)
  val times3 = Flow[Int].map(_ * 3)
  source ~> bCast ~> add1 ~> merge
            bCast ~> times3 ~> feedDInput ~> merge

  SourceShape(merge.out)
})

val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)

val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
  rdd.foreach { i =>
    sharedVar += i
  }
}
ssc.start()
eventually(sharedVar.value shouldBe 165)
28.02.2016
  • появилось что-нибудь? Ваша библиотека давно не обновлялась... спасибо! 27.01.2017
  • нет, ничего из ряда вон выходящего :) Я написал это для пары небольших проектов, которыми занимался в то время, но трогать больше не нужно. Если вы заинтересованы, пожалуйста, не стесняйтесь отправить PR! 27.01.2017

  • 2

    Ссылка: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

    Вы можете сделать это так:

    class StreamStopped extends RuntimeException("Stream stopped")
    
    // Serializable factory class
    case class SourceFactory(start: Int, end: Int) {
      def source = Source(start to end).map(_.toString)
    }
    
    class CustomReceiver(sourceFactory: SourceFactory)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
    
      implicit val materializer = ....
    
      def onStart() {
        sourceFactory.source.runForEach { e =>
          if (isStopped) {
            // Stop the source
            throw new StreamStopped
          } else {
            store(e)
          }
        } onFailure {
          case _: StreamStopped => // ignore
          case ex: Throwable => reportError("Source exception", ex)
        }
      }
    
      def onStop() {}
    }
    
    val customReceiverStream = ssc.receiverStream(new CustomReceiver(SourceFactory(1,100))
    
    28.02.2016
  • Я пробовал что-то подобное, прежде чем использовать вышеупомянутый вариант Актера, но во время выполнения Spark пытается сериализовать Source и умирает. 28.02.2016
  • Извините, я не эксперт по Spark и не заметил, что Receiver должен быть сериализуемым. В этом случае одним из вариантов может быть вместо передачи исходного экземпляра передача информации для создания источника (в форме фабричного объекта) и создание источника в Receiver#onStart. Смотрите мой обновленный пример. 28.02.2016
  • Достаточно честно, это может быть одним из способов сделать это в случае, когда допустимо запускать поток из приемника, но я специально ищу использование существующего Source[A], а не создание источника из приемника. Если бы мы могли использовать существующий Source, мы могли бы легко обобщить его на любой существующий поток Akka (или преобразование потока), что приятно. 28.02.2016
  • В akka-stream Source[A] или Graph в целом — это просто план потока выполнения. Если вы намерены поделиться «чертежом», то независимо от того, передаете ли вы Source[A] в качестве параметра или создаете его в Receiver, мало что зависит от времени выполнения, потому что в обоих случаях материализация происходит внутри Receiver. Однако, если вы хотите передать поток данных materialized в dstream, тогда ваш оригинальный подход к актерам Feeder/Receiver действительно имеет смысл. 28.02.2016
  • Если у меня есть Source[A], который мне нужно разделить, транслировать или иным образом повторно использовать в других потоках, а физический ресурс, который передает вещи в источник, ограничен только одним соединением за раз (например, веб-камера, в зависимости от драйвера), я думаю, что разница между созданием и запуском потока из приемника и передачей существующего Flow/Graph/Source довольно велика. Кроме того, с точки зрения эффективности, даже без этого ограничения, возможность повторного использования существующего определенного источника лучше. 28.02.2016
  • Позвольте мне уточнить: скажем, если у вас есть Source[Photo], который подключается к какой-либо веб-камере, вы можете закодировать свой приемник следующим образом: Case A1: class CustomReceiver(src: Source[Photo]); Случай A2: class CustomReceiver(factory: WebcamSourceFactory); Случай B: class CustomReceiver(feeder: ActorRef); Как в Случае A1, так и в Случае A2 материализация источника, т. е. подключение к веб-камере, происходит внутри Receiver при вызове runForeach, поэтому во время выполнения они не имеют большого значения. .. 28.02.2016
  • Учитывая сценарий вашей веб-камеры, Случай B может быть более подходящим и необходимым, т. е. материализовать и подключить Source[Photo] только один раз к одному актеру Feeder и поделиться экземпляром актера, то есть вашим исходным подходом. 28.02.2016
  • Ваш Case A1 на самом деле не будет работать, потому что вы держите ссылку на несериализуемый Source. Случай A2 будет использовать множество аргументов для материализации неразделяемого источника внутри получателя (в таком случае зачем вообще использовать Source для начала?). Сравнивать A1 и A2 бессмысленно, потому что один не будет работать, а другой будет работать, но будет расточительным. Случай B — это то, что у меня уже есть, и, задавая этот вопрос, я пытаюсь выяснить, является ли это 1. лучшим способом 2. единственным способом. 28.02.2016
  • Ну, не все Source одинаковы, и некоторые из них поддерживают свои собственные общие ресурсы внутри и, следовательно, не являются «расточительными» для материализации несколько раз, например. Http().cachedHostConnectionPool(...) из http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.3/scala/http/client-side/host-level.html. В таком случае на самом деле лучше просто позволить Spark материализовать Source в Receiver и избежать сложностей при совместном использовании материализованного потока (которые в первую очередь вызывают у вас вопрос). 28.02.2016
  • Если оставить в стороне тот факт, что ваш пример кода демонстрирует создание еще одного нового Source, ваш текущий сценарий зависит от наличия потока, который кэшируется каким-то синглтоном с отслеживанием состояния, который не всегда доступен, и если вы должны были реализовать себя, он сам по себе является источник сложности. Мало того, если Source уже существует в приложении драйвера (весьма вероятно), он все равно создает бесполезный ресурс, поскольку выделяет источник на узле Spark. Кроме того, принудительное размещение источника в приемнике сильно ограничивает возможность компоновки данного графа. Короче, далеко не идеал. 28.02.2016
  • @lloydmeta, выделение локального ресурса (например, пула ресурсов в случае Http().cachedHostConnectionPool()) на узле Spark и его совместное использование сотнями/тысячами заданий, выполняющихся на узле, не является пустой тратой времени. В этом случае это действительно хорошо для производительности и масштабируемости (чтобы задания использовали локальный пул на узле). В конце концов, Spark — это кластер/производительность/параллельная обработка. Без обид, я просто хочу сказать, что серебряной пули не существует. Я не думаю, что есть один идеальный способ, и все зависит от вашего варианта использования. 29.02.2016
  • Не нужно убеждать меня, что серебряной пули не существует; Я тот, кто критикует ваши. Он полностью зависит от возможности создавать новые источники из узлов Spark, что имеет огромные недостатки с точки зрения возможности (веб-камера, веб-сокеты...) и информационной безопасности. Кроме того, ваше решение не будет расточительным только в том случае, если у вас нет существующего Source в вашем драйвере, ваше выделение источника кэшируется чем-то, и все ваши задания Spark выполняются в одном и том же исполнителе. +node (неоптимальное использование кластера). Это «если на если»: я надеюсь найти более общее решение, отвечающее на вопрос SO, используя существующий источник 29.02.2016
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..