基于内存,持久化,Redis 和 RabbitMQ

在用户注册的场景中,服务端接收到提交的表单内容,将数据写入数据库,随后发送一封注册成功邮件。 通常处理请求参数,操作数据库,调用 STMP 邮件服务器发送邮件都写在一个请求处理函数中。这样的问题是任何一步发生错误,都需要重试。这还只是一个简单场景,如果流程复杂,依赖外部系统众多,显然问题就不可避免了。

我们引入消息队列,producer 负责向队列中推送消息,而 worker 负责消费。

使用消息队列的好处是显而易见的:

  1. 解耦了 producer 和 worker。尤其是像发送邮件这样对实时性要求不高的步骤,如果这一步操作失败,我们显然不需要回滚之前对数据库的修改,只需要在合适的时间重试就行了。
  2. 实现重试逻辑变得简单。另外,当进程出现错误,不至于丢失全部状态,甚至可以从错误中恢复。
  3. 分布式和扩展性。配合 cluster,创建多个 worker 变得简单。
  4. 让外部系统更容易集成进来,相比在一个请求回调函数中完成所有逻辑处理。

简单的内存队列

首先实现一个内存中的队列管理,使用 async 创建队列。

// 创建并发度为1的异步队列
const queue = async.queue(work, 1);

function work(item, cb) {
    ensureConnected(function() {
        domotic.command(item.command, item.options, callback);
    });

    function callback(err) {
        // 出错重试
        if (err && err.code == 'ECONN') {
            connected = false;
            work(item, cb);
        } else cb(err);
    }
}
// 推送消息到队列中
queue.push(work, cb);

这样一个简单的 demo 存在的问题就是所有状态存在于内存中,一旦崩溃就无法恢复了。另外内存毕竟有限,对于一个长时间运行的系统,迟早是会超出内存限制的。

通过 LevelDB 持久化

持久化队列是个不错的改进方向。LevelDB 是一个键值对数据库,不需要安装,可以在指定目录下创建。另外,使用配套的 level-jobs 可以快捷的创建队列。

const level = require('level');
// 创建本地数据库
const db = level('./event/db');
const Jobs = require('level-jobs');
const maxConcurrency = 1;
// 创建队列,并发度1
const queue = Jobs(db, worker, maxConcurrency);

module.exports = queue;

function worker(id, payload, cb) {
    sendEventToRemoteService(payload, (err) => {
        if (err) {
            console.error('Error processing event %s: %s',payload.id, err.message);
        }
        else {
            console.log('event %s successfully relayed', payload.id);
        }
        cb(err);
    });
}
// 模拟复杂异步处理流程
function sendEventToRemoteService(payload, cb) {
    setTimeout(() => {
        let err;
        if (Math.random() > 0.5) {
            err = Error('something awful has happened');
        }
        cb(err);
    }, 100);
}

通过queue.push(payload)可以向队列中推送消息。

const relay = require('./event_relay');
for(let i = 0; i < 10; i++) {
    relay.push({
        id: i,
        event: 'door opened'
    });
}

现在我们解决了持久化队列的问题,但是我们的 worker 是单线程运行的。如果任务是 CPU 密集型的,还是需要分配负载到多个线程上。

Redis 和多线程

simple-redis-safe-work-queue 是一个基于 Redis 的分布式 worker/producer 的队列解决方案。 创建 worker,设置重试次数。

const request = require('request');
const Queue = require('simple-redis-safe-work-queue')
// 设置重试次数
const workerOptions = {
  maxRetries: 2
};

const worker = Queue.worker('invoke webhook', invokeWebhook, workerOptions);

function invokeWebhook(webhook, cb) {
    console.log('invoke webhook: %j', webhook);

    request(webhook, done);

    function done(err, res) {
        if (!err && (res.statusCode < 200 || res.statusCode >= 300)) {
            err = Error('response status code was ' + res.statusCode);
        }
        cb(err);
    }
}

worker.on('max retries', function(err, payload) {
    console.error(
        'max retries reached trying to talk to %s.: %s\nrequest params: %j',
        payload.url, err.stack, payload);
});

而对于 producer,在一个实际的例子中,在数据库操作完成后向队列推送消息,并在 server 停止时关闭队列。

const Queue = require('simple-redis-safe-work-queue')
const webhookQueueClient = Queue.client('invoke webhook');
const server = Server();
server.listen(8080);
server.post('/some/important/action', (req, res, next) => {
    db.insert(someDoc, err => {
        if (err) res.send(err);
        else {
            webhookQueueClient.push({
                url: 'http://example.com',
                method: 'POST',
                json: {
                    a: 1,
                    b: 2
                }
            }, pushedWebhookWork);
        }

        function pushedWebhookWork(err) {
            if (err) res.stats(500).send(err);
            else res.stats(201).send({ok: true});
        }
    });
});
// 关闭队列
server.once('close', () => {
    webhookQueueClient.quit();
});

这样我们可以启动多个 worker 线程消费队列。Redis 在处理简单的场景中足够了,如果对可扩展性和稳定性有更高要求,就需要更为成熟的平台了。

RabbitMQ

通过 homebrew 安装后。启动服务器:/usr/local/sbin/rabbitmq-server

创建 channel,无论成功与否都会触发回调函数。worker 和 producer 都会通过该方法连接队列。

const amqp = require('amqplib/callback_api');
const url = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672';

module.exports = createQueueChannel;

function createQueueChannel(queue, cb) {
    amqp.connect(url, onceConnected);

    function onceConnected(err, conn) {
        if (err) {
            console.error('Error connecting:', err.stack);
        }
        else {
            console.log('connected');
            conn.createChannel(onceChannelCreated);
        }

        function onceChannelCreated(err, channel) {
            if (err) {
                cb(err);
            }
            else {
                channel.assertQueue(queue, {durable: true}, onceQueueCreated);
            }

            function onceQueueCreated(err) {
                if (err) {
                    cb(err);
                }
                else {
                    cb(null, channel, conn);
                }
            }
        }
    }
}

对于 producer 而言,需要连接 channel,然后向队列发送消息,随后断开连接:

const Channel = require('./channel');
const QUEUE = 'queue';

Channel(QUEUE, (err, channel, conn) => {
    if (err) {
        console.error(err.stack);
    }
    else {
        console.log('channel and queue created');
        let work = 'make me a sandwich';
        channel.sendToQueue(QUEUE, encode(work), {
            persistent: true
        });
        setImmediate(() => {
            channel.close();
            conn.close();
        });
    }
});

而对于每一个 worker,连接成功后,取出队列中的消息内容载荷进行消费,完成后通过ack()通知队列,继续消费下一个消息。而如果队列为空,通过定时器在一定延迟后继续消费。

Channel(QUEUE, (err, channel, conn) => {
    if (err) {
        console.error(err.stack);
    }
    else {
        console.log('channel and queue created');
        consume();
    }

    function consume() {
        channel.get(QUEUE, {}, onConsume);

        function onConsume(err, msg) {
            if (err) {
                console.warn(err.message);
            }
            else if (msg) {
                console.log('consuming %j', msg.content.toString());
                setTimeout(() => {
                    channel.ack(msg);
                    consume();
                }, 1e3);
            }
            else {
                console.log('no message, waiting...');
                setTimeout(consume, 1e3);
            }
        }
    }
});

参考资料