Я не мог понять этот дизайн API!
В приведенном ниже коде мы подписываемся на список тем с динамически назначаемым разделом. Это совершенно нормально.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("some-topic"));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
StreamSupport.stream(records.spliterator(), false)
.forEach(r -> {
System.out.println(r.key() + "::" + r.value());
});
}
Путаница здесь.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//seek for specific partition
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, 0);
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
StreamSupport.stream(records.spliterator(), false)
.forEach(r -> {
System.out.println(r.key() + "::" + r.value());
});
}
Вопрос:
- Мы уже присвоили список разделов с помощью метода
assign
. Почему методseek
также ищет информацию о разделе? Почему-то мне кажется, что это избыточно. - Метод
seek
имеет раздел с темой и смещением. Почему для вызоваseek
сначала требуетсяassign
?
consumer.seek(offset)
. зачем еще и раздел? по крайней мере, это нормально 2) назначение обязательно -java.lang.IllegalStateException: No current assignment for partition <topic>
- не имеет смысла! 28.07.2019