Nano Hash - криптовалюты, майнинг, программирование

Как я могу гарантировать, что мои потребители обрабатывают сообщения в темах кафки по порядку, только один раз?

Я никогда раньше не пользовался Кафкой. У меня есть две тестовые программы Go, обращающиеся к локальному экземпляру kafka: программа чтения и записи. Я пытаюсь настроить параметры моего производителя, потребителя и сервера kafka, чтобы получить определенное поведение.

Мой писатель:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    topics := []string{
        "policymanager-100",
        "policymanager-200",
        "policymanager-300",
    }
    progress := make(map[string]int)
    for _, t := range topics {
        progress[t] = 0
    }

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "0",
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    fmt.Println("producing messages...")
    for i := 0; i < 30; i++ {
        index := rand.Intn(len(topics))
        topic := topics[index]
        num := progress[topic]
        num++
        fmt.Printf("%s => %d\n", topic, num)
        msg := &kafka.Message{
            Value: []byte(strconv.Itoa(num)),
            TopicPartition: kafka.TopicPartition{
                Topic: &topic,
            },
        }
        err = producer.Produce(msg, nil)
        if err != nil {
            panic(err)
        }
        progress[topic] = num
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("DONE")
}

На моей локальной кафке есть три темы: policymanager-100, policymanager-200, policymanager-300. У каждого из них есть только 1 раздел, чтобы гарантировать, что все сообщения будут отсортированы по времени их получения kafka. Мой писатель случайным образом выберет одну из этих тем и выдаст сообщение, состоящее из числа, которое увеличивается исключительно для этой темы. Когда он закончит работу, я ожидаю, что очереди будут выглядеть примерно так (названия тем сокращены для разборчивости):

100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12

Все идет нормально. Я пытаюсь настроить все так, чтобы любое количество потребителей могло быть раскручено и потреблять эти сообщения по порядку. Под «по порядку» я подразумеваю, что ни один потребитель не должен получать сообщение 2 для темы 100 до тех пор, пока сообщение 1 не будет ЗАВЕРШЕНО (а не только что запущено). Если сообщение 1 для темы 100 обрабатывается, потребители могут свободно потреблять из других тем, в которых в настоящее время нет обрабатываемого сообщения. Если сообщение темы было отправлено потребителю, вся эта тема должна стать «заблокированной» до тех пор, пока либо тайм-аут не предполагает, что потребитель не прошел, либо потребитель не зафиксирует сообщение, а затем тема «разблокируется», чтобы сделать следующее сообщение. доступны для потребления.

Мой читатель:

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    count := 2
    for i := 0; i < count; i++ {
        go consumer(i + 1)
    }
    fmt.Println("cosuming...")
    // hold this thread open indefinitely
    select {}
}

func consumer(id int) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "0", // strconv.Itoa(id),
        "enable.auto.commit": "false",
    })
    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            panic(err)
        }

        fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
        time.Sleep(time.Second)
        _, err = c.CommitMessage(msg)
        if err != nil {
            fmt.Printf("ERROR commiting: %+v\n", err)
        }
    }
}

Насколько я понимаю, способ, которым я, скорее всего, добьюсь этого, заключается в правильной настройке моего потребителя. Я пробовал много разных вариантов этой программы. Я пробовал, чтобы все мои горутины использовали одного и того же потребителя. Я пытался использовать разные group.id для каждой горутины. Ни одна из них не была правильной конфигурацией, чтобы получить поведение, которое мне нужно.

То, что делает опубликованный код, — это удаление одной темы за раз. Несмотря на наличие нескольких горутин, процесс будет читать все 100, затем перейдет к 200, затем к 300, и только одна горутина фактически выполнит все чтение. Когда я позволяю каждой горутине иметь разные group.id, тогда сообщения считываются несколькими горутинами, что я хотел бы предотвратить.

В моем примере потребитель просто разбивает вещи с помощью горутин, но когда я начну внедрять этот проект в свой вариант использования на работе, мне понадобится это для работы с несколькими экземплярами kubernetes, которые не будут взаимодействовать друг с другом, поэтому использовать все, что взаимодействует между горутины не будут работать, как только на 2 кубах будет 2 экземпляра. Вот почему я надеюсь, что Кафка будет охранять ворота, как я хочу.



Ответы:


1

Вообще говоря, вы не можете. Даже если бы у вас был один потребитель, использующий все разделы для темы, разделы использовались бы в недетерминированном порядке, и ваш общий порядок во всех разделах не был бы гарантирован.

Попробуйте Keyed Messages, думаю, вы можете найти это полезным для вашего варианта использования.

24.01.2019
Новые материалы

Кластеризация: более глубокий взгляд
Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

Как написать эффективное резюме
Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

Частный метод Python: улучшение инкапсуляции и безопасности
Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

Как я автоматизирую тестирование с помощью Jest
Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..