Nano Hash - криптовалюты, майнинг, программирование

Приложение Kafka streams не запускается

У меня есть потоковое приложение Kafka, которое считывает тему data, созданную производителем консоли. У меня есть несколько шагов в приложении, которые создают две таблицы KTable, к которым я затем хочу присоединиться.

Каждая KTable создается успешно, и я даже могу вызвать toStream, а затем peek значения для консоли по отдельности. Как только я пытаюсь соединить KTables вместе, приложение даже не запускается, т.е. введение строки bar.join(qux).toStream() вызывает панику ниже. Похоже, что KTables bar и qux созданы.

Вот вывод, который я получаю как сообщение об ошибке:

2020-02-14 15:56:28.599  INFO  AssignorConfiguration - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer] Cooperative rebalancing enabled now
2020-02-14 15:56:28.630  WARN  ConsumerConfig - The configuration 'admin.retries' was supplied but isn't a known config.
2020-02-14 15:56:28.630  WARN  ConsumerConfig - The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-02-14 15:56:28.630  INFO  AppInfoParser - Kafka version: 2.4.0
2020-02-14 15:56:28.630  INFO  AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
2020-02-14 15:56:28.630  INFO  AppInfoParser - Kafka startTimeMs: 1581695788630
2020-02-14 15:56:28.636  INFO  KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] State transition from CREATED to REBALANCING
2020-02-14 15:56:28.636  INFO  StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Starting
2020-02-14 15:56:28.636  INFO  StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] State transition from CREATED to STARTING
2020-02-14 15:56:28.637  INFO  KafkaConsumer - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] Subscribed to pattern: 'foo-KSTREAM-AGGREGATE-STATE-STORE-0000000009-repartition|foo-KSTREAM-AGGREGATE-STATE-STORE-0000000016-repartition|foo-KSTREAM-AGGREGATE-STATE-STORE-0000000022-repartition|foo-KSTREAM-AGGREGATE-STATE-STORE-0000000029-repartition|data'
2020-02-14 15:56:28.906  INFO  Metadata - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] Cluster ID: ghhNsZUZRSGD984ra7fXRg
2020-02-14 15:56:28.907  INFO  AbstractCoordinator - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] Discovered group coordinator 10.1.36.24:9092 (id: 2147483647 rack: null)
2020-02-14 15:56:28.915  INFO  AbstractCoordinator - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] (Re-)joining group
2020-02-14 15:56:28.920  INFO  AbstractCoordinator - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] (Re-)joining group
2020-02-14 15:56:28.925  ERROR StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Encountered the following error during processing:
java.lang.IllegalArgumentException: Number of partitions must be at least 1.
    at org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) ~[kafka-streams-2.4.0.jar:?]
2020-02-14 15:56:28.925  INFO  StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
2020-02-14 15:56:28.925  INFO  StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Shutting down
2020-02-14 15:56:28.925  INFO  KafkaConsumer - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-02-14 15:56:28.932  INFO  StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2020-02-14 15:56:28.932  INFO  KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] State transition from REBALANCING to ERROR
2020-02-14 15:56:28.932  ERROR KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] All stream threads have died. The instance will be in error state and should be closed.
2020-02-14 15:56:28.932  INFO  StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Shutdown complete
2020-02-14 15:56:28.934  INFO  KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] State transition from ERROR to PENDING_SHUTDOWN
2020-02-14 15:56:28,935 kafka-streams-close-thread WARN [AsyncContext@18b4aac2] Ignoring log event after log4j was shut down.
2020-02-14 15:56:28,936 kafka-streams-close-thread WARN Ignoring log event after log4j was shut down
2020-02-14 15:56:28,938 kafka-streams-close-thread WARN [AsyncContext@18b4aac2] Ignoring log event after log4j was shut down.
2020-02-14 15:56:28,939 kafka-streams-close-thread WARN Ignoring log event after log4j was shut down
2020-02-14 15:56:28,939 Thread-1 WARN [AsyncContext@18b4aac2] Ignoring log event after log4j was shut down.
2020-02-14 15:56:28,939 Thread-1 WARN Ignoring log event after log4j was shut down

В чем причина этого? Есть ли какая-то волшебная конфигурация, которую мне нужно включить, чтобы иметь дело с дополнительным хранилищем состояний соединения, которое я пытаюсь ввести?



Ответы:


1

Переход на версию 2.3.1 устранил проблему, описанную здесь: Issues.apache.org/jira/browse/KAFKA-9335.

Спасибо Спасое Петрониевич.

17.02.2020
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..