Я пытаюсь прочитать сообщения Kafka с помощью Spark Streaming, выполнить некоторые вычисления и отправить результаты другому процессу.
val jsonObject = new JSONObject
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
stream.foreachRDD { rdd => {
val jsonDF = spark.read.json(rdd.map(_._2))
val res = jsonDF.groupBy("artist").count.sort(col("count").desc).take(10)
/*Some Transformations => create jsonArray*/
jsonObject.put("Key", jsonArray)
}}
ssc.start()
Мне нужно накопить JSONObject (глобальную переменную) для моего требования. put
выдает исключение NotSerializable.
java.io.NotSerializableException: объект org.apache.spark.streaming.kafka.DirectKafkaInputDStream$MappedDStream сериализуется, возможно, как часть закрытия операции RDD. Это связано с тем, что на объект DStream ссылаются внутри замыкания. Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы избежать этого. Это было сделано, чтобы избежать раздувания задач Spark ненужными объектами.
Можно ли отправить этот jsonArray из этого блока foreahRDD? Я не хочу писать в файлы или базы данных.