Мой сценарий: JdbcPagingItemReader
читает из Oracle db и возвращает объект, скажем, «Сотрудник». Затем этот объект «Employee
» передается процессору, чтобы сделать еще один вызов db, чтобы извлечь дополнительную информацию из нескольких таблиц и вернуть объект «AggregatedEmployee
» (фактически он расширяет Employee). Я использую KafkaItemWriter
для записи обработанного объекта в Kafka, но вместо записи AggregatedEmployee
писатель пытается написать сам «Сотрудник».
@Mahmoud Ben Hassine: Я видел много ваших предложений по Spring Batch. Пожалуйста, поделитесь своими мыслями.
Код интерфейса процессора:
public interface PageProcessor<T> {
<R extends Employee> R process(T page);
}
Код Step Bean:
@Bean
protected Step step1 (CompositeJdbcPagingItemReader <Employee> reader, KafkaItemWriter <String, AggregatedEmployee> writer) {
return steps.get("step1")
.<Employee, AggregatedEmployee>chunk(5).
reader(reader).
writer(writer).build();
}
Класс реализации кода интерфейса процессора:
public class EmployeeProcessor implements PageProcessor<Employee> {
private NamedParameterJdbcTemplate jdbcTemplate;
public void setDataSource(DataSource dataSource) {
jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
}
@SuppressWarnings("unchecked")
@Override
public <R extends Employee> R process(Employee page) {
... implementation goes here
}
KafkaItemWriter Bean:
@Bean
KafkaItemWriter<String,AggregatedEmployee> writer(){
return new KafkaItemWriterBuilder<String, AggregatedEmployee>()
.kafkaTemplate(aggregatedEmployeekafkaTemplate)
.itemKeyMapper(aggregatedEmployee -> String.valueOf(aggregatedEmployee.getEmployeeId()))
.build();
}
Отредактировано, чтобы показать процессор:
public class CompositeJdbcPagingItemReader<T> extends JdbcPagingItemReader<T> {
private PageProcessor<T> pageProcessor;
public void setPageProcessor(PageProcessor<T> pageProcessor) {
this.pageProcessor = pageProcessor;
}
И когда создается bean-компонент Reader, объект процессора также создается и устанавливается в считыватель с помощью показанного выше установщика, и логика процессора, написанная в EmployeeProcessor, также выполняется.
Ошибка:
java.lang.ClassCastException: class com.sample.model.Employee cannot be cast to class com.sample.model.AggregatedEmployee (com.sample.model.Employee and com.sample.model.AggregatedEmployee are in unnamed module of loader 'app')
at org.springframework.batch.item.KeyValueItemWriter.write(KeyValueItemWriter.java:43)
Employee
вAggregatedEmployee
не произойдет, что является причиной вашей проблемы. 19.09.2020