有关贯彻一个因文件持久化的EventStore的主干思想

大家懂得enode框架的架是根据ddd+event
sourcing的考虑。我们持久化的不是聚合根的摩登状态,而是聚合根产生的世界事件。最近自己于考虑如何落实一个基于文件的eventstore。目标发出半点独:

1.务必使大性能;
2.支撑聚合根事件之并发持久化,要保管单个聚合根实例不见面保存版本号相同之轩然大波;

事件持久化高性能

经了一番调研,发现用文件存储事件特别适宜。要包强性能,我们可以顺序写文件(append),然后轻易读文件。之所以要自由读文件是以在当某些command由于操作同一个聚合根而碰到并发冲突的当儿,框架需要取该聚合根的所有最新的事件,然后经过event
sourcing重建起时的聚合根,然后重新重试这些遇到并发冲突的command。经过测试,顺序写文件以及无限制读文件都坏快速,每秒100W次顺序写及每秒10W软随机读在自我的记录本上无是题材;因为以enode中,domain是因in-memory架构的,所以我们大少会自eventstore读取事件。所以要是设优化持久化事件的特性。而念事件只有在command遇到并发冲突的下还是系再度开的时段,才发或要打eventstore读取事件。所以各秒10W不善随机读取应该无是问题。当然,关于文件如何勾勒,见底的遗留问题的辨析。

此外一个就是是刷磁盘的题目。我们了解,通过文件流写副数据及文件后,如果不Flush文件流,那数有或还无刷到磁盘。所以必须定时Flush文件流,出于性能及可靠性的权衡,选择定时1s刷一不好磁盘,通过异步线程刷盘。实际上,大部分NoSQL产品都是这般,比如Redis的fsync可以指定为各国隔1s粉一不善AOF日志到磁盘。这样做唯一的问题是断电后或者有失1s的数码,但这个可以经当服务器上配备UPS备用电源确保断电后服务器还会办事,来管断电后还能支撑足够的岁月确保我们把文件流的数量刷到磁盘。这样既解决性能问题,也能担保不丢数据。

事件并作控制

先是,每个聚合根实例有多单事件,每个时刻,每个聚合根可能都见面时有发生多只事件然后如保存到eventstore中。为什么也?因为咱们的domain
model所于的应用服务器一般是集群部署的,所以全有或同一个聚合根在不同的机上于为以以做不同之改,然后来的事件的版本号是一律之,从而就会见促成出现修改和一个聚合根的状态了。

为此,我们要要保的凡,对同一个聚合根实例,产生的轩然大波使版本号相同,则只能发出一个事件能保留成功,其他的道出现冲突,需要报告外部有出现冲突了,然后由外部控制属下去该怎么做。那么怎样保管这或多或少乎?

面前说交,所有聚合根的波还是逐一的法子append到跟一个文书,append事件到文件是手续本身没有办法检查是否来起冲突,文件只能救助我们持久化数据,不担当检查是否发起冲突。那怎么检查出现冲突呢?思路就是是于内存设计一个Dictionary,Dictionary的key为聚合根ID,value保存时聚合根产生的波之无比可怜版号,也不怕是最终一个事件的版本号。

然后有个别独办法可以实现产出冲突之检测:

  1. 富有的风波上eventstore服务器后,先经一个ConcurrentQueue进行排队。所有事件出现上ConcurrentQueue,然后ConcurrentQueue的主顾也单线程。然后我们在单线程内一个个取出ConcurrentQueue中之风波,然后因Dictionary里的情一个个判定当前事件是否生本冲突,如果没有冲突,则优先以事件写副文件,再创新Dictionary里时聚合根的最好充分版本号;这个艺术没有问题,只是效率不是异常大,因为如此相当给对有的聚合根实例的处理都线性化了。实际上,我们期望的凡,只有对同一个聚合根实例的操作是线性化的,而针对不同聚合根实例之间,完全好并行处理;那怎么开也?见第二种植思路。
  2. 首先,所有的风波不要排队了,可以并行处理。但是于各级一个聚合根实例的轩然大波之处理,需要通过原子锁的法子(CAS原理)做并作控制。关键思路是,通过一个字段存储每个聚合根的目前版本号信息,版本号信息遭到设计一个态位用来控制一样时刻只能发出一个线程在转移当前聚合根的版本信息。以这来贯彻对同一个聚合根的拍卖的线性化。然后,当前涂改版状态成功的线程,能够更加做持久化事件之逻辑,但持久化事件前还欲判定当前风波的本是否早已是直的版了(当前事变的版本一定当当前聚合根的最好老本子号+1),以这个来保管与一个聚合根的波序列一定是连接递增的。具体的实现思路见如下的demo代码。

DEMO代码示例、注解

/// <summary>一个结构体,记录当前聚合根的当前版本号,以及用于并发控制的一些状态信息
/// </summary>
class AggregateVersionInfo
{
    public const int Editing = 1;    //一个常量,表示当前聚合根的当前版本号正在被修改
    public const int UnEditing = 0;  //一个常量,表示当前聚合根的当前版本号未在被修改

    public int CurrentVersion = 0;   //记录当前聚合根的当前版本号,初始值为0,其实就是事件的个数
    public int Status = UnEditing;   //默认状态,未被修改
}
class Program
{
    static void Main(string[] args)
    {
        var aggregateCount = 4;                    //用于测试的聚合根的个数
        var eventCountPerAggregate = 10;           //单个聚合根产生的事件数
        var aggregateIdList = new List<string>();  //一个List,存放所有聚合根的ID
        var aggregateCurrentVersionDict = new ConcurrentDictionary<string, AggregateVersionInfo>();  //一个Dict,用于保存所有聚合根的当前版本信息
        var aggregateEventsDict = new Dictionary<string, IList<int>>();                      //一个Dict,用于模拟存储每个聚合根的所有事件

        //先生成所有聚合根ID
        for (var index = 1; index <= aggregateCount; index++)
        {
            aggregateIdList.Add("key-" + index);
        }
        //初始化每个聚合根的当前状态
        foreach (var aggregateId in aggregateIdList)
        {
            aggregateCurrentVersionDict[aggregateId] = new AggregateVersionInfo();
            aggregateEventsDict[aggregateId] = new List<int>();
        }

        //该方法用于实现事件的并发冲突检测和持久化逻辑。
        Action<string, int> persistEventAction = (aggregateId, currentEventVersion) =>
        {
            var aggregateVersionInfo = aggregateCurrentVersionDict[aggregateId];
            var originalStatus = Interlocked.CompareExchange(
                ref aggregateVersionInfo.Status,
                AggregateVersionInfo.Editing,
                AggregateVersionInfo.UnEditing);

            //这里两者不相等,说明aggregateVersionInfo.Status成功更新为Editing了
            if (originalStatus != aggregateVersionInfo.Status)
            {
                if (currentEventVersion == aggregateVersionInfo.CurrentVersion + 1)
                {
                    //这里,将事件加入到一个List,真实的eventstore会在这里持久化事件到文件;
                    aggregateEventsDict[aggregateId].Add(currentEventVersion);
                    //更新聚合根的最新版本
                    aggregateVersionInfo.CurrentVersion++;
                }
                else
                {
                    //进入这里,说明有别的线程已经添加了该版本,也就是遇到并发冲突了。
                }

                //处理完后,将聚合根的版本状态修改回UnEditing
                Interlocked.Exchange(ref aggregateVersionInfo.Status, AggregateVersionInfo.UnEditing);
            }
            else
            {
                //进入这里,说明有别的线程正在更改当前聚合根的版本信息,也可以认为是遇到并发冲突了。
            }
        };

        //该方法用于模拟并行产生事件并调用事件的持久化逻辑
        Action generateEventAction = () =>
        {
            foreach (var aggregateId in aggregateIdList) //循环处理每个聚合根
            {
                //对每个聚合根产生指定个数的事件,为了简化,仅使用事件版本号表示事件了
                for (var eventVersion = 1; eventVersion <= eventCountPerAggregate; eventVersion++)
                {
                    for (var i = 0; i < 100000; i++) //这里纯粹为了性能测试,对每个事件再循环10W次调用持久化逻辑
                    {
                        persistEventAction(aggregateId, eventVersion); //调用持久化方法持久化聚合根的当前事件
                    }
                }
            }    
        };

        var watch = Stopwatch.StartNew();
        //模拟同时4个线程同时产生事件并持久化,这里其实只要开2个够了,因为我的笔记本只有2个核
        Parallel.Invoke(generateEventAction, generateEventAction, generateEventAction, generateEventAction);
        watch.Stop();
        var time = watch.ElapsedMilliseconds;

        //最后输出结果,输出总运行时间,以及验证每个聚合根的当前版本以及聚合根的每个事件的版本是否是顺序逐个递增的。
        Console.WriteLine("total time:{0}ms", time);
        foreach (var aggregateId in aggregateIdList)
        {
            Console.WriteLine("aggregateId:{0}, currentVersion:{1}, events:{2}",
                aggregateId,
                aggregateCurrentVersionDict[aggregateId].CurrentVersion,
                string.Join(",", aggregateEventsDict[aggregateId].ToArray()));
        }

        Console.ReadLine();
    }
}

DEMO运行结果以及分析

图片 1

自从上图可以观看,开启4个线程,并行操作4单聚合根,每个聚合根产生10单例外版本的风波(事件版本号连续递增),每个事件再度发生10W次,只费了大体上1s时间。另外,最后每个聚合根的当下版本号以及所对应的风波吧都是科学的。所以,可以看,性能还不易。4个线程并行处理,每秒可处理400W个事件(当然实际得没这样大,这里是坐大部分处理还被CompareExchange方法判断掉了。所以,只有无起的状况,才是拔尖状态下的卓绝抢之性能点,因为每个事件还见面开持久化和创新当前本的逻辑,上面的代码主要是为证明并发情况下是否会见起重复版本的风波之效应。),且能担保不会见持久化重复版本的轩然大波。明天空余把持久化事件替换为实在的写照文件流的法门,看看性能会时有发生小,理论及要写文件流够快,那性能应还是十分高。

遗留问题

面还有一个题目本身还不曾提及,那便是光用一个文件来存储所有的轩然大波还不够的,我们还得一个文本存储每个事件于文书被的岗位及尺寸,否则我们从来不道知道每个事件存储于文件的哪里。也便是于当事件写副到文件后,我们用明白当前形容副的开场位置,然后我们可以这个开始位置信息再写副到其他一个一定给索引作用的文本。这个问题下次有机遇以详细分析吧,总体思路和淘宝开源的大性能分布式消息队列metaq的音讯存储架构非常相像。淘宝之metaq之所以会胜性能,很充分一点故为是设计为顺序写文件,随机读文件之笔触。如下图所示:

图片 2

 

落得图被之commitlog文件相当给本人点提到的之所以来存储事件之公文文件,commitlog在metaq消息队列中凡是故来囤消息之。index文件相当给用来囤积事件在commitlog中之位置以及长短。在metaq中,则是因此来储存消息于commitlog中之岗位与长。所以,从存储结构的角度来拘禁,metaq的音讯存储和eventstore的事件存储的结构同样;但无同等的是,metaq在存储信不时,不欲做并作控制,所有信息而append消息及commitlog即可,所有的index文件为如append写副即可,关于metaq具体还详实的计划性自己还无深入研讨,有趣味的朋友为堪和自我交流。而eventstore则要对事件的本号举行并作控制,这是不过老之区别。另外,实际上,事件的目录信息可以就待保护在内存中即可,因为这些索引信息于eventstore启动时老是好经过commitlog还原下。当然我们保护一客Index文件呢得以,只是会增加事件持久化时的复杂度,这里究竟是不是用之Index文件,我急需重新钻下metaq后才能够更为明确。

至于采取LevelDB的思维

以调研的长河中,无意中发现LevelDB的插入性能大大。它是由于Google的MapReduce和BigTable的作者设计之一个基于key/value结构的轻量级的雅快的开源的NoSQL数据库。它亦可支持10亿级别之数据量存储。LevelDB
是单独进程的服务,性能特别之大,在同大4独Q6600的CPU机器上,每秒钟写多少超过40w,而即兴读的特性每秒钟超过10w,足见性能的强。正因他的神速,所以现在无数任何NoSQL都采取它们来作为底层的数量持久化,比如淘宝之Tair支持用LevelDB来持久化缓存数据。所以有时间研究下LevelDB的宏图以及实现大有必不可少。但是LevelDB只提供最简易的key/value的操作。对于顺序插入事件之需要,可以调用LevelDB的put操作。但是此地的put操作不支持并发冲突的检测,也即是设连put了一定量只key相同之value,则前一个value就会见让后一个value所覆盖,这不是我们怀念如果的。所以我们若应用LevelDB,对于同一个聚合根不可知起半点独版本号相同的波之需要仍要我们团结一心来管,可以由此地方DEMO中的笔触来实现。也就是说,我们唯有用LevelDB来代替日志。其实这么都省去我们许多之工作量,因为我们团结写日记与记录每个事件之仓储位置及尺寸不是一样桩易之事体,要求对算法和逻辑很紧密,否则如果一个bit错位了,可能读取出来的具备数据还错了。而LevelDB帮咱好了太复杂与头疼的事务了。但不幸的是,LevelDB没有官方的windows版本。我能够找到.net平台下的贯彻,但若以生产条件下,还是如多做多证明才实施。另外,如果要用LevelDB来持久化事件,那我们的key可以是聚合根ID+事件版本号的字符串拼接。这点应该不难理解吧!

结束语

当下首文章洋洋洒洒,都是思路性的事物,希望大家看了不见面干瘪,呵呵。欢迎大家提出自己之观点和建议!

网站地图xml地图