Каждую секунду я получаю строки данных от Kafka. Для каждой партии данных я вставляю в свою базу данных.
Мое приложение продолжает читать последние message
и id
каждой партии. Проблема здесь в том, что обещания выполняются не последовательно, а одновременно после завершения одного пакета, и они продолжают читать одни и те же message
и id
. Я хочу, чтобы каждое обещание имело свои собственные message
и id
, как определено порядком, в котором они пришли из цикла for в первой функции.
Я думаю, что мне нужно использовать закрытие, однако я не уверен, как я могу применить их здесь. Я не хочу использовать таймеры!
Спасибо!
// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
for (var i = 0; i < batchOfRows.rows.length; i++) {
validate(batchOfRows.rows[i])
.then(result => console.log(result))
.catch(error => console.log(error));
}
});
// For each row received, give it an ID and then insert into the DB
function validate(data) {
return new Promise((resolve, reject) => {
message = data;
id = message.date + message.location
DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
.then(result => {
// Insert into the table at this ID
insertIntoDB(message, id)
.then(result => resolve(result))
.catch(error => reject(error));
})
.catch(error => {
reject(error);
});
});
}
// Inserting into DB
function insertIntoDB(message, id) {
return new Promise((resolve, reject) => {
query = "insert into table2 where id = ? and messageBody = ?";
DB.execute(query, [id, JSON.Stringify(message)])
.then(result => resolve("Successfully inserted message ID " + id))
.catch(error => reject("Error inserting!"));
});
}
EDIT (решение Danh):
var kafka = require('kafka-node');
client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
batchOfRows = new Consumer(
client, [{
topic: 'my_topic',
partition: 0,
offset: 0
}], {
fromOffset: false
}
);
let results = [];
let promises = Promise.resolve();
function processQueue() {
queue.forEach(element => {
promises = promises.then(element.map(processElement)).then(elementResult => {
// results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
results = [];
queue.shift();
});
});
}
batchOfRows.on('message', function (data) {
console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
queue.push(batchOfRows.rows);
processQueue();
});
function processElement(data) {
const id = data.date + data.location
return DB.execute('select * from table1 where id = ?', id)
.then(result => insertIntoDB(data, id).then(() => result));
}
function insertIntoDB(message, id) {
const query = "insert into table2 where id = ? and messageBody = ?";
return DB.execute(query, [id, JSON.Stringify(message)])
.then(result => {
// Pushing the result here
results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
console.log("Test") // On the first batch prints "Test" 72 times right away
});
}
EDIT Я немного изменил функцию processQueue, добавив element.map(processUpdate), потому что пакеты, полученные от batchOfRows, на самом деле являются массивами, и мне нужно выполнить этот запрос БД для каждого элемента внутри этого массива.
Я также удалил results.push(elementResult), потому что по какой-то причине elementResult на самом деле не определен. Я переместил results.push(elementResult) в insertIntoDB и назвал его results.push(result). Возможно, здесь возникает ошибка (я не знаю, как вернуть результат из insertIntoDB обратно в вызывающую функцию обещания processQueue).
Если вы взглянете на insertIntoDB, если я выполню console.log("test"), он напечатает test столько же раз, сколько элементов в массиве batchOfRows, что означает, что он разрешил все промисы в этом пакете. Таким образом, в первом пакете/сообщении, если есть 72 строки, будет напечатано «Тест» 72 раза. Но если я изменю этот console.log("Test") на просто results.push(result) или даже results.push("test"), а затем напечатаю results.length, он все равно даст мне 0, пока не завершится второй пакет хотя я ожидаю, что длина будет 72.
.on('message
, function(data)...` Я также не вижу определения batchOfRows. Может быть, добавить немного больше контекста в вопрос? 24.06.2018insertIntoDB
добавила блок then в db.execute. Это обещание выполнения db теперь вернет все, что вернет функция then, а это ничего. Послеconsole.log("Test")
просто добавьте возврат того, что вы хотите вернуть, напримерreturn results;
24.06.2018=>
возвращаются неявно, но когда есть фигурные скобки{ }
, необходимо возвращать явно с оператором return. 24.06.2018