Я создал простое Java-приложение, которое использует Apache Spark для извлечения данных из Cassandra, выполняет некоторые преобразования и сохраняет их в другой таблице Cassandra.
Я использую Apache Spark 1.4.1, настроенный в автономном режиме кластера с одним ведущим и подчиненным, расположенными на моей машине.
DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
"WHERE CAST(store_id as string) = '" + storeId + "'");
DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
"WHERE CAST(store_id as string) = '" + storeId + "' AND product_id = " + productId + "");
// We need only the customers who did not order the product
// We cache the DataFrame because we use it twice.
DataFrame customersWhoHaventOrderedTheProduct = customers
.join(customersWhoOrderedTheProduct
.select(customersWhoOrderedTheProduct.col("email")), customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")), "leftouter")
.where(customersWhoOrderedTheProduct.col("email").isNull())
.drop(customersWhoOrderedTheProduct.col("email"))
.cache();
int numberOfCustomers = (int) customersWhoHaventOrderedTheProduct.count();
Date reportTime = new Date();
// Prepare the Broadcast values. They are used in the map below.
Broadcast<String> bStoreId = sparkContext.broadcast(storeId, classTag(String.class));
Broadcast<String> bReportName = sparkContext.broadcast(MessageBrokerQueue.report_did_not_buy_product.toString(), classTag(String.class));
Broadcast<java.sql.Timestamp> bReportTime = sparkContext.broadcast(new java.sql.Timestamp(reportTime.getTime()), classTag(java.sql.Timestamp.class));
Broadcast<Integer> bNumberOfCustomers = sparkContext.broadcast(numberOfCustomers, classTag(Integer.class));
// Map the customers to a custom class, thus adding new properties.
DataFrame storeCustomerReport = sqlContext.createDataFrame(customersWhoHaventOrderedTheProduct.toJavaRDD()
.map(row -> new StoreCustomerReport(bStoreId.value(), bReportName.getValue(), bReportTime.getValue(), bNumberOfCustomers.getValue(), row.getString(0), row.getString(1), row.getString(2))), StoreCustomerReport.class);
// Save the DataFrame to cassandra
storeCustomerReport.write().mode(SaveMode.Append)
.option("keyspace", "my_keyspace")
.option("table", "my_report")
.format("org.apache.spark.sql.cassandra")
.save();
Как видите, я cache
customersWhoHaventOrderedTheProduct
DataFrame, после этого я выполняю count
и вызываю toJavaRDD
.
По моим расчетам эти действия должны быть выполнены только один раз. Но когда я захожу в пользовательский интерфейс Spark для текущего задания, я вижу следующие этапы:
Как видите, каждое действие выполняется дважды.
Я делаю что-то неправильно? Есть ли какие-либо настройки, которые я пропустил?
Любые идеи приветствуются.
ИЗМЕНИТЬ:
После того, как я позвонил System.out.println(storeCustomerReport.toJavaRDD().toDebugString());
Это строка отладки:
(200) MapPartitionsRDD[43] at toJavaRDD at DidNotBuyProductReport.java:93 []
| MapPartitionsRDD[42] at createDataFrame at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[41] at map at DidNotBuyProductReport.java:90 []
| MapPartitionsRDD[40] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[39] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[38] at toJavaRDD at DidNotBuyProductReport.java:89 []
| ZippedPartitionsRDD2[37] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[31] at toJavaRDD at DidNotBuyProductReport.java:89 []
| ShuffledRDD[30] at toJavaRDD at DidNotBuyProductReport.java:89 []
+-(2) MapPartitionsRDD[29] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[28] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[27] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[3] at cache at DidNotBuyProductReport.java:76 []
| CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
| MapPartitionsRDD[36] at toJavaRDD at DidNotBuyProductReport.java:89 []
| ShuffledRDD[35] at toJavaRDD at DidNotBuyProductReport.java:89 []
+-(2) MapPartitionsRDD[34] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[33] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[32] at toJavaRDD at DidNotBuyProductReport.java:89 []
| MapPartitionsRDD[5] at cache at DidNotBuyProductReport.java:76 []
| CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:15 []
ИЗМЕНИТЬ 2:
Итак, после некоторых исследований в сочетании с пробами и ошибками мне удалось оптимизировать работу.
Я создал RDD из customersWhoHaventOrderedTheProduct
и кэширую его перед вызовом действия count()
. (кэш я переместил с DataFrame
на RDD
).
После этого я использую этот RDD
для создания storeCustomerReport
DataFrame
.
JavaRDD<Row> customersWhoHaventOrderedTheProductRdd = customersWhoHaventOrderedTheProduct.javaRDD().cache();
Сейчас этапы выглядят так:
Как видите, двух count
и cache
больше нет, но есть еще два действия javaRDD. Я понятия не имею, откуда они берутся, так как я вызываю toJavaRDD
только один раз в своем коде.