Я должен создать компонент Spark, который подписывается на темы, получает данные в формате JSON, применяет указанные пользователем запросы к данным и в конечном итоге записывает результаты в другую тему Kafka. С моим нынешним пониманием и исследованиями я пришел к следующему скелету реализации:
SparkSession spark = SparkSession
.builder()
.appName("StructuredStreamingTest")
.master("local[*]")
.getOrCreate();
// Take a batch of data to infer the schema
Dataset<String> sample = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", mytopic)
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
StructType sch = spark.read().json(sample).schema();
// The actual stream
Dataset<String> lines = spark
.readStream()
.format("kafka")
.options(kafkaParams)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
Это должно вернуть объект набора данных (я полагаю, Dataframe в Scala) со столбцом «значение» и фрагментом данных JSON под ним. Теперь фактический вопрос заключается в том, как мы можем разобрать JSON в набор данных в соответствии со схемой, которую мы получили выше, сделать и SQL запросить данные в формате одной строки, преобразовать полученный набор данных обратно в JSON и записать его в Kafka?
РЕДАКТИРОВАТЬ: Действительно ли это дубликат, поскольку указанный пост находится в Scala?