Nous allons ici comment utiliser des services RabbitMQ pour executer ses tâches dans Express Proprement.
Première étape, si vous êtes comme moi dans Express.js, il vous faut créer un Service RabbitMQ pour les publishers, ce sont les publishers qui remplisse les tâches à faire pour les consumers de Rabbit.
Voici un exemple avec la queue Rabbit "import"
// créé un fichier dans /services/rabbitmqpublisher.js
'use strict';
//lien rabbitMQ de connexion
let rabbitmq_connector = 'amqp://guest:root@localhost:5672';
//ouverture de la connexion rabbitMQ avec la librairie 'amqplib'
//pour l'installer sur votre projet avec npm : npm install amqplib --save
let Open = require('amqplib').connect(rabbitmq_connector);
exports.import_publish = function(msg) {
//définition de la queue import
let q = 'import';
Open.then(function(conn) {
return conn.createChannel();//ouverture du channel sur lequel nous allons diffuser.
}).then(function(ch) {
//définition d'une queue durable, c'est à dire qu'elle restera après traitement.
return ch.assertQueue(q, {durable: true}).then(function() {
//envoie du message à la queue, attention celà prend uniquement des strings, du fait que je fais JSON.stringify(msg) car msg dans mon exemple est un JSON.
ch.sendToQueue(q, new Buffer(JSON.stringify(msg)));
//fermeture du channel après envoie des informations, important car sinon les channels restent ouvert et celà fait planter les rabbitMQ à force :).
return ch.close();
});
}).catch(console.warn);
};
Et ensuite le worker (consumer) qui va faire le job, je vous invite à le déclarer comme ceci :
// créé un fichier dans /workers/rabbitmq_consumers.js
'use strict';
//lien rabbitMQ de connexion
let rabbitmq_connector = 'amqp://guest:root@localhost:5672';
//ouverture de la connexion rabbitMQ avec la librairie 'amqplib'
//pour l'installer sur votre projet avec npm : npm install amqplib --save
let Open = require('amqplib').connect(rabbitmq_connector);
let import_consume = function()
{
let q = 'import';
// Consumer
Open.then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel()//connexion au channel ou creation de celui ci.
.then(function(ch) {
let ok = ch.assertQueue(q, {durable: true});// connexion à la queue durable
ok = ok.then(function () {
ch.prefetch(2);// nombre de traitement simultanée lancé
});
ok = ok.then(function () {
ch.consume(q, doWork, {noAck: false});//execution de la function doWork et à chaque retour, passage au message suivant.
//console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
return ok;
function doWork(msg) {
let body = msg.content.toString();
//console.log(' [x] Received \'%s\'', body);
let secs = body.split('.').length - 1;
let thread = JSON.parse(body);//récupération du message et on reparse le JSON encodé précédemment en string.
//TODO traitement voulu (update de base, import de datas, etc...)
//console.log(' [x] Task takes %d seconds', secs);
setTimeout(function () {
//console.log(' [x] Done');
ch.ack(msg);//nettoyage du message.
}, secs * 1000);
}
});
}).catch(console.warn);
};
(function on_load() {
import_consume();
})();
Ainsi depuis vos controllers ou models Express, vous pourriez appeller le publisher import pour déclencher des tâches de traitement suivant le comportement de votre utilisateur de la façon suivante :
let ServiceRabbit = _app.services.rabbitmqpublisher;
/**
* {post} /api/url/todo
*
* @param req
* @param res
*/
exports.create = function (req, res) {
//todo traitement
ServiceRabbit.import_publish(message);
};