Nano Hash - криптовалюты, майнинг, программирование

Spark выполняет каждое действие два раза

Я создал простое Java-приложение, которое использует Apache Spark для извлечения данных из Cassandra, выполняет некоторые преобразования и сохраняет их в другой таблице Cassandra.

Я использую Apache Spark 1.4.1, настроенный в автономном режиме кластера с одним ведущим и подчиненным, расположенными на моей машине.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
    "WHERE CAST(store_id as string) = '" + storeId + "'");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
    "WHERE CAST(store_id as string) = '" + storeId + "' AND product_id = " + productId + "");

// We need only the customers who did not order the product
// We cache the DataFrame because we use it twice.
DataFrame customersWhoHaventOrderedTheProduct = customers
    .join(customersWhoOrderedTheProduct
    .select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter")
    .where(customersWhoOrderedTheProduct.col("email").isNull())
    .drop(customersWhoOrderedTheProduct.col("email"))
    .cache();

int numberOfCustomers = (int) customersWhoHaventOrderedTheProduct.count();

Date reportTime = new Date();

// Prepare the Broadcast values. They are used in the map below.
Broadcast<String> bStoreId = sparkContext.broadcast(storeId, classTag(String.class));
Broadcast<String> bReportName = sparkContext.broadcast(MessageBrokerQueue.report_did_not_buy_product.toString(), classTag(String.class));
Broadcast<java.sql.Timestamp> bReportTime = sparkContext.broadcast(new java.sql.Timestamp(reportTime.getTime()), classTag(java.sql.Timestamp.class));
Broadcast<Integer> bNumberOfCustomers = sparkContext.broadcast(numberOfCustomers, classTag(Integer.class));

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
    .map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")
    .option("table", "my_report")
    .format("org.apache.spark.sql.cassandra")
    .save();

Как видите, я cache customersWhoHaventOrderedTheProduct DataFrame, после этого я выполняю countи вызываю toJavaRDD.

По моим расчетам эти действия должны быть выполнены только один раз. Но когда я захожу в пользовательский интерфейс Spark для текущего задания, я вижу следующие этапы: введите здесь описание изображения

Как видите, каждое действие выполняется дважды.

Я делаю что-то неправильно? Есть ли какие-либо настройки, которые я пропустил?

Любые идеи приветствуются.


ИЗМЕНИТЬ:

После того, как я позвонил System.out.println(storeCustomerReport.toJavaRDD().toDebugString());

Это строка отладки:

(200) MapPartitionsRDD[43] at toJavaRDD at DidNotBuyProductReport.java:93 []
  |   MapPartitionsRDD[42] at createDataFrame at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[41] at map at DidNotBuyProductReport.java:90 []
  |   MapPartitionsRDD[40] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[39] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[38] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ZippedPartitionsRDD2[37] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   MapPartitionsRDD[31] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ShuffledRDD[30] at toJavaRDD at DidNotBuyProductReport.java:89 []
  +-(2) MapPartitionsRDD[29] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[28] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[27] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[3] at cache at DidNotBuyProductReport.java:76 []
     |  CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
  |   MapPartitionsRDD[36] at toJavaRDD at DidNotBuyProductReport.java:89 []
  |   ShuffledRDD[35] at toJavaRDD at DidNotBuyProductReport.java:89 []
  +-(2) MapPartitionsRDD[34] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[33] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[32] at toJavaRDD at DidNotBuyProductReport.java:89 []
     |  MapPartitionsRDD[5] at cache at DidNotBuyProductReport.java:76 []
     |  CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:15 []

ИЗМЕНИТЬ 2:

Итак, после некоторых исследований в сочетании с пробами и ошибками мне удалось оптимизировать работу.

Я создал RDD из customersWhoHaventOrderedTheProduct и кэширую его перед вызовом действия count(). (кэш я переместил с DataFrame на RDD).

После этого я использую этот RDD для создания storeCustomerReport DataFrame.

JavaRDD<Row> customersWhoHaventOrderedTheProductRdd = customersWhoHaventOrderedTheProduct.javaRDD().cache();

Сейчас этапы выглядят так:

введите здесь описание изображения

Как видите, двух count и cache больше нет, но есть еще два действия javaRDD. Я понятия не имею, откуда они берутся, так как я вызываю toJavaRDD только один раз в своем коде.


  • Вы можете распечатать toDebugString вместо storeCustomerReport и посмотреть, что там написано? 17.11.2015
  • @climbage Я отредактировал свой вопрос и добавил строку отладки 18.11.2015

Ответы:


1

Похоже, вы применяете два действия в сегменте кода ниже.

// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
    .map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);


// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
    .option("keyspace", "my_keyspace")

Один в sqlContext.createDataFrame(), а другой в storeCustomerReport.write(), и оба они требуют customersWhoHaventOrderedTheProduct.toJavaRDD().

Сохранение созданного RDD должно решить эту проблему.

JavaRDD cachedRdd = customersWhoHaventOrderedTheProduct.toJavaRDD().persist(StorageLevel.DISK_AND_MEMORY) //Or any other storage level
25.11.2016
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..