Nano Hash - криптовалюты, майнинг, программирование

kafka.common.FailedToSendMessageException: ошибка kafka

Я читаю файл json и пытаюсь создать его с помощью kafka. Вот мой код:

public class FlatFileDataProducer {

    private String topic = "JsonTopic";
    private Producer<String, String> producer = null;
    KeyedMessage<String, String> message = null;
    public JsonReader reader;

    public void run(String jsonPath) throws ClassNotFoundException, FileNotFoundException, IOException, ParseException{
        reader = new JsonReader();
        System.out.println("---------------------");
        System.out.println("JSON FILE PATH IS : "+jsonPath);
        System.out.println("---------------------");
        Properties prop = new Properties();
        prop.put("metadata.broker.list", "192.168.63.145:9092");
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        // prop.put("partitioner.class", "example.producer.SimplePartitioner");
        prop.put("request.required.acks", "1");


        ProducerConfig config = new ProducerConfig(prop);
        producer = new Producer<String, String>(config);
        List<Employee> emp = reader.readJsonFile(jsonPath);     
        for (Employee employee : emp) 
        {
            System.out.println("---------------------");
            System.out.println(employee.toString());
            System.out.println("---------------------");
            message = new KeyedMessage<String, String>(topic, employee.toString());

            producer.send(message);
            producer.close();

        }
         System.out.println("Messages to Kafka successfully");
    }

И код для чтения файла json:

public List<Employee> readJsonFile(String path) throws FileNotFoundException, IOException, ParseException{
        Employee employee = new Employee();
        parser=new JSONParser();
        Object obj = parser.parse(new FileReader(path));
        JSONObject jsonObject = (JSONObject) obj;
        employee.setId(Integer.parseInt(jsonObject.get("id").toString()));      
        employee.setName((String)jsonObject.get("name"));
        employee.setSalary(Integer.parseInt(jsonObject.get("salary").toString()));
        list.add(employee);
        return list;
    }

Но когда я запускаю программу, ПРОБЛЕМА 1:

> [root@sandbox ~]# java -jar sparkkafka.jar /root/customer.json
> JSON FILE PATH IS : /root/customer.json
>  log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please
> initialize the log4j system properly.
> 1,Smith,25
> Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
>         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
>         at kafka.producer.Producer.send(Producer.scala:77)
>         at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>         at com.up.jsonType.FlatFileDataProducer.run(FlatFileDataProducer.java:41)
>         at com.up.jsonType.FlatFileDataProducer.main(FlatFileDataProducer.java:49)

Это дает ошибку, но когда я проверяю оболочку cosumer, я получаю следующее: ДЛЯ ОДНОЙ СТРОКИ В ФАЙЛЕ JSON Я ВИЖУ 4 записи в оболочке.. ПРОБЛЕМА 2:

[root@sandbox bin]# [root@sandbox bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic JsonTopic --from-beginning

1,Smith,25
1,Smith,25
1,Smith,25
1,Smith,25

Я получаю в 4 раза больше строк для одних и тех же данных.


Ответы:


1

Вам необходимо удалить оба следующих свойства:

    //prop.put("request.required.acks", "1");
    //prop.put("producer.type","async");

Это свойство действительно позаботится о подтверждении.

07.04.2016

2

Можете ли вы попробовать добавить свойство ниже:

prop.put("producer.type","async");
01.04.2016
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..