Nano Hash - криптовалюты, майнинг, программирование

Pyspark несколько заданий параллельно

У меня следующая ситуация с моим Pyspark:

В моей программе-драйвере (driver.py) я вызываю функцию из другого файла (prod.py)

latest_prods = prod.featurize_prods(). 

Код драйвера:

from Featurize import Featurize
from LatestProd import LatestProd
from Oldprod import Oldprod

sc = SparkContext()

if __name__ == '__main__':
    print 'Into main'

featurize_latest = Featurize('param1', 'param2', sc)

latest_prod = LatestProd(featurize_latest)
latest_prods = latest_prod.featurize_prods()
featurize_old = Featurize('param3', 'param3', sc)

old_prods = Oldprod(featurize_old)
old_prods = oldprod.featurize_oldprods()
total_prods =  sc.union([latest_prods, old_prods])

Затем я делаю здесь некоторый код reduceByKey... который генерирует total_prods_processed.

Наконец я звоню:

total_prods_processed.saveAsTextFile(...)

Я хотел бы генерировать last_prods и old_prods параллельно. Оба созданы в одном и том же SparkContext. Возможно ли это сделать? Если нет, как я могу добиться этой функциональности?

Это то, что делает Spark автоматически? Я не вижу такого поведения при запуске кода, поэтому, пожалуйста, дайте мне знать, является ли это параметром конфигурации.


  • Не могли бы вы опубликовать реальный код и отформатировать его? Что вы имеете в виду под параллельно? Обрабатывать одновременно или в распределенной системе? 27.11.2015
  • Код выложен. Я хотел бы вычислить оба параллельно в кластере. 27.11.2015

Ответы:


1

После поиска в Интернете, я думаю, что ваша проблема может быть решена потоками. Это так же просто, как создать два потока для вашей работы old_prod и last_prod.

Проверьте этот пост для упрощенного примера. Поскольку Spark является потокобезопасным, вы получаете параллельную эффективность без каких-либо жертв.

13.11.2018

2

Короткий ответ: нет, вы не можете запланировать операции на двух разных RDD одновременно в одном и том же контексте искры. Однако есть некоторые обходные пути, вы можете обработать их в двух разных SparkContext в одном кластере и вызвать SaveAsTextFile. Затем прочитайте оба в другом задании, чтобы выполнить объединение. (это не рекомендуется документацией). Если вы хотите попробовать этот метод, он обсуждается здесь с использованием spark-jobserver, поскольку по умолчанию spark не поддерживает несколько контекстов: https://github.com/spark-jobserver/spark-jobserver/issues/147

Однако в зависимости от операций, которые вы выполняете, нет причин обрабатывать их одновременно, поскольку вам нужны полные результаты для выполнения объединения, искра разделит эти операции на 2 разных этапа, которые будут выполняться один за другим.

27.11.2015
  • Каждый из них занимает около 8-10 минут. Если я могу обрабатывать их одновременно, то я могу получить код объединения за 8-10 минут, а не за ~20 минут (выполняется последовательно). 27.11.2015
  • Spark пытается оптимизировать использование процессора и памяти, поэтому одновременное выполнение большего количества операций сделает их медленнее. Если вы хотите, чтобы обработка была быстрее, вы бы предпочли масштабировать свой кластер (обновите рабочие машины до большего числа процессоров или добавьте больше рабочих). 27.11.2015
  • Также ключевой причиной является избежание дискового ввода-вывода. Я хочу избежать написания SaveAsTextFile несколько раз. Поскольку у меня около 10 таких RDDS, которые нужно сначала обработать, а затем создать объединение. Здесь для простоты я только что показал 2. Каждый из этих RDD при обработке генерирует большой набор данных, и необходимость записывать их, а затем снова читать неэффективна. В моем кластере 20 узлов с несколькими сотнями ядер. 27.11.2015
  • Это не так. Вы можете планировать и выполнять несколько заданий одновременно, используя один контекст. Хотя обычно это не лучшая идея. 27.11.2015
  • Наличие нескольких больших RDD было бы достаточной причиной для записи на диск, поскольку вы не можете кэшировать все в памяти вашего кластера. Иногда вы не можете избежать дискового ввода-вывода, потому что у вас есть ограниченный объем памяти, доступный в вашем кластере, по сравнению с размером набора данных. 27.11.2015
  • @zero323, вы можете использовать пулы потоков, но Spark уже вычисляет задачи одновременно, не было бы никакой реальной причины делать это при такой обработке. Однако, если вы хотите посмотреть на это, здесь есть пример: stackoverflow.com/questions/30214474/ 27.11.2015
  • Сумма памяти всех рабочих узлов намного больше, чем набор данных, поэтому я очень склонен избегать дорогостоящего дискового ввода-вывода, если это вообще возможно. 27.11.2015
  • Кажется очевидным, что spark будет обрабатывать такое же количество задач одновременно в кластере, поскольку он уже использует максимум ресурсов, которые у вас есть. Так что я думаю, что у вас уже есть наилучший вариант, но я могу ошибаться. Вы по-прежнему можете искать другие узкие места в вашей программе spark, чтобы сделать ее быстрее. 27.11.2015
  • @PaulK Я утверждаю, что есть сценарии, в которых полезна параллельная или синхронная отправка заданий. Выполнение нескольких небольших (относительно количества ресурсов) действий одновременно. 27.11.2015
  • @zero323, есть какие-нибудь указания, как я могу сделать это для своего приложения? 27.11.2015
  • @ user3803714 Конечно, но вам нужно быть более точным в отношении того, что именно происходит в вашем коде. 28.11.2015
  • По сути, я хочу иметь программу-драйвер, в которой я создаю SparkContext, а затем разворачиваю и генерирую 10 RDD одновременно в моем кластере (разветвление), а затем выполняется операция объединения, которая зависит от всех этих вычисляемых RDD. Я могу тривиально сделать это, запустив 10 независимых скриптов pyspark в 10 контекстах, а затем каждый скрипт в конце запишет в файл. Затем создайте еще один скрипт, который читает эти 10 файлов, а затем вызывает union. Недостатком является больше дискового ввода-вывода. Я хочу использовать один и тот же контекст, создавать их параллельно, а затем вызывать объединение в моей программе драйвера. Рекомендуемый способ сделать это? 28.11.2015
  • Если все, что вам нужно, это процесс и объединение, тогда @PaulK. верно - это уже делается параллельно. Если вы считаете, что ресурсы не используются оптимально, вы можете уменьшить параллелизм на уровне RDD. 28.11.2015
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..