Я никогда раньше не пользовался Кафкой. У меня есть две тестовые программы Go, обращающиеся к локальному экземпляру kafka: программа чтения и записи. Я пытаюсь настроить параметры моего производителя, потребителя и сервера kafka, чтобы получить определенное поведение.
Мой писатель:
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
rand.Seed(time.Now().UnixNano())
topics := []string{
"policymanager-100",
"policymanager-200",
"policymanager-300",
}
progress := make(map[string]int)
for _, t := range topics {
progress[t] = 0
}
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0",
})
if err != nil {
panic(err)
}
defer producer.Close()
fmt.Println("producing messages...")
for i := 0; i < 30; i++ {
index := rand.Intn(len(topics))
topic := topics[index]
num := progress[topic]
num++
fmt.Printf("%s => %d\n", topic, num)
msg := &kafka.Message{
Value: []byte(strconv.Itoa(num)),
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
}
err = producer.Produce(msg, nil)
if err != nil {
panic(err)
}
progress[topic] = num
time.Sleep(time.Millisecond * 100)
}
fmt.Println("DONE")
}
На моей локальной кафке есть три темы: policymanager-100, policymanager-200, policymanager-300. У каждого из них есть только 1 раздел, чтобы гарантировать, что все сообщения будут отсортированы по времени их получения kafka. Мой писатель случайным образом выберет одну из этих тем и выдаст сообщение, состоящее из числа, которое увеличивается исключительно для этой темы. Когда он закончит работу, я ожидаю, что очереди будут выглядеть примерно так (названия тем сокращены для разборчивости):
100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12
Все идет нормально. Я пытаюсь настроить все так, чтобы любое количество потребителей могло быть раскручено и потреблять эти сообщения по порядку. Под «по порядку» я подразумеваю, что ни один потребитель не должен получать сообщение 2 для темы 100 до тех пор, пока сообщение 1 не будет ЗАВЕРШЕНО (а не только что запущено). Если сообщение 1 для темы 100 обрабатывается, потребители могут свободно потреблять из других тем, в которых в настоящее время нет обрабатываемого сообщения. Если сообщение темы было отправлено потребителю, вся эта тема должна стать «заблокированной» до тех пор, пока либо тайм-аут не предполагает, что потребитель не прошел, либо потребитель не зафиксирует сообщение, а затем тема «разблокируется», чтобы сделать следующее сообщение. доступны для потребления.
Мой читатель:
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
count := 2
for i := 0; i < count; i++ {
go consumer(i + 1)
}
fmt.Println("cosuming...")
// hold this thread open indefinitely
select {}
}
func consumer(id int) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0", // strconv.Itoa(id),
"enable.auto.commit": "false",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
for {
msg, err := c.ReadMessage(-1)
if err != nil {
panic(err)
}
fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
time.Sleep(time.Second)
_, err = c.CommitMessage(msg)
if err != nil {
fmt.Printf("ERROR commiting: %+v\n", err)
}
}
}
Насколько я понимаю, способ, которым я, скорее всего, добьюсь этого, заключается в правильной настройке моего потребителя. Я пробовал много разных вариантов этой программы. Я пробовал, чтобы все мои горутины использовали одного и того же потребителя. Я пытался использовать разные group.id
для каждой горутины. Ни одна из них не была правильной конфигурацией, чтобы получить поведение, которое мне нужно.
То, что делает опубликованный код, — это удаление одной темы за раз. Несмотря на наличие нескольких горутин, процесс будет читать все 100, затем перейдет к 200, затем к 300, и только одна горутина фактически выполнит все чтение. Когда я позволяю каждой горутине иметь разные group.id
, тогда сообщения считываются несколькими горутинами, что я хотел бы предотвратить.
В моем примере потребитель просто разбивает вещи с помощью горутин, но когда я начну внедрять этот проект в свой вариант использования на работе, мне понадобится это для работы с несколькими экземплярами kubernetes, которые не будут взаимодействовать друг с другом, поэтому использовать все, что взаимодействует между горутины не будут работать, как только на 2 кубах будет 2 экземпляра. Вот почему я надеюсь, что Кафка будет охранять ворота, как я хочу.