Nano Hash - криптовалюты, майнинг, программирование

Как я могу объяснить график происхождения Apache Spark RDD?

У меня есть несколько вопросов с этим кодом ниже:

val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey()
val input2 = input1.map(x=> x._2.mkString("\t"))
val x0 = input2.map(_.split("\t")).map(x => (x(6),x(0))
val x1 = input2.map(_.split("\t")).map(x => (x(6),x(1))
val x2 = input2.map(_.split("\t")).map(x => (x(6),x(2))
val x3 = input2.map(_.split("\t")).map(x => (x(6),x(3))
val x4 = input2.map(_.split("\t")).map(x => (x(6),x(4))
val x5 = input2.map(_.split("\t")).map(x => (x(6),x(5))
val x6 = input2.map(_.split("\t")).map(x => (x(6),x(6))
val x = x0 union x1 union x2 union  x3 union x4 union x5 union x6


<pre>
**Lineage Graph:**
(7) UnionRDD[25] at union at rddCustUtil.scala:78 []
|  UnionRDD[24] at union at rddCustUtil.scala:78 []
|  UnionRDD[23] at union at rddCustUtil.scala:78 []
|  UnionRDD[22] at union at rddCustUtil.scala:78 []
|  UnionRDD[21] at union at rddCustUtil.scala:78 []
|  UnionRDD[20] at union at rddCustUtil.scala:78 []
|  MapPartitionsRDD[7] at map at rddCustUtil.scala:43 []
|  MapPartitionsRDD[6] at map at rddCustUtil.scala:43 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[9] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[8] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[11] at map at rddCustUtil.scala:53 []
|  MapPartitionsRDD[10] at map at rddCustUtil.scala:53 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[13] at map at rddCustUtil.scala:58 []
|  MapPartitionsRDD[12] at map at rddCustUtil.scala:58 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[15] at map at rddCustUtil.scala:63 []
|  MapPartitionsRDD[14] at map at rddCustUtil.scala:63 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[17] at map at rddCustUtil.scala:68 []
|  MapPartitionsRDD[16] at map at rddCustUtil.scala:68 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[19] at map at rddCustUtil.scala:73 []
|  MapPartitionsRDD[18] at map at rddCustUtil.scala:73 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
</pre>
  • Не могли бы вы объяснить мне, сколько этапов перетасовки будет выполнено, поскольку он показывает 7 ShuffledRDD[4]?
  • Не могли бы вы дать мне подробное объяснение ниже потока DAG?
  • Эта операция дорогая?

Ответы:


1

сколько этапов перетасовки будет выполнено

Действительно, перетасовка, необходимая для сортировки ваших данных, происходит 7 раз, потому что оценка Spark ленива и выполняется по запросу, и если кэшировать, она будет пересчитываться для каждой ветки в DAG, которая требует этого. Чтобы решить эту проблему (и, возможно, сделать этот расчет намного быстрее), вы можете кэшировать (или, в более общем случае, сохранять) input2, прежде чем использовать его несколько раз:

val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey()
val input2 = input1.map(x=> x._2.mkString("\t")).cache()
// continue as before

Не могли бы вы дать мне подробное объяснение ниже потока DAG

Каждый из ваших x_ RDD рассчитывается "отдельно" с использованием следующего расчета:

+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 []
    |  MapPartitionsRDD[2] at map at rddCustUtil.scala:38 []
    |  /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 []
    |  /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 []
|  MapPartitionsRDD[9] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[8] at map at rddCustUtil.scala:48 []
|  MapPartitionsRDD[5] at map at rddCustUtil.scala:40 []
|  ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 []

Который показывает вычисление, которое создало rawinput из textFile, а затем сортировку и три операции map.

Затем у вас есть 6 операций объединения, объединяющих эти 7 СДР.

Эта операция дорогая?

ДА, похоже, что да. Как было предложено выше, кэширование может сделать это намного быстрее, но есть лучший способ добиться этого - не разбивать RDD на множество отдельных:

val x = rawinput.map(_.split("\t"))
  .keyBy(_(6).trim()) // extract key
  .flatMap{ case (k, arr) => arr.take(7).zipWithIndex.map((k, _)) } // flatMap into (key, (value, index))
  .sortBy { case (k, (_, index)) => (index, k) } // sort by index first, key second
  .map    { case (k, (value, _)) => (k, value) } // remove index, it was just used for sorting

Это выполнит одну операцию перемешивания и не потребует сохранения данных. DAG будет выглядеть так:

(4) MapPartitionsRDD[9] at map at Test.scala:75 []
 |  MapPartitionsRDD[8] at sortBy at Test.scala:74 []
 |  ShuffledRDD[7] at sortBy at Test.scala:74 []
 +-(4) MapPartitionsRDD[4] at sortBy at Test.scala:74 []
    |  MapPartitionsRDD[3] at flatMap at Test.scala:73 []
    |  MapPartitionsRDD[2] at keyBy at Test.scala:72 []
    |  MapPartitionsRDD[1] at map at Test.scala:71 []
    |  ParallelCollectionRDD[0] at parallelize at Test.scala:64 []
03.01.2017
  • Очень признателен Цах, но в случае, если я использую кеш или метод сохранения, сколько потребуется перетасовки для сортировки данных? 03.01.2017
  • Один, я полагаю. Распечатка может не отражать тот факт, что этап тасования будет пропущен благодаря кэшированию. 03.01.2017
  • Привет, Цах, у меня есть один вопрос по следующему посту: 04.01.2017
  • .flatMap{ case (k, arr) => arr.take(7).zipWithIndex.map((k, _)) }. Вы использовали arr.take (7), предположим, у меня есть список индексов, например List (1, 3, 4, 5, 6), тогда как я могу выбрать только выбранный индекс списка. 04.01.2017
  • См. раздел stackoverflow.com/questions/25630475/< /а> 04.01.2017
  • Но Цах, в этом посте порядковый номер индекса изменен. Предположим, я передаю индекс в этом формате 3,2,6,2,8, он должен поддерживать состояние. Он всегда отдает мне приказ о присоединении. 05.01.2017
  • Я не понимаю вопроса, возможно, он должен быть в отдельном посте. Разве .flatMap{ case (k, arr) => arr.zipWithIndex.filter(t => colIndices.contains(t._2)).map((k, _)) } не сработает, предполагая colIndices = List(3,2,6,2,8)? 05.01.2017
  • Спасибо @Tzach, не могли бы вы помочь мне с другим решением: предположим, у меня есть RDD 'Array[(String, (String, String))] = Array((1001,(764839,id,q)), (1001,( Souvik,fnm,p)), (1001,(Das,lnm,p)), (1002,(764840,pid,q)), (1002,(Palash,fnm,p)), (1002,(Mandal, lnm,p)))' Формат такой: '(ROW_KEY, (VALUE, COLUMN_QUALIFIER, COLUMN_FAMILY))', но мне нужен этот формат '[ROW_KEY, Map[COLUMN_FAMILY, Array [(COLUMN_QUALIFIER, (VALUE, TIMESTAMP))] ])]' Не могли бы вы помочь мне получить это? 06.01.2017
  • Комментарии не для этого - если у вас есть другой вопрос, задайте новый вопрос. 06.01.2017
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..