Я отправил одно сообщение своей Кафке, используя следующий код:
def getHealthSink(kafkaHosts: String, zkHosts: String) = {
val kafkaHealth: Subscriber[String] = kafka.publish(ProducerProperties(
brokerList = kafkaHosts,
topic = "health_check",
encoder = new StringEncoder()
))
Sink.fromSubscriber(kafkaHealth).runWith(Source.single("test"))
}
val kafkaHealth = getHealthSink(kafkaHosts, zkHosts)
и я получил следующее сообщение об ошибке:
ОШИБКА kafka.utils.Utils$ при получении метаданных темы для тем [Set(health_check)] от брокера [ArrayBuffer(id:0,host:****,port:9092)] не удалось kafka.common.KafkaException: получение метаданных темы для темы [Set (health_check)] от брокера [ArrayBuffer (id: 0, host: ****, порт: 9092)] не удалось
Вы хоть представляете, в чем может быть проблема?