I have a Kafka Streams application that reads from a topic named data, which is populated by a console producer. Several steps in the application build two KTables that I then want to join.
Each KTable is created successfully, and I can even call toStream on each one and then peek to print its values to the console. As soon as I try to join the two KTables, however, the application does not even start: adding the line bar.join(qux).toStream() makes it fail with the error below. As far as I can tell, the KTables bar and qux themselves are created.
Here is the output I get, including the error message:
2020-02-14 15:56:28.599 INFO AssignorConfiguration - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer] Cooperative rebalancing enabled now
2020-02-14 15:56:28.630 WARN ConsumerConfig - The configuration 'admin.retries' was supplied but isn't a known config.
2020-02-14 15:56:28.630 WARN ConsumerConfig - The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-02-14 15:56:28.630 INFO AppInfoParser - Kafka version: 2.4.0
2020-02-14 15:56:28.630 INFO AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
2020-02-14 15:56:28.630 INFO AppInfoParser - Kafka startTimeMs: 1581695788630
2020-02-14 15:56:28.636 INFO KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] State transition from CREATED to REBALANCING
2020-02-14 15:56:28.636 INFO StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Starting
2020-02-14 15:56:28.636 INFO StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] State transition from CREATED to STARTING
2020-02-14 15:56:28.637 INFO KafkaConsumer - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] Subscribed to pattern: 'foo-KSTREAM-AGGREGATE-STATE-STORE-0000000009-repartition|foo-KSTREAM-AGGREGATE-STATE-STORE-0000000016-repartition|foo-KSTREAM-AGGREGATE-STATE-STORE-0000000022-repartition|foo-KSTREAM-AGGREGATE-STATE-STORE-0000000029-repartition|data'
2020-02-14 15:56:28.906 INFO Metadata - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] Cluster ID: ghhNsZUZRSGD984ra7fXRg
2020-02-14 15:56:28.907 INFO AbstractCoordinator - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] Discovered group coordinator 10.1.36.24:9092 (id: 2147483647 rack: null)
2020-02-14 15:56:28.915 INFO AbstractCoordinator - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] (Re-)joining group
2020-02-14 15:56:28.920 INFO AbstractCoordinator - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-consumer, groupId=foo] (Re-)joining group
2020-02-14 15:56:28.925 ERROR StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Encountered the following error during processing:
java.lang.IllegalArgumentException: Number of partitions must be at least 1.
at org.apache.kafka.streams.processor.internals.InternalTopicConfig.setNumberOfPartitions(InternalTopicConfig.java:62) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:473) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) ~[kafka-streams-2.4.0.jar:?]
2020-02-14 15:56:28.925 INFO StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] State transition from STARTING to PENDING_SHUTDOWN
2020-02-14 15:56:28.925 INFO StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Shutting down
2020-02-14 15:56:28.925 INFO KafkaConsumer - [Consumer clientId=foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-02-14 15:56:28.932 INFO StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2020-02-14 15:56:28.932 INFO KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] State transition from REBALANCING to ERROR
2020-02-14 15:56:28.932 ERROR KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] All stream threads have died. The instance will be in error state and should be closed.
2020-02-14 15:56:28.932 INFO StreamThread - stream-thread [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4-StreamThread-1] Shutdown complete
2020-02-14 15:56:28.934 INFO KafkaStreams - stream-client [foo-d2f546ef-f7eb-4088-ae04-1943ed71f7a4] State transition from ERROR to PENDING_SHUTDOWN
2020-02-14 15:56:28,935 kafka-streams-close-thread WARN [AsyncContext@18b4aac2] Ignoring log event after log4j was shut down.
2020-02-14 15:56:28,936 kafka-streams-close-thread WARN Ignoring log event after log4j was shut down
2020-02-14 15:56:28,938 kafka-streams-close-thread WARN [AsyncContext@18b4aac2] Ignoring log event after log4j was shut down.
2020-02-14 15:56:28,939 kafka-streams-close-thread WARN Ignoring log event after log4j was shut down
2020-02-14 15:56:28,939 Thread-1 WARN [AsyncContext@18b4aac2] Ignoring log event after log4j was shut down.
2020-02-14 15:56:28,939 Thread-1 WARN Ignoring log event after log4j was shut down
What is causing this? Is there some magic configuration I need to enable to handle the additional state store that the join introduces?