ENode 1.0 – 消息队列的设计思路

开源地址:https://github.com/tangxuehua/enode

达等同篇稿子,简单介绍了enode框架内部的共同体实现思路,用到了staged
event-driven
architecture的思考。通过前同篇文章,我们清楚了enode内部有星星点点栽阵:command
queue、event queue;用户发送的command会进入command queue排队,domain
model产生的domain event会进event
queue,然后等待被dispatch到具备的event
handlers。本文介绍一下enode框架受到即有限栽信息队列到底是什么样设计的。

先贴一下enode框架底里边贯彻架构图,这样针对性大家知道后面的解析有拉。

图片 1

俺们要什么样的音队列

enode的计划初衷是以单个进程内提供依据DDD+CQRS+EDA的用开发。假设我们的政工需与任何系统相互,那也足以,就是经在event
handler中以及其它外部系统相互,比如广播消息出来要调用远程接口,都可。也许将来,enode也会见安放支持远程消息通信的效用。但是非支持远程通信并无意味enode只能开单机应用了。enode框架需要仓储的数目要有三栽:

  1. 消息,包括command消息及event信息,目前出于性能方面的设想,是储存于mongodb中;之所以要持久化消息是以消息队列里之音未可知少;
  2. 聚合根,聚合根会被序列化,然后存储于内存缓存中,如redis或memcached中;
  3. 事件,就是由聚合根产生的波,事件存储于eventstore中,如mongodb中;

哼,通过地方的解析,我们解enode框架运行时的保有数据,就囤于mongodb和redis这半独地方。而就半栽存储都是布局于单独的服务器上,与web服务器无关。所以运行enode框架的各台web服务器上是不管状态的。所以,我们虽可知便于的对web服务器进行集群,我们得以天天当用户访问量的增加时多新的web服务器,以增长系统的应能力;当然,当你意识随着web服务器的长,导致单台mongodb服务器或单台redis服务器处理不过来成为瓶颈时,也可本着mongodb和redis做集群,或者对数据做sharding(当然这半栽做法不是十分好做,需要对mongodb,redis很熟稔才行),这样尽管好增进mongodb,redis的吞吐量了。

吓了,上面的分析主要是为说明enode框架的采取范围,讨论清楚这无异触及对咱分析需要什么样的音队列有坏死帮助。

现今我们掌握,我们了无待分布式的信息队列了,比如不需要MSMQ、RabbitMQ,等重级成熟之支撑远程消息传递的音信队列了。我们用的音讯队列的特性是:

  1. 基于内存的消息队列;
  2. 则根据内存,但消息不可知少,也便是信而支持持久化;
  3. 消息队列要性能尽量高;
  4. 消息队列里没有信息之时刻,队列的买主未克于CPU空转,CPU空转会直接招CPU占用100%,导致机器无法工作;
  5. 要是支持多只顾客线程同时起队列取消息,但是跟一个音只能给一个顾客处理,也就是一个消息未克同时深受简单独顾客获得走,也就是是要是支持并发的dequeue;
  6. 亟需平等栽设计,实现信息至少会于处理同不良;具体指:消息于消费者获得走然后让处理的经过被,如果没拍卖成(消费者自己清楚有没有出处理成)或者从未曾来得急处理(比如那时正断电了),那用一致种设计,可以我们来机遇再次消费该消息;
  7. 盖咱们做不至100%勿见面更处理一个音讯,所以我们的有所消息消费者若硬着头皮做到支持等幂操作,就是再的操作不会见滋生副作用;比如插入前先查询是否在即是同样栽支持等覆盖的点子;这或多或少,框架会尽可能提供支持等覆盖的逻辑,当然,用户自己在筹划command
    handler或event handler时,也要尽可能考虑当覆盖的问题。注意:一般command
    handler不用考虑,我们主要要考虑的是event
    handler。原因,下次篇被又精心谈吧。

外存队列的宏图

内存队列,特点是快。但是咱不光是用赶紧,还要会支撑并发的入队和出对。那么看起ConcurrentQueue<T>似乎能满足我们的求了,一方面性能还可,另一方面内置支持了起操作。但是发生同等沾并未满足,那就算是我们要当排里无消息的下,队列的消费者莫可知叫CPU空转,CPU空转会直接导致CPU占用100%,导致机器无法工作。幸运的是,.net中为出一个支持这种功效的聚众,那就是:BlockingCollection<T>,这种集能提供于排内无元素的时光block当前线程的功力。我们得以为此以下的方来实例化一个行:

private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());

连发入队的时段,我们设写下面的代码即可:

_queue.Add(message);

并发出队的时节,只要:

_queue.Take();

俺们不难看出,ConcurrentQueue<T>是提供了队列加并发访问的支撑,而BlockingCollection<T>是在这基础及又增加blocking线程的功用。

举凡不是非常简单,经过我的测试,BlockingCollection<T>的特性已颇好,每秒10万糟入队出对一定没问题,所以无需顾虑成为瓶颈。

关于Disruptor的调研:

了解过LMAX搭的心上人应该听说过Disruptor,LMAX架构能支持各国秒处理600W订单,而且是单线程。这个速度是勿是死惊人?大家发出趣味之得错过了解下。LMAX架构是完全in
memory的架构,所有的工作逻辑依据纯内存实现,粗粒度的架构图如下:

图片 2

  1. Business Logic Processor完全在in memory中跑,简称BLP;
  2. Input Disruptor是一模一样种奇特之基于内存运行的环形队列(基于相同栽为Ring
    Buffer的环形数据结构),负责接收信息,然后于BLP处理消息;
  3. Output
    Disruptor也是同一的班,负责用BLP产生的事件发布出去,给外部组件消费,外部组件消费后或者而见面发新的音塞入到Input
    Disruptor;

LMAX架构之所以会如此快,除了一心依据in
memory的架构外,还归功给延迟率在纳秒级别的disruptor队列组件。下面是disruptor与java中之Array
Blocking Queue的延迟率对比图:

图片 3

ns是纳秒,我们得起数额上看到,Disruptor的延迟时间比Array Blocking
Queue快的莫是一个多少级。所以,当初LMAX架构下时,一时很轰动。我早已为本着斯架构很奇异,但以小细节问题没想掌握,就非敢贸然实施。

透过上面的辨析,我们知晓,Disruptor也是同样种队列,并且也全然可替代BlockingCollection,但是以我们的BlockingCollection目前早就满足我们的得,且少未见面成为瓶颈,所以,我小没有行使Disruptor来兑现我们的内存队列。关于LMAX架构,大家还可看一下这篇自己原先写的文章。

班消息之持久化

俺们不光要一个赛性能都支持并发的内存队列,还要支撑排消息的持久化功能,这样我们才会管信息未会见掉,从而才能够提信息至少给处理同不成。

那么消息啊时持久化?

当我们发送一个音于班,一旦发生成功,我们终将认为信已经不见面废弃了。所以,很显著,消息队列中肯定是如以接到到入队之信不时事先持久化该消息,然后才会回到。

那么哪些快速之持久化呢?

先是个想法:

基于txt文本文件之相继写。原理是:当消息入队时,将信息序列化为文本,然后append到一个txt1文书;当消息给处理了以后,再管欠消息append到外一个txt2文件;然后,如果手上机械没再开,那内存队列里时存在的音信就是是还无被拍卖的音讯;如果机器还开了,那什么样掌握什么样信息还从未为拍卖?很简短,就是针对性比txt1,txt2眼看有限只公文文件,然后使是txt1面临在,但是txt2遭到莫设有的消息,就觉得是没为处理了,那要在enode框架启动时读取txt1受到这些从没给处理的音文本,反序列化为信息对象,然后再放入内存队列,然后起拍卖。这个思路其实十分好,关键之某些,这种做法性质大强。因为我们掌握各个写文本文件是挺快的,经过自身之测试,每秒200W行普通消息的文件不在话下。这代表我们各秒可以持久化200W独消息,当然实际上我们一定及不交者大之速,因为消息的序列化性能上不顶这速度,所以瓶颈是当序列化上面。但是,通过这种持久化消息之思路,也会见产生很多细节问题较为难化解,比如txt文件进一步老,怎么惩罚?txt文件不好管理及掩护,万一不小心给人抹了为?还有,如何比较及时半只txt文件?按行比较呢?不行,因为消息入队的逐条及处理的逐条不必然同,比如command就是这么,当用户发送一个command到行列,但是处理的时节发现第一软由出现冲突,导致command执行没有得逞,所以会重试command,如果重试成功了,然后持久化该command,但是咱领略,此时持久化的时,它的依次也许就于后头的command的末端了。所以,我们不克按行比较;那么就使按部就班信息的ID比较了?就算能够成功,那这比较过程为是雅耗时的,假设txt1发出100W个信息;txt2遭遇来80W单消息,那如以ID来比较txt1遭哪20W独消息还尚未被处理,有啊算法能高效比较出来吧?所以,我们发现,这个思路或生诸多细节问题用考虑。

老二个想法:

用NoSQL来存储消息,通过一些思维与比后,觉得还是MongoDB比较适当。一方面MongoDB实际上有的存取操作优先采取内存,也就是说不会见就持久化到磁盘。所以性能非常快。另一方面,mongodb支持保险的持久化功能,可以放心的用来持久化消息。性能方面,虽然没有写txt那么快,但为基本会接受了。因为咱们究竟非是漫天网站的富有用户要的command都是身处一个排,如果我们的网站用户量很充分,那必将会为此web服务器集群,且每个集群机器及还见面起随地一个command
queue,所以,单个command
queue里的信我们可以操纵也非会见太多,而且,单个command
queue里的音信都是坐落不同之mongodb
collection中储存;当然持久化瓶颈永远是IO,所以真要快,那只能一个独的mongodb
server上统筹一个collection,该collection存放一个command
queue里的音;其他的command
queue的音信就是吧使用这样的做法在另外的mongodb
server上;这样便会成就IO的互相,从而从上增强持久化速度。但是这么做代价十分酷之,可能要广大机械也,整个系统发生微个queue,那即便需要多少台机器,呵呵。总而言之,持久化方面,我们尚是来一部分措施可以去尝试,还有优化的退路。

更回过头来简单说一下,采用mongodb来持久化消息的兑现思路:入队之时光持久化消息,出队的时去该消息;这样当机还开时,要查有队列有微消息,只要经过一个简的查询返回mongodb
collection中即在的音即可。这种做法设计简单,稳定,性能方面即该还好承受。所以,目前enode就是应用这种方式来持久化所有enode用到之内存队列的信息。

代码示意,有趣味之好望:

图片 4图片 5

    public abstract class QueueBase<T> : IQueue<T> where T : class, IMessage
    {
        #region Private Variables

        private IMessageStore _messageStore;
        private BlockingCollection<T> _queue = new BlockingCollection<T>(new ConcurrentQueue<T>());
        private ReaderWriterLockSlim _enqueueLocker = new ReaderWriterLockSlim();
        private ReaderWriterLockSlim _dequeueLocker = new ReaderWriterLockSlim();

        #endregion

        public string Name { get; private set; }
        protected ILogger Logger { get; private set; }

        public QueueBase(string name)
        {
            if (string.IsNullOrEmpty(name))
            {
                throw new ArgumentNullException("name");
            }

            Name = name;
            _messageStore = ObjectContainer.Resolve<IMessageStore>();
            Logger = ObjectContainer.Resolve<ILoggerFactory>().Create(GetType().Name);
        }

        public void Initialize()
        {
            _messageStore.Initialize(Name);
            var messages = _messageStore.GetMessages<T>(Name);
            foreach (var message in messages)
            {
                _queue.Add(message);
            }
            OnInitialized(messages);
        }
        protected virtual void OnInitialized(IEnumerable<T> initialQueueMessages) { }

        public void Enqueue(T message)
        {
            _enqueueLocker.AtomWrite(() =>
            {
                _messageStore.AddMessage(Name, message);
                _queue.Add(message);
            });
        }
        public T Dequeue()
        {
            return _queue.Take();
        }
        public void Complete(T message)
        {
            _dequeueLocker.AtomWrite(() =>
            {
                _messageStore.RemoveMessage(Name, message);
            });
        }
    }

View Code

何以确保信息至少吃处理同涂鸦

思路应该好易想到,就是先期管信从内存队列dequeue出来,然后交由消费者处理,然后由消费者告诉我们当下消息是否受处理了,如果无给拍卖好,那要尝试重试处理,如果重试几潮后还是十分,那也不能够将消息丢弃了,但为无克凭终止的直接只处理这个信息,所以需要把欠消息丢到另外一个特别用来拍卖得重试的当地纯内存队列。如果消息给处理成了,那便把欠消息从持久化设备中删去即可。看一下代码比较清晰吧:

    private void ProcessMessage(TMessageExecutor messageExecutor)
    {
        var message = _bindingQueue.Dequeue();
        if (message != null)
        {
            ProcessMessageRecursively(messageExecutor, message, 0, 3);
        }
    }
    private void ProcessMessageRecursively(TMessageExecutor messageExecutor, TMessage message, int retriedCount, int maxRetryCount)
    {
        var result = ExecuteMessage(messageExecutor, message); //这里表示在消费(即处理)消息

        //如果处理成功了,就通知队列从持久化设备删除该消息,通过调用Complete方法实现
        if (result == MessageExecuteResult.Executed)
        {
            _bindingQueue.Complete(message);
        }
        //如果处理失败了,就重试几次,目前是3次,如果还是失败,那就丢到一个重试队列,进行永久的定时重试
        else if (result == MessageExecuteResult.Failed)
        {
            if (retriedCount < maxRetryCount)
            {
                _logger.InfoFormat("Retring to handle message:{0} for {1} times.", message.ToString(), retriedCount + 1);
                ProcessMessageRecursively(messageExecutor, message, retriedCount + 1, maxRetryCount);
            }
            else
            {
                //这里是丢到一个重试队列,进行永久的定时重试,目前是每隔5秒重试一下,_retryQueue是一个简单的内存队列,也是一个BlockingCollection<T>
                _retryQueue.Add(message);
            }
        }
    }

代码应该好知了,我虽不多举行说明了。

总结:

本文主要介绍了enode框架中信息队列的设计思路,因为enode中起command
queue和event
queue,两栽queue,所以逻辑是看似之;所以当还眷恋讨论一下如何抽象和计划性这些queue,已错过丢重代码。但时间未早了,下次复详尽讲吧。

网站地图xml地图