Nano Hash - криптовалюты, майнинг, программирование

Spark сортирует уже отсортированные разделы, что приводит к потере производительности

Для кешированного фрейма данных, разделенного и отсортированного внутри разделов, я получаю хорошую производительность при запросе ключа с предложением where, но плохую производительность при выполнении соединения с небольшой таблицей по тому же ключу.

См. пример набора данных dftest ниже с 10Kx44K = 438M строк.

sqlContext.sql(f'set spark.sql.shuffle.partitions={32}')
sqlContext.clearCache()
sc.setCheckpointDir('/checkpoint/temp')
import datetime
from pyspark.sql.functions import *
from pyspark.sql import Row

start_date = datetime.date(1900, 1, 1)
end_date   = datetime.date(2020, 1, 1)

dates = [ start_date + datetime.timedelta(n) for n in range(int ((end_date - start_date).days))]

dfdates=spark.createDataFrame(list(map(lambda x: Row(date=x), dates))) # some dates
dfrange=spark.createDataFrame(list(map(lambda x: Row(number=x), range(10000)))) # some number range

dfjoin = dfrange.crossJoin(dfdates)
dftest = dfjoin.withColumn("random1", round(rand()*(10-5)+5,0)).withColumn("random2", round(rand()*(10-5)+5,0)).withColumn("random3", round(rand()*(10-5)+5,0)).withColumn("random4", round(rand()*(10-5)+5,0)).withColumn("random5", round(rand()*(10-5)+5,0)).checkpoint()
dftest = dftest.repartition("number").sortWithinPartitions("number", "date").cache()
dftest.count() # 438,290,000 rows

Следующий запрос теперь занимает примерно секунду (в небольшом кластере с двумя рабочими процессами):

dftest.where("number = 1000 and date = \"2001-04-04\"").count()

Однако, когда я пишу подобное условие как соединение, это занимает 2 минуты:

dfsub = spark.createDataFrame([(10,"1900-01-02",1),
  (1000,"2001-04-04",2),
  (4000,"2002-05-05",3),
  (5000,"1950-06-06",4),
  (9875,"1980-07-07",5)],
["number","date", "dummy"]).repartition("number").sortWithinPartitions("number", "date").cache()
df_result = dftest.join(dfsub, ( dftest.number == dfsub.number ) & ( dftest.date == dfsub.date ), 'inner').cache()
df_result.count() # takes 2 minutes (result = 5)

Я ожидал, что это будет примерно одинаково быстро. Тем более, что я надеюсь, что больший фрейм данных уже сгруппирован и кэширован. Глядя на план:

== Physical Plan ==
InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L]
   +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797, number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(3) SortMergeJoin [number#771L, cast(date#769 as string)], [number#945L, date#946], Inner
            :- *(1) Sort [number#771L ASC NULLS FIRST, cast(date#769 as string) ASC NULLS FIRST], false, 0
            :  +- *(1) Filter (isnotnull(number#771L) && isnotnull(date#769))
            :     +- InMemoryTableScan [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], [isnotnull(number#771L), isnotnull(date#769)]
            :           +- InMemoryRelation [number#771L, date#769, random1#775, random2#779, random3#784, random4#790, random5#797], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                 +- Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0
            :                    +- Exchange hashpartitioning(number#771L, 32)
            :                       +- *(1) Scan ExistingRDD[number#771L,date#769,random1#775,random2#779,random3#784,random4#790,random5#797]
            +- *(2) Filter (isnotnull(number#945L) && isnotnull(date#946))
               +- InMemoryTableScan [number#945L, date#946, dummy#947L], [isnotnull(number#945L), isnotnull(date#946)]
                     +- InMemoryRelation [number#945L, date#946, dummy#947L], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- Sort [number#945L ASC NULLS FIRST, date#946 ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(number#945L, 32)
                                 +- *(1) Scan ExistingRDD[number#945L,date#946,dummy#947L]

Кажется, много времени уходит на сортировку большего фрейма данных по номеру и дате (эта строка: Sort [number#771L ASC NULLS FIRST, date#769 ASC NULLS FIRST], false, 0). Это оставляет меня со следующими вопросами:

  • внутри разделов порядок сортировки как для левой, так и для правой стороны одинаков и оптимален для предложения JOIN, почему Spark снова сортирует разделы?
  • поскольку 5 записей соединения соответствуют (до) 5 разделам, почему оцениваются все разделы?
  • Кажется, Catalyst не использует информацию repartition и sortWithinPartitions кэшированного фрейма данных. Имеет ли смысл использовать sortWithinPartitions в подобных случаях?

Ответы:


1

Позвольте мне попытаться ответить на ваши три вопроса:

внутри разделов порядок сортировки как для левой, так и для правой стороны одинаков и оптимален для предложения JOIN, почему Spark снова сортирует разделы?

Порядок сортировки в обоих кадрах данных НЕ одинаков из-за разных типов данных в вашем столбце сортировки date, в dfsub это StringType, а в dftest это DateType, поэтому во время соединения Spark видит, что порядок в обеих ветвях разный, и поэтому принудительно Sort.

поскольку 5 записей соединения соответствуют (до) 5 разделам, почему оцениваются все разделы?

Во время обработки плана запроса Spark не знает, сколько непустых разделов в небольшом DataFrame, и поэтому должен обрабатывать их все.

Кажется, Catalyst не использует информацию о перераспределении и sortWithinPartitions кэшированного фрейма данных. Имеет ли смысл использовать sortWithinPartitions в подобных случаях?

Оптимизатор Spark использует информацию из repartition и sortWithinPartitions, но есть некоторые оговорки о том, как это работает. Чтобы исправить ваш запрос, также важно перераспределить те же столбцы (оба из них), которые вы используете в объединении (а не только один столбец). В принципе, в этом нет необходимости, и существует соответствующая jira, которая пытается решить это.

Итак, вот мои предлагаемые изменения в вашем запросе:

  1. Измените тип столбца date на StringType в dftest (или аналогичным образом измените на DateType в dfsub):

    dftest.withColumn("date", col("date").cast('string'))
    
  2. В обоих DataFrames измените

    .repartition("number")
    

    to

    .repartition("number", "date")
    

После этих изменений у вас должен получиться вот такой план:

*(3) SortMergeJoin [number#1410L, date#1653], [number#1661L, date#1662], Inner
:- Sort [number#1410L ASC NULLS FIRST, date#1653 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(number#1410L, date#1653, 200)
:     +- *(1) Project [number#1410L, cast(date#1408 as string) AS date#1653, random1#1540, random2#1544, random3#1549, random4#1555, random5#1562]
:        +- *(1) Filter (isnotnull(number#1410L) && isnotnull(cast(date#1408 as string)))
:           +- *(1) Scan ExistingRDD[number#1410L,date#1408,random1#1540,random2#1544,random3#1549,random4#1555,random5#1562]
+- Sort [number#1661L ASC NULLS FIRST, date#1662 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(number#1661L, date#1662, 200)
      +- *(2) Filter (isnotnull(number#1661L) && isnotnull(date#1662))
         +- *(2) Scan ExistingRDD[number#1661L,date#1662,dummy#1663L]

поэтому в каждой ветви плана есть только один Exchange и один Sort, оба исходят из repartition и sortWithinPartition, которые вы вызываете в своих преобразованиях, и объединение больше не вызывает сортировки или перетасовки. Также обратите внимание, что в моем плане нет InMemoryTableScan, так как я не использовал кеш.

16.07.2019
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..