Nano Hash - криптовалюты, майнинг, программирование

Почему все данные попадают в один раздел после reduceByKey?

У меня есть эта простая искровая программа. Мне интересно, почему все данные попадают в один раздел.

val l = List((30002,30000), (50006,50000), (80006,80000), 
             (4,0), (60012,60000), (70006,70000), 
             (40006,40000), (30012,30000), (30000,30000),
             (60018,60000), (30020,30000), (20010,20000), 
             (20014,20000), (90008,90000), (14,0), (90012,90000),
             (50010,50000), (100008,100000), (80012,80000),
             (20000,20000), (30010,30000), (20012,20000), 
             (90016,90000), (18,0), (12,0), (70016,70000), 
             (20,0), (80020,80000), (100016,100000), (70014,70000),
             (60002,60000), (40000,40000), (60006,60000), 
             (80000,80000), (50008,50000), (60008,60000), 
             (10002,10000), (30014,30000), (70002,70000),
             (40010,40000), (100010,100000), (40002,40000),
             (20004,20000), 
             (10018,10000), (50018,50000), (70004,70000),
             (90004,90000), (100004,100000), (20016,20000))

val l_rdd = sc.parallelize(l, 2)

// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
   iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)

// reduce on the second element of the list.
// alternatively you can use aggregateByKey  
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

// print the reduced results along with its partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
      iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)

Когда вы запустите это, вы увидите, что данные (l_rdd) распределены по двум разделам. После того, как я уменьшил, результирующий RDD (l_reduced) также имеет два раздела, но все данные находятся в одном разделе (индекс 0), а другой пуст. Это происходит, даже если данные огромны (несколько ГБ). Разве l_reduced не следует также распределять на два раздела.

06.02.2017

Ответы:


1
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

Ссылаясь на приведенный выше фрагмент, вы разбиваете по второму полю RDD. Все числа во втором поле заканчиваются на 0.

Когда вы вызываете HashPartitioner, номер раздела для записи определяется следующим функция:

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

А Utils.nonNegativeMod определяется как следует:

 def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

Давайте посмотрим, что произойдет, когда мы применим две приведенные выше логики к вашему вводу:

scala> l.map(_._2.hashCode % 2) // numPartitions = 2
res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

Таким образом, все ваши записи попадают в раздел 0.

Вы можете решить эту проблему путем переразметки:

val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a}).repartition(2)

который дает:

(0,100000,4)
(0,10000,2)
(0,0,5)
(0,20000,6)
(0,60000,5)
(0,80000,4)
(1,50000,4)
(1,30000,6)
(1,90000,4)
(1,70000,5)
(1,40000,4)

Кроме того, вы можете создать пользовательский разделитель.

06.02.2017

2

Если вы не укажете иное, разделение будет выполняться на основе хэш-кодов соответствующих ключей с предположением, что хэш-коды приведут к относительно равномерному распределению. В этом случае все ваши хэш-коды четные, и поэтому все они попадут в раздел 0.

Если это действительно репрезентативно для вашего набора данных, существует перегрузка для reduceByKey, которая использует разделитель, а также функцию сокращения. Я бы предложил предоставить альтернативный алгоритм разделения для такого набора данных.

06.02.2017
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..