У меня есть потребитель Kafka:
consumer.subscribe(statusTopicList);
try {
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofSeconds(60));
System.out.println("SIZE IS: " + consumerRecords());
for (ConsumerRecord<String, String > record : consumerRecords) {
System.out.println(“Record is: “ + record.value());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
И в моих модульных тестах:
((MockConsumer<String, String>) consumer)
.addRecord(new ConsumerRecord<>(
topic, 1, 0, "test-application1", record1));
((MockConsumer<String, String >) consumer)
.addRecord( new ConsumerRecord<>(
topic, 1, 0, "test-application2", record2));
Размер consumerRecords по-прежнему равен 1, хотя я добавляю 2 записи. Как я могу прочитать оба сообщения в одном опросе?
Мои потребительские свойства:
private static Consumer<String, SecondaryJoinStatus> createStatusConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put("schema.registry.url",
schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
“EARLIEST");
props.put("max.partition.fetch.bytes", 5242880);
return new KafkaConsumer<>(props);
}
MockConsumer
23.07.2019