基于内存,持久化,Redis 和 RabbitMQ
在用户注册的场景中,服务端接收到提交的表单内容,将数据写入数据库,随后发送一封注册成功邮件。 通常处理请求参数,操作数据库,调用 STMP 邮件服务器发送邮件都写在一个请求处理函数中。这样的问题是任何一步发生错误,都需要重试。这还只是一个简单场景,如果流程复杂,依赖外部系统众多,显然问题就不可避免了。
我们引入消息队列,producer 负责向队列中推送消息,而 worker 负责消费。
使用消息队列的好处是显而易见的:
- 解耦了 producer 和 worker。尤其是像发送邮件这样对实时性要求不高的步骤,如果这一步操作失败,我们显然不需要回滚之前对数据库的修改,只需要在合适的时间重试就行了。
- 实现重试逻辑变得简单。另外,当进程出现错误,不至于丢失全部状态,甚至可以从错误中恢复。
- 分布式和扩展性。配合 cluster,创建多个 worker 变得简单。
- 让外部系统更容易集成进来,相比在一个请求回调函数中完成所有逻辑处理。
简单的内存队列
首先实现一个内存中的队列管理,使用 async 创建队列。
// 创建并发度为1的异步队列
const queue = async.queue(work, 1);
function work(item, cb) {
ensureConnected(function() {
domotic.command(item.command, item.options, callback);
});
function callback(err) {
// 出错重试
if (err && err.code == 'ECONN') {
connected = false;
work(item, cb);
} else cb(err);
}
}
// 推送消息到队列中
queue.push(work, cb);
这样一个简单的 demo 存在的问题就是所有状态存在于内存中,一旦崩溃就无法恢复了。另外内存毕竟有限,对于一个长时间运行的系统,迟早是会超出内存限制的。
通过 LevelDB 持久化
持久化队列是个不错的改进方向。LevelDB 是一个键值对数据库,不需要安装,可以在指定目录下创建。另外,使用配套的 level-jobs
可以快捷的创建队列。
const level = require('level');
// 创建本地数据库
const db = level('./event/db');
const Jobs = require('level-jobs');
const maxConcurrency = 1;
// 创建队列,并发度1
const queue = Jobs(db, worker, maxConcurrency);
module.exports = queue;
function worker(id, payload, cb) {
sendEventToRemoteService(payload, (err) => {
if (err) {
console.error('Error processing event %s: %s',payload.id, err.message);
}
else {
console.log('event %s successfully relayed', payload.id);
}
cb(err);
});
}
// 模拟复杂异步处理流程
function sendEventToRemoteService(payload, cb) {
setTimeout(() => {
let err;
if (Math.random() > 0.5) {
err = Error('something awful has happened');
}
cb(err);
}, 100);
}
通过queue.push(payload)
可以向队列中推送消息。
const relay = require('./event_relay');
for(let i = 0; i < 10; i++) {
relay.push({
id: i,
event: 'door opened'
});
}
现在我们解决了持久化队列的问题,但是我们的 worker 是单线程运行的。如果任务是 CPU 密集型的,还是需要分配负载到多个线程上。
Redis 和多线程
simple-redis-safe-work-queue
是一个基于 Redis 的分布式 worker/producer 的队列解决方案。
创建 worker,设置重试次数。
const request = require('request');
const Queue = require('simple-redis-safe-work-queue')
// 设置重试次数
const workerOptions = {
maxRetries: 2
};
const worker = Queue.worker('invoke webhook', invokeWebhook, workerOptions);
function invokeWebhook(webhook, cb) {
console.log('invoke webhook: %j', webhook);
request(webhook, done);
function done(err, res) {
if (!err && (res.statusCode < 200 || res.statusCode >= 300)) {
err = Error('response status code was ' + res.statusCode);
}
cb(err);
}
}
worker.on('max retries', function(err, payload) {
console.error(
'max retries reached trying to talk to %s.: %s\nrequest params: %j',
payload.url, err.stack, payload);
});
而对于 producer,在一个实际的例子中,在数据库操作完成后向队列推送消息,并在 server 停止时关闭队列。
const Queue = require('simple-redis-safe-work-queue')
const webhookQueueClient = Queue.client('invoke webhook');
const server = Server();
server.listen(8080);
server.post('/some/important/action', (req, res, next) => {
db.insert(someDoc, err => {
if (err) res.send(err);
else {
webhookQueueClient.push({
url: 'http://example.com',
method: 'POST',
json: {
a: 1,
b: 2
}
}, pushedWebhookWork);
}
function pushedWebhookWork(err) {
if (err) res.stats(500).send(err);
else res.stats(201).send({ok: true});
}
});
});
// 关闭队列
server.once('close', () => {
webhookQueueClient.quit();
});
这样我们可以启动多个 worker 线程消费队列。Redis 在处理简单的场景中足够了,如果对可扩展性和稳定性有更高要求,就需要更为成熟的平台了。
RabbitMQ
通过 homebrew 安装后。启动服务器:/usr/local/sbin/rabbitmq-server
。
创建 channel,无论成功与否都会触发回调函数。worker 和 producer 都会通过该方法连接队列。
const amqp = require('amqplib/callback_api');
const url = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672';
module.exports = createQueueChannel;
function createQueueChannel(queue, cb) {
amqp.connect(url, onceConnected);
function onceConnected(err, conn) {
if (err) {
console.error('Error connecting:', err.stack);
}
else {
console.log('connected');
conn.createChannel(onceChannelCreated);
}
function onceChannelCreated(err, channel) {
if (err) {
cb(err);
}
else {
channel.assertQueue(queue, {durable: true}, onceQueueCreated);
}
function onceQueueCreated(err) {
if (err) {
cb(err);
}
else {
cb(null, channel, conn);
}
}
}
}
}
对于 producer 而言,需要连接 channel,然后向队列发送消息,随后断开连接:
const Channel = require('./channel');
const QUEUE = 'queue';
Channel(QUEUE, (err, channel, conn) => {
if (err) {
console.error(err.stack);
}
else {
console.log('channel and queue created');
let work = 'make me a sandwich';
channel.sendToQueue(QUEUE, encode(work), {
persistent: true
});
setImmediate(() => {
channel.close();
conn.close();
});
}
});
而对于每一个 worker,连接成功后,取出队列中的消息内容载荷进行消费,完成后通过ack()
通知队列,继续消费下一个消息。而如果队列为空,通过定时器在一定延迟后继续消费。
Channel(QUEUE, (err, channel, conn) => {
if (err) {
console.error(err.stack);
}
else {
console.log('channel and queue created');
consume();
}
function consume() {
channel.get(QUEUE, {}, onConsume);
function onConsume(err, msg) {
if (err) {
console.warn(err.message);
}
else if (msg) {
console.log('consuming %j', msg.content.toString());
setTimeout(() => {
channel.ack(msg);
consume();
}, 1e3);
}
else {
console.log('no message, waiting...');
setTimeout(consume, 1e3);
}
}
}
});