Nano Hash - криптовалюты, майнинг, программирование

Объединение наборов данных неравного размера в Spark

У меня есть следующие наборы данных:

Dataset 1:                 Dataset 2:                   Dataset 3:
id  field1                 l_id    r_id                 id field2

Вот их размеры: Dataset1: 20G Dataset2: 5T Dataset3: 20G

Цель: я хотел бы объединить все эти наборы данных в поле идентификатора (l_id с идентификатором из набора данных 1 и r_id с идентификатором из набора данных 3) с окончательным набором данных, чтобы он выглядел так:

l_id     r_id     field1      field2

Мой текущий подход: объедините Dataset1 и Dataset2 (по id и l_id), чтобы создать (l_id r_id field1), а затем присоедините его к Dataset3 (по r_id и id), чтобы создать (l_id r_id field1 field2) Я предполагая, что Spark автоматически использует разделитель хеша, просматривающий соединяемые поля. Однако этот подход приводит к тому, что одному из исполнителей не хватает места на диске, вероятно, из-за количества перетасовки.

Не могли бы вы подсказать, как мне присоединиться к этим наборам данных? Насколько я понимаю, Spark по умолчанию использует разделитель хешей, правильно глядя на соединяемые столбцы? Или мне нужно сначала вручную разделить данные, а затем выполнить соединения?

Обратите внимание, что широковещательная передача Dataset1 / 2 не является вариантом, поскольку они слишком велики и могут стать еще большими в будущем. Кроме того, все наборы данных не являются СДР с ключевыми значениями и содержат больше полей, чем перечисленные здесь. Поэтому я не уверен, как работает разделение по умолчанию и как я могу настроить собственный разделитель.

Спасибо.

Обновление 1:

Я использую hive SQL для выполнения всех соединений с параметром spark.sql.shuffle.partitions, установленным на 33000, и следующей конфигурацией:

sparkConf.set("spark.akka.frameSize", "500")
sparkConf.set("spark.storage.memoryFraction", "0.2")
sparkConf.set("spark.network.timeout", "1200")
sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms", "10000")
sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.maxResultSize", "0")
sparkConf.set("spark.shuffle.consolidateFiles", "true")

Я также контролирую, как создаются все эти наборы данных. Ни у одного из них, похоже, нет набора разделителей (если посмотреть на rdd.partitioner), и я не вижу никакого API в SQLContext, который позволил бы мне настроить разделитель при создании фрейма данных.

Я использую scala и Spark 1.3


  • Какая конфигурация у вашего кластера? 27.08.2015
  • Это кластер из 100 узлов с 60 ГБ ОЗУ и 745 ГБ дискового пространства на каждом. Моя конфигурация задания - это 20 ГБ памяти драйвера, 20 ГБ памяти исполнителя, 2 ядра исполнителя, 120 исполнителей числа, 33000 разделов spark.sql.shuffle и 2 spark.driver.cores. 28.08.2015
  • Можете ли вы опубликовать информацию о перетасованных записях и журналах от исполнителя, который не работает? Также, сколько места у вас есть в каталоге перемешивания, и, возможно, попробуйте настроить shuffle memoryFraction. 28.08.2015
  • Здравствуйте, в каталоге случайного воспроизведения около 15T свободного места. Тем не менее, задание приводит к примерно 18T при произвольной записи и 10T при произвольной записи. Я пытаюсь выяснить, есть ли способ перераспределить данные или прочитать более крупный набор данных партиями (выполнить соединения и объединить частичные наборы данных), чтобы уменьшить перетасовку 28.08.2015

Ответы:


1

Разделитель ваших данных зависит от того, откуда взялся RDD. Вам не нужно вручную повторно разбивать данные на разделы. Однако, если вы перераспределите свои данные, чтобы у них был один и тот же разделитель, тогда объединение (& cogrouping) приведет к узкому преобразованию вместо того, чтобы выполнять перемешивание как часть соединения. Обратите внимание, что в более новых версиях Spark (1.2+) перемешивание по умолчанию теперь является перемешиванием на основе сортировки вместо перемешивания на основе хеша.

Трудно сказать, как изменить ваши объединения без кода и журналов (и, возможно, было бы полезно также знать, как выглядит распределение идентификаторов).

Вы можете попробовать увеличить количество разделов (как входных, так и выходных), если возникнет проблема с несбалансированными данными. Одна из возможностей заключается в том, что ваше рабочее пространство слишком мало, вы можете настроить Spark для использования другого каталога для временного хранения с помощью spark.local.dir. Если ваш объект является kyro-сериализуемым (или если у вас есть время, чтобы добавить его), вы также можете посмотреть на изменение spark.serializer, поскольку другая сериализация может занять гораздо меньше места.

Хотя это напрямую не связано с завершением задания, вы также можете захотеть увеличить spark.shuffle.memoryFraction и уменьшить spark.storage.memoryFraction, чтобы уменьшить количество разлива на диск, требуемое во время перемешивания.

Один из вариантов, если у вас были данные с несколько иной структурой, было бы использовать cogroup, который поддерживает объединение многих RDD одновременно, но для этого требуется, чтобы все ключи были одинаковыми.

Примечание: все это предполагает, что вы работаете с необработанным Spark вместо Spark SQL. Для настройки объединений Spark SQL см. https://spark.apache.org/docs/latest/sql-programming-guide.html (особенно подумайте о настройке spark.sql.shuffle.partitions).

Надеюсь это поможет.

27.08.2015
  • Спасибо за ваш ответ. Я использую hive sql для выполнения соединений. Я обновил свой исходный пост, используя конфигурацию, которую использую. 28.08.2015
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..