Я некоторое время возился с Redis. И, честно говоря, я поражен тем, насколько это мощно.

Мой мини-проект, чтобы узнать это

Чтобы изучить это должным образом, я создал API для планирования заданий cron, которые будут вызывать событие в заданное время.

Эти события будут использоваться рабочими процессами (которых может быть много), что позволит масштабировать процесс.

Проблемы с уже существующими статьями, которые учат этому

Я не смог найти в сети хороших статей о запланированных задачах в узле — в большинстве из них использовалась разновидность setTimeout или node-cron, которые, на мой взгляд, были недостаточно хороши.

Проблема, которая возникла у меня с setTimeout/setInterval, заключалась в том, что они не подходили для работы по настройке запланированных задач.

Кроме того, не было хорошего способа перезапустить их, если сервер простоял.

Мне пришлось бы пересчитывать их интервалы/длительность из некоторых сохраненных данных, что могло вызвать beautiful ошибок.

Проблема с node-cron была похожей — я не мог найти хороший способ удалить эти задания таким образом, чтобы сохранить безгражданство.

Следовательно,

Очень простой сервис cron с вашими собственными работниками

Работников вы можете добавить столько, сколько хотите масштабировать.

Примечание. Также используйте Redis в качестве хранилища данных «ключ-значение». не стесняйтесь заменить ваш выбор базы данных.

Вы можете найти код всего приложения по следующей ссылке https://github.com/Julias0/NodeJobWorker.

Модель работы

Всегда сначала стройте модели

module.exports = function (id, name, jobId, event, createdAt, userId) {
    return {
        id,
        name,
        jobId,
        event,
        createdAt,
        userId
    }
}

API

var router = require('express').Router(); 
    var redisClient = require('../db');
    var uuid = require('uuid').v4;
    var job = require('../models/job');
    var q = require('bull'); // creating an instance of a redis queue
    var taskQ = q('taskQueue');
    var processJob = require('../jobHandler').processJob; // this is the next part

    // on GET request to /job
    router.get('/', function (req, res) {
        redisClient.keys('jobs:*', function (err, response) {
            if (err) {
                console.error('ERROR!'); // lol, i am being lazy here  - but in real life you would throw a 500
                console.log(err);
            }
            redisClient.mget([...response], function (err, response) {
                res.json((response.map(r => JSON.parse(r))));
            });
        });
    });
    
    // if an id is provided, fetch corresponding job
    router.get('/:jobId', function (req, res) {
        redisClient.get(req.params.jobId, function (err, response) {
            if (err) {
                console.log(err);
            }
            res.json(JSON.parse(response));
        });
    });
    
    // to delete an existing job
    router.delete('/:jobId', function (req, res) {
    
        redisClient.get('jobs:' + req.params.jobId, function (err, response) {
            if (err) {
                console.log(err);
            }
    
            taskQ.getRepeatableJobs().then(function (jobs) {
                let jobMap = {};
                jobs.forEach(function (job) {
                    jobMap[job.name] = job.key;
                });
                // there were methods for deleting by name
                // but delete by key was much clean imho, hence
               taskQ.removeRepeatableByKey(jobMap[JSON.parse(response).name]).then(function (response) {
                    redisClient.del('jobs:' + req.params.jobId, function (err, response) {
                        res.json(response);
                    });
                });
            });
        });
    });

    /**
        Sample payload - 
        {
            "event": "eventName", // the event to generate when the cron job triggers
            "userId": "NaNqwe34NANA", // details of the user
            "limit": 100,
            "cron": "* * * * *" 
        }
    **/
    router.post('/', function (req, res) {
        // checking for required fields - i know there are better ways to do this, but lalalalala
        let event = typeof (req.body.event) !== 'undefined' ? req.body.event : false; // to see the paylaods properly, check the github readme
        let userId = typeof (req.body.userId) !== 'undefined' ? req.body.userId : false;
        let limit = typeof (req.body.limit) !== 'undefined' ? req.body.limit : false;
        let cron = typeof (req.body.cron) !== 'undefined' ? req.body.cron : false;
    
        if (event && userId && limit && cron) {
            let jobName = uuid();
            let id = uuid();
    
            // assigning a handler for the given job by name - this is required by the bull queue
            // this will trigger sending the event into the event queue upon the triggering of the  cronjob
            taskQ.process(jobName, function (response) {
                redisClient.get('jobs:' + id, function (err, response) {
                    if (!err) {
                        processJob(JSON.parse(response.data));                    
                    }
                });
            });
    
            taskQ.add(jobName, { id }, {
                repeat: {
                    cron,
                    limit // set limit 1 if you want it to execute once only
                }
            }).then(function (createdJob) {
                let newJob = job(id, jobName, createdJob.id, event, new Date(), userId);
                redisClient.set('jobs:' + id, JSON.stringify(newJob), function (err, response) { // using redis like a database here. Feel free to replace with your choice
                    if (!err) {
                        res.json(newJob);
                    } else {
                        res.status(500).json(err);
                    }
                });
            }).catch(function (err) {
                res.status(500).json(err);
            });
        } else {
            res.status(401).json({
                error: 'required data is not provided'
            });
        }
    });
    
    module.exports = router;

Несколько вспомогательных функций

var q = require('bull');
    var taskQ = q('taskQueue');
    var eventQueue = q('eventQueue');
    var redisClient = require('./db');
    
    
    // this is reponsible for restarting the watchers for the repeating jobs that are running in case the application is stopped. This makes recovery from crashes very easy 
    module.exports.init = function (jobId) {
        redisClient.keys('jobs:*', function (err, jobKeys) { // the process jobs need to be connected again in case of failure
            jobKeys.forEach(function (jobKey) {
                redisClient.get(jobKey, function (err, job) {
                    var parsedJob = JSON.parse(job);
                    taskQ.process(parsedJob.name, function (response) {
                        processJob(parsedJob);
                    });
                });
            });
        });
    };
    
    let processJob = function (job) {
        eventQueue.add({ event: job.event, userId: job.userId});
    };
    
    module.exports.processJob = processJob;

Внутри работник 1

let q = require('bull');
    var eventQueue = q('eventQueue');
    eventQueue.process(function (response) {
        console.log('Handled by worker 1!!!!');
        console.log(response.data); 
        console.log('Handled by worker 1!!!!');
    });

Внутри работник 2

let q = require('bull');
    var eventQueue = q('eventQueue');
    eventQueue.process(function (response) {
        console.log('Handled by worker 2!!!!');
        console.log(response.data); 
        console.log('Handled by worker 2!!!!');
    });

Вывод

Это представляет собой масштабируемое решение для работы с заданиями cron. Если вы хотите расширить это, не стесняйтесь пинговать меня/отправлять запрос на включение в мой репозиторий git.

Читайте больше моих постов на https://www.thegammaray.com/