Nano Hash - криптовалюты, майнинг, программирование

Как создать коллекцию RDD из RDD?

У меня RDD[String], wordRDD. У меня также есть функция, которая создает RDD[String] из строки/слова. Я хотел бы создать новый RDD для каждой строки в wordRDD. Вот мои попытки:

1) Ошибка, поскольку Spark не поддерживает вложенные RDD:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})

2) Ошибка (возможно, из-за проблемы с областью действия?):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}

Мой идеальный результат будет выглядеть так:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)

Я нашел здесь соответствующий вопрос: Spark when union многие RDD выдают ошибку переполнения стека, но это не решило мою проблему.

10.09.2015

Ответы:


1

Используйте flatMap, чтобы получить RDD[String] по своему желанию.

var allWords = wordRDD.flatMap { word => 
  (new MyClass(word)).myFunction().collect()
}
10.09.2015
  • Как это должно работать параллельно? Все, что происходит внутри wordRDD.map, выполняется в кластере. Таким образом, внутренний collect должен инициировать новое задание Spark из работающего задания. Я подозреваю, что он не будет работать распределенно. 11.09.2015
  • Он также мог бы изменить функцию, чтобы она возвращала массивы вместо RDD, но в вопросе не была указана фактическая функция. 11.09.2015
  • Но в его описании сказано, что у него есть функция, я предполагаю, что это myFunction, которая создает RDD[String] из строки/слова. 13.09.2015
  • Да, это так. Ваш ответ говорит ему изменить myFunction, чтобы вернуть что-то другое. Не зная, насколько сложна функция, трудно сказать, являются ли вычисления, выполняемые в ней, распределенными или нет. Если сбор набора данных означает, что все предыдущие вычисления больше не распределяются, то ничего не будет распространяться. 15.09.2015
  • Вы когда-нибудь пробовали запускать свой код? В принципе невозможно вызвать collect из другого RDD. Как вы представляете, как это должно быть реализовано? Метод collect запустит выполнение задания и вернет результаты в виде Seq вашему узлу драйвера. Но операция flatMap является частью вашей работы, которая выполняется распределенно. Как тогда должен выполняться метод collect? Более того, SparkContext, который необходим для создания нового RDDs, просто не сериализуем. Таким образом, невозможно отправить его с вашим UDF. 15.09.2015
  • Я не знаю, что вы подразумеваете под отправкой, но я запустил код, который я изначально опубликовал. Это сработало. @JacekLaskowski сделал мой код более компактным, но я предполагаю, что он все еще работает. 15.09.2015

  • 2

    Вы не можете создать RDD внутри другого RDD.

    Однако можно переписать вашу функцию myFunction: String => RDD[String], которая генерирует все слова из ввода, где одна буква удалена, в другую функцию modifiedFunction: String => Seq[String], чтобы ее можно было использовать из СДР. Таким образом, он также будет выполняться параллельно в вашем кластере. Имея modifiedFunction, вы можете получить окончательную RDD со всеми словами, просто вызвав wordRDD.flatMap(modifiedFunction).

    Важным моментом является использование flatMap (для map и flatten преобразований):

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
      val sc = new SparkContext(sparkConf)
    
      val input = sc.parallelize(Seq("apple", "ananas", "banana"))
    
      // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
      val result = input.flatMap(modifiedFunction) 
    }
    
    def modifiedFunction(word: String): Seq[String] = {
      word.indices map {
        index => word.substring(0, index) + word.substring(index+1)
      }
    }
    
    10.09.2015
    Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..