Nano Hash - криптовалюты, майнинг, программирование

Обещание закрытия в цикле

Каждую секунду я получаю строки данных от Kafka. Для каждой партии данных я вставляю в свою базу данных.

Мое приложение продолжает читать последние message и id каждой партии. Проблема здесь в том, что обещания выполняются не последовательно, а одновременно после завершения одного пакета, и они продолжают читать одни и те же message и id. Я хочу, чтобы каждое обещание имело свои собственные message и id, как определено порядком, в котором они пришли из цикла for в первой функции.

Я думаю, что мне нужно использовать закрытие, однако я не уверен, как я могу применить их здесь. Я не хочу использовать таймеры!

Спасибо!

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
    for (var i = 0; i < batchOfRows.rows.length; i++) {
        validate(batchOfRows.rows[i])
            .then(result => console.log(result))
            .catch(error => console.log(error));
    }
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    return new Promise((resolve, reject) => {
        message = data;
        id = message.date + message.location
        DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
            .then(result => {
                // Insert into the table at this ID
                insertIntoDB(message, id)
                    .then(result => resolve(result))
                    .catch(error => reject(error));
            })
            .catch(error => {
                reject(error);
            });
    });
}

// Inserting into DB
function insertIntoDB(message, id) {
    return new Promise((resolve, reject) => {
        query = "insert into table2 where id = ? and messageBody = ?";

        DB.execute(query, [id, JSON.Stringify(message)])
            .then(result => resolve("Successfully inserted message ID " + id))
            .catch(error => reject("Error inserting!"));
    });
}

EDIT (решение Danh):

var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
    client, [{
        topic: 'my_topic',
        partition: 0,
        offset: 0
    }], {
        fromOffset: false
    }
);

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(element.map(processElement)).then(elementResult => {
            // results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
            console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
            results = [];  
            queue.shift();
        });
    });
}

batchOfRows.on('message', function (data) {
    console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
    queue.push(batchOfRows.rows);
    processQueue();
});

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
        .then(result => {
            // Pushing the result here
            results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
            console.log("Test") // On the first batch prints "Test" 72 times right away
        });
}

EDIT Я немного изменил функцию processQueue, добавив element.map(processUpdate), потому что пакеты, полученные от batchOfRows, на самом деле являются массивами, и мне нужно выполнить этот запрос БД для каждого элемента внутри этого массива.

Я также удалил results.push(elementResult), потому что по какой-то причине elementResult на самом деле не определен. Я переместил results.push(elementResult) в insertIntoDB и назвал его results.push(result). Возможно, здесь возникает ошибка (я не знаю, как вернуть результат из insertIntoDB обратно в вызывающую функцию обещания processQueue).

Если вы взглянете на insertIntoDB, если я выполню console.log("test"), он напечатает test столько же раз, сколько элементов в массиве batchOfRows, что означает, что он разрешил все промисы в этом пакете. Таким образом, в первом пакете/сообщении, если есть 72 строки, будет напечатано «Тест» 72 раза. Но если я изменю этот console.log("Test") на просто results.push(result) или даже results.push("test"), а затем напечатаю results.length, он все равно даст мне 0, пока не завершится второй пакет хотя я ожидаю, что длина будет 72.


  • Возможный дубликат обещания JavaScript ES6 для цикла 23.06.2018
  • Вот что меня беспокоило, когда мы обсуждали этот пост: одновременный доступ к очереди. См. этот пост (на нем присутствовали самые выдающиеся люди с тегом обещания на SO, но без ответа). stackoverflow.com/questions/26756463/ Одна из причин, по которой я предложил запись всех данных в базу данных в том виде, в каком она появилась, а затем обработка очереди, сохраненной в базе данных, в отдельном процессе должна была позволить базе данных обеспечить атомарность этих операций. Я думаю, что это было бы более профессиональным решением, чем то, что я предложил 25.06.2018
  • Говоря более прямо, может ли предложенное мной решение выполняться push в очереди в тот самый момент, когда в ней выполняется shift другое обещание? 25.06.2018

Ответы:


1

Может быть полезно немного абстрагироваться от идей и явно представить их в данных (а не в данных, неявно сохраненных в промисах). Начните с очереди:

let queue = [];

Добавляйте вещи в очередь с помощью queue.push(element) и получайте и удаляйте в порядке поступления с помощью element = queue.shift()

Наша цель — обработать все, что находится в очереди, по порядку, сохранив результаты по порядку. Сама обработка является асинхронной, и мы хотим закончить один элемент очереди перед запуском следующего, поэтому нам нужна цепочка промисов (называемая promises) для обработки очереди:

let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(processElement(element)).then(elementResult => {
            results.push(elementResult);
            queue.shift();
        });
    });
}

Мы можем убедить себя, что это правильно, даже не задумываясь о том, что делает processElement(), пока он возвращает обещание. (В случае OP это обещание — это обещание иметь дело с массивом «строк»). processElement() сделает свое дело, и результат (массив результатов в случае OP) будет перемещен в results.

Уверен, что порядок операций имеет смысл, когда поступает новый пакет, добавьте его в очередь, а затем обработайте все, что находится в очереди:

batchOfRows.on('message', function (data) {
    queue.push(batchOfRows.rows);
    processQueue();
});

Нам просто нужно определить processElement(). Используйте полезные предложения @YuryTarabanko для этого здесь (и оставьте его ответ помеченным как правильный, ИМО)

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)])
}

Одним приятным побочным эффектом этого является то, что вы можете измерить прогресс. Если входные данные поступают слишком быстро, то выражение:

queue.length - results.length

... будет расти со временем.

EDIT Глядя на новый код, я недоумеваю, почему запрос выполняется для каждой строки (каждого элемента в batchOfRows.rows). Поскольку результат этого запроса игнорируется, не делайте этого...

function processElement(data) {
    const id = data.date + data.location
    // we know everything we need to know to call insert (data and id)
    // just call it and return what it returns :-)
    return insertIntoDB(data, id);
}

Теперь я понимаю, что это будет долгоиграющая задача, и она не должна накапливать результаты (даже линейно). Более чистое исправление для этого — удалить все ссылки на массив results, который я предложил. Минимальная версия вставки просто вставляет и возвращает результат вставки...

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.Stringify(message)]);
}

Я думаю, вы добавили некоторый код для регистрации результатов (лучше проверить, что это сработало, было бы проверить базу данных через какой-то внешний процесс, но если вы хотите войти в журнал, просто не забудьте пропустить значение результата после регистрации.

anyPromise.then(result => {
    console.log(result);
    return result;  // IMPORTANT
})
23.06.2018
  • Спасибо за это отличное решение! Я уверен, что многие люди найдут это решение таким же полезным, как и я. Я на самом деле собираюсь попытаться реализовать этот метод тоже. 23.06.2018
  • Один вопрос: возможно ли оставить его работающим на долгое-долгое время? (Возможно, навсегда)? Не станет ли память проблемой? 23.06.2018
  • Рост памяти будет (весьма нелинейной) функцией скорости поступления пакета и времени, необходимого для его обработки. Если вы можете выполнить партию до того, как прибудет следующая (в среднем), то объем памяти вообще не увеличится. Однако, поскольку новые пакеты появляются до того, как предыдущие пакеты будут завершены, память будет увеличиваться экспоненциально. 23.06.2018
  • Пакет каждую секунду, каждый пакет содержит ~70 строк. думаю должно быть нормально? 23.06.2018
  • Мое предположение тоже подойдет. Попробуйте с ведением журнала, чтобы увидеть, растет ли очередь. 23.06.2018
  • Я заменил results.push внутри processElement на console.log(results.length). Я помещаю результат своего запроса в массив результатов после завершения вставкиIntoDB. Всякий раз, когда я получаю первую партию (т.е. на первой итерации), длина массива результатов равна 0, хотя console.log(batchOfRows.rows) дает фактическое значение для первого сообщения. Что здесь может быть не так? Вроде есть смещение. 24.06.2018
  • Я не уверен. Просматривая код еще раз, я не очень понимаю параметр данных анонимной функции, передаваемой в .on('message, function(data)...` Я также не вижу определения batchOfRows. Может быть, добавить немного больше контекста в вопрос? 24.06.2018
  • Я думаю, что если бы я знал, как вернуть результат из insertIntoDB обратно в вызывающую функцию обещания processQueue, это, вероятно, решило бы проблему. 24.06.2018
  • Я могу продолжить изучение кода, но беглый просмотр показывает, что ваша отладочная модификация insertIntoDB добавила блок then в db.execute. Это обещание выполнения db теперь вернет все, что вернет функция then, а это ничего. После console.log("Test") просто добавьте возврат того, что вы хотите вернуть, например return results; 24.06.2018
  • Обратите внимание, что функции одиночного выражения со стрелкой => возвращаются неявно, но когда есть фигурные скобки { }, необходимо возвращать явно с оператором return. 24.06.2018
  • Я все еще получаю неопределенность для elementResult даже после явного возврата результатов. Я делаю что-то неправильно в этой строке: promises.then(element.map(processElement)).then(elementResult =› {//здесь }); Примечание. Я также делаю небольшие изменения непосредственно перед вызовом insertIntoDB внутри .then() вызывающего db.execute, однако я также удостоверился, что возвращает insertIntoDB. Я обновлю пост. 24.06.2018
  • Давайте продолжим обсуждение в чате. 24.06.2018

  • 2

    В вашем коде есть различные антипаттерны. Во-первых, вам не нужно вручную создавать обещание, скорее всего, вам никогда не понадобится вызывать new Promise. Во-вторых, вы нарушаете цепочку обещаний, не возвращая вложенное обещание из обработчика onFulfill. И, наконец, вы загрязняете глобальную область видимости, когда не объявляете переменные id = message.date + message.location

    // This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
    let pending = Promise.resolve([]); // previous batch starting w/ resolved promise
    batchOfRows.on('message', function (data) {
        // not sure where was batchRows comming from in your code
        const nextBatch = () => Promise.all(
          data.batchOfRows.rows.map(validate)
        );
    
        // reassign pending to a new promise
        // whatever happend to previous promise we keep running
        pending = pending
          .then(nextBatch)
          .catch(e => console.error(e))
    });
    
    // For each row received, give it an ID and then insert into the DB
    function validate(data) {
        const id = data.date + data.location
        return  DB.execute('select * from table1 where id = ?', id)
                  .then(result => insertIntoDB(data, id).then(() => result));
    }
    
    // Inserting into DB
    function insertIntoDB(message, id) {
        const query = "insert into table2 where id = ? and messageBody = ?";
        return DB.execute(query, [id, JSON.Stringify(message)])
    }
    
    23.06.2018
  • Большое спасибо! Не могли бы вы объяснить мне по-английски, как именно const nextBatch = () => Promise.all(data.batchOfRows.rows.map(validate)); работает? Для чего нужна анонимная функция? Кроме того, вы делаете () => результат несколько раз внутри возвращаемых промисов, для чего это нужно и где они заканчиваются? 23.06.2018
  • @ stark0323, только что читал эту ветку, как раз собирался предложить более явную структуру данных (по крайней мере, для ясности), но похоже, что вы ее решили. В этом ответе объявляется небольшая вспомогательная функция с именем nextBatch, которая создает массив обещаний, вызывая проверку каждого элемента (map) из batchOfRows.rows, а затем передает этот массив обещаний Promise.all(). 23.06.2018
  • @danh Я (и, возможно, многие другие) определенно нашел бы ваше решение полезным (если не более полезным). Не могли бы вы поделиться своим кодом для ясности? 23.06.2018
  • Я согласен с этим ответом. Я добавил один ниже, в котором изложены цели с немного большим количеством данных, представленных в явном виде, но он заимствован из приведенного здесь кода, поэтому я бы посоветовал оставить этот пункт помеченным как правильный. 23.06.2018
  • Новые материалы

    Кластеризация: более глубокий взгляд
    Кластеризация — это метод обучения без учителя, в котором мы пытаемся найти группы в наборе данных на основе некоторых известных или неизвестных свойств, которые могут существовать. Независимо от..

    Как написать эффективное резюме
    Предложения по дизайну и макету, чтобы представить себя профессионально Вам не позвонили на собеседование после того, как вы несколько раз подали заявку на работу своей мечты? У вас может..

    Частный метод Python: улучшение инкапсуляции и безопасности
    Введение Python — универсальный и мощный язык программирования, известный своей простотой и удобством использования. Одной из ключевых особенностей, отличающих Python от других языков, является..

    Как я автоматизирую тестирование с помощью Jest
    Шутка для победы, когда дело касается автоматизации тестирования Одной очень важной частью разработки программного обеспечения является автоматизация тестирования, поскольку она создает..

    Работа с векторными символическими архитектурами, часть 4 (искусственный интеллект)
    Hyperseed: неконтролируемое обучение с векторными символическими архитектурами (arXiv) Автор: Евгений Осипов , Сачин Кахавала , Диланта Хапутантри , Тимал Кемпития , Дасвин Де Сильва ,..

    Понимание расстояния Вассерштейна: мощная метрика в машинном обучении
    В обширной области машинного обучения часто возникает необходимость сравнивать и измерять различия между распределениями вероятностей. Традиционные метрики расстояния, такие как евклидово..

    Обеспечение масштабируемости LLM: облачный анализ с помощью AWS Fargate и Copilot
    В динамичной области искусственного интеллекта все большее распространение получают модели больших языков (LLM). Они жизненно важны для различных приложений, таких как интеллектуальные..