Я пытаюсь создать Spark StreamingContext
для потоковой передачи сообщений. из темы Кафки. Поэтому я добавил в свою сборку следующую зависимость:
"org.apache.spark:spark-streaming-kafka_2.10:1.6.2"
Затем я создал следующий класс:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
object StreamingApp {
def main(args: Array[String]): Unit = {
def messageConsumer(): StreamingContext = {
val topicName : String = "my-topic"
val brokerHostAndPort : String = "mykafka.example.com:9092"
val ssc = new StreamingContext(SparkContext.getOrCreate(), Seconds(10))
createKafkaStream(ssc, topicName, brokerHostAndPort).foreachRDD(rdd => {
rdd.foreach { msg =>
// TODO: Implement message processing here.
}
})
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc = StreamingContext.getActiveOrCreate(messageConsumer)
ssc.start()
ssc.awaitTermination()
}
def createKafkaStream(ssc: StreamingContext,
kafkaTopics: String, brokers: String): DStream[(String,
String)] = {
val kafkaParams = Map[String, String](
"bootstrap.servers" -> brokers,
"key.deserializer" -> "StringDeserializer",
"value.deserializer" -> "StringDeserializer",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> "false"
)
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](
ssc, kafkaParams, Set(kafkaTopics))
}
}
Когда я компилирую это (через Ant, но это не имеет значения), я получаю scalac
ошибок компилятора:
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:11: error: not found: object kafka
[scalac] import kafka.serializer.StringDecoder
[scalac] ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:12: error: object kafka is not a member of package org.apache.spark.streaming
[scalac] import org.apache.spark.streaming.kafka.KafkaUtils
[scalac] ^
[scalac] /Users/myuser/workspace/myapp/src/main/groovy/com/me/myapp/utils/scala/StreamingApp.scala:63: error: not found: value KafkaUtils
[scalac] KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, Set(kafkaTopics))
[scalac] ^
[scalac] three errors found
Я пропустил какие-либо зависимости здесь? Или не использовать правильные зависимости? Или кодируете что-то неправильно?
Обновлять:
Интересно, что когда я меняю свою зависимость на:
"org.apache.spark:spark-streaming-kafka_2.10:1.6.1"
Эти ошибки компилятора исчезают...