У меня есть следующие наборы данных:
Dataset 1: Dataset 2: Dataset 3:
id field1 l_id r_id id field2
Вот их размеры: Dataset1: 20G Dataset2: 5T Dataset3: 20G
Цель: я хотел бы объединить все эти наборы данных в поле идентификатора (l_id с идентификатором из набора данных 1 и r_id с идентификатором из набора данных 3) с окончательным набором данных, чтобы он выглядел так:
l_id r_id field1 field2
Мой текущий подход: объедините Dataset1 и Dataset2 (по id и l_id), чтобы создать (l_id r_id field1), а затем присоедините его к Dataset3 (по r_id и id), чтобы создать (l_id r_id field1 field2) Я предполагая, что Spark автоматически использует разделитель хеша, просматривающий соединяемые поля. Однако этот подход приводит к тому, что одному из исполнителей не хватает места на диске, вероятно, из-за количества перетасовки.
Не могли бы вы подсказать, как мне присоединиться к этим наборам данных? Насколько я понимаю, Spark по умолчанию использует разделитель хешей, правильно глядя на соединяемые столбцы? Или мне нужно сначала вручную разделить данные, а затем выполнить соединения?
Обратите внимание, что широковещательная передача Dataset1 / 2 не является вариантом, поскольку они слишком велики и могут стать еще большими в будущем. Кроме того, все наборы данных не являются СДР с ключевыми значениями и содержат больше полей, чем перечисленные здесь. Поэтому я не уверен, как работает разделение по умолчанию и как я могу настроить собственный разделитель.
Спасибо.
Обновление 1:
Я использую hive SQL для выполнения всех соединений с параметром spark.sql.shuffle.partitions, установленным на 33000, и следующей конфигурацией:
sparkConf.set("spark.akka.frameSize", "500")
sparkConf.set("spark.storage.memoryFraction", "0.2")
sparkConf.set("spark.network.timeout", "1200")
sparkConf.set("spark.yarn.scheduler.heartbeat.interval-ms", "10000")
sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.maxResultSize", "0")
sparkConf.set("spark.shuffle.consolidateFiles", "true")
Я также контролирую, как создаются все эти наборы данных. Ни у одного из них, похоже, нет набора разделителей (если посмотреть на rdd.partitioner), и я не вижу никакого API в SQLContext, который позволил бы мне настроить разделитель при создании фрейма данных.
Я использую scala и Spark 1.3