• 售前

  • 售后

热门帖子
入门百科

[NewLife.XCode]实体队列(多线程生产的大数据会合生存)

[复制链接]
凌善慧 显示全部楼层 发表于 2022-1-16 22:57:31 |阅读模式 打印 上一主题 下一主题
Python微信订餐小步调课程视频

https://edu.caogenba.net/course/detail/36074
Python实战量化生意业务理财体系

https://edu.caogenba.net/course/detail/35475阅读目次

NewLife.XCode是一个有15年汗青的开源数据中心件,支持netcore/net45/net40,由新生命团队(2002~2020)开辟完成并维护至今,以下简称XCode。
整个系列教程会大量团结示例代码和运行日记来举行深入分析,蕴含多年开辟履历于此中,代表作有百亿级大数据及时盘算项目。
开源所在:https://github.com/NewLifeX/X(求star, 1067+)

在大数据分析处置惩罚中,必要对海量数据举行添编削操纵,通例单行操纵难以满意要求,批量操纵势在必行!
飞仙(http://feixian.newlifex.com/)有收藏各种数据库批量插入数据的性能排行榜,此中MySql冠军是60万tps,SQLite冠军是56.6万tps
然而许多时间,数据来自多个渠道(多线程、多网络毗连),单个渠道数据量不大,以致只有一行,就难以利用批量添编削操纵了。比方物联网数据收罗、埋点日记等,在多线程上有大量数据必要写入。因此,XCode创造性筹划了实体队列技能
!!阅读本文之前,发起阅读:https://www.yuque.com/smartstone/xcode/batch
回到目次# 什么是实体队列
要说实体队列EntityDeferredQueue,就不得不提它的基类延伸队列DeferredQueue。
延伸队列DeferredQueue的核心头脑就是“凑批”,把要处置惩罚的零星数据放入一个“队列”,然后定时会合处置惩罚
比方物联网收罗服务端从多个毗连收到数据,必要写入数据库,为了提升吞吐,可以把实体数据放入延伸队列,然后定时的落库,此时,延伸队列得到一批数据,可以利用批量插入技能。

现实上DeferredQueue内部并不是一个队列,而是一个并发字典,由于有些业务场景,必要在“入队列”时去重,比方统计数据,必要拿出某省份的统计数据,多次累加后会合生存。
  1. <code>private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
  2. private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
  3. {
  4.     var key = $"{date:yyMMdd}\_{provinceID}\_{kind}";
  5.     var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());
  6.     stat.StatDate = date;
  7.     stat.Kind = kind;
  8.     stat.ProvinceID = provinceID;
  9.     stat.LastCode = code;
  10.     stat.ProcessStat(scanKind);
  11.     _statCache.Commit(key);
  12. }
复制代码
重要流程

对于统计型数据来说,可以在内存内里多次累加盘算指标,然后一次性生存,而且是批量生存,极大淘汰了数据库写入次数。这是大数据分析必备利器!
延伸队列重要属性
  1. <code>/// 跟踪数。达到该值时输出跟踪日志,默认1000
  2. public Int32 TraceCount { get; set; } = 1000;
  3. /// 周期。默认10\_000毫秒
  4. public Int32 Period { get; set; } = 10_000;
  5. /// 最大个数。超过该个数时,进入队列将产生堵塞。默认100\_000
  6. public Int32 MaxEntity { get; set; } = 100_000;
  7. /// 批大小。默认5\_000
  8. public Int32 BatchSize { get; set; } = 5_000;
  9. /// 等待借出对象确认修改的时间,默认3000ms
  10. public Int32 WaitForBusy { get; set; } = 3_000;
  11. /// 保存速度,每秒保存多少个实体
  12. public Int32 Speed { get; private set; }
  13. /// 是否异步处理。默认true表示异步处理,共用DQ定时调度;false表示同步处理,独立线程
  14. public Boolean Async { get; set; } = true;
复制代码
回过头来,实体队列EntityDeferredQueue作为延伸队列的扩展延伸,现实上是界说了“队列数据”的处置惩罚举动。延伸队列只负责网络数据和定时调理,现实处置惩罚举动Process必要扩展。
EntityDeferredQueue界说了 Save/Insert/Update/Upsert/Delete 等举动供选择。
回到目次# 怎样利用实体队列提升吞吐
再次深入分析前文的例子
  1. <code>private static readonly DeferredQueue _statCache = new EntityDeferredQueue { Name = "Gun", Action = EntityActions.Save };
  2. private static void SaveStat(DateTime date, Int32 provinceID, String kind, ScanKinds scanKind, String code)
  3. {
  4.     var key = $"{date:yyMMdd}\_{provinceID}\_{kind}";
  5.     var stat = _statCache.GetOrAdd(key, k => GunProvinceStat.FindByKey(k, true) ?? new GunProvinceStat());
  6.     stat.StatDate = date;
  7.     stat.Kind = kind;
  8.     stat.ProvinceID = provinceID;
  9.     stat.LastCode = code;
  10.     stat.ProcessStat(scanKind);
  11.     _statCache.Commit(key);
  12. }
复制代码
这是一个非常简单的数据分析项目,统计每天各省每一种扫描范例的操纵次数。日均分析处置惩罚5亿行数据,每一行数据都要辨认出日期、省份、种别等字段,也就是SaveStat每天要调用5亿次,效果数据分类存入统计表。共31省份27种种别,逐日统计行数约800行(并非每个省都有全部种别)。普通来讲,5亿行数据,分组聚合得到800行,及时盘算,每5秒盘算一次。
采取流式盘算框架,逐行遍历5亿行及时数据,假如Insert/Update数据库5亿次,显然很不现实!
匀称每行写入62.5万次(5亿/800),假如可以或许在内存内里“凑一凑”,每1000次更新,才写入一次数据库,那么总写入次数低沉为50万次,匀称每行写入625次。
实体队列/延伸队列,正是为了这类场景而筹划!
起首,根据业务去构造一个唯一key,在这里就是日期+省份+种别;
其次,GetOrAdd实行从队列里获取该key对应的统计对象,99%时间内存掷中,假如不存在,则查数据库大概new一个;
再次,取得统计对象后,可以举行字段累加,stat.ProcessStat(scanKind);
末了,Commit告诉队列,该key对应的实体对象已经利用完成,可以提交;
在延伸队列内部,定时(Period=10_000ms)实行一次生存,把内存内里的统计对象批量生存到数据库,并清空队列。

这里碰到的第一个题目就是,少量统计对象仍旧利用怎么办?请放心,定时任务会期待肯定时间(WaitForBusy=3000ms),假如利用方Commit则提前完成。因此,上面的Commit可以不要,效果会变差一些,同时,统计逻辑必须尽快完成(Table>[/code]
回到目次# 系列教程
NewLife.XCode教程系列[2019版]

  1. <code>public void ProcessStat(ScanKinds kind)
  2. {
  3.     //stat.Total++;
  4.     Interlocked.Increment(ref _Total);
  5.     switch (kind)
  6.     {
  7.         case ScanKinds.Receipt:
  8.             //stat.Receipts++;
  9.             Interlocked.Increment(ref _Receipts);
  10.             break;
  11.         case ScanKinds.SendBill:
  12.         case ScanKinds.SendAir:
  13.             //stat.Sends++;
  14.             Interlocked.Increment(ref _Sends);
  15.             break;
  16.         case ScanKinds.SendBag:
  17.             Interlocked.Increment(ref _SendBags);
  18.             break;
  19.         case ScanKinds.ComeBill:
  20.         case ScanKinds.ComeAir:
  21.             //stat.Comes++;
  22.             Interlocked.Increment(ref _Comes);
  23.             break;
  24.         case ScanKinds.ComeBag:
  25.             Interlocked.Increment(ref _ComeBags);
  26.             break;
  27.         case ScanKinds.SendCar:
  28.         case ScanKinds.ComeCar:
  29.             Interlocked.Increment(ref _Cars);
  30.             break;
  31.         case ScanKinds.Dispatch:
  32.             //stat.Dispatchs++;
  33.             Interlocked.Increment(ref _Dispatchs);
  34.             break;
  35.         case ScanKinds.Sign:
  36.             //stat.Signs++;
  37.             Interlocked.Increment(ref _Signs);
  38.             break;
  39.         case ScanKinds.Back:
  40.             Interlocked.Increment(ref _Backs);
  41.             break;
  42.         case ScanKinds.Problem:
  43.             Interlocked.Increment(ref _Problems);
  44.             break;
  45.         case ScanKinds.Stay:
  46.         case ScanKinds.Other:
  47.         case ScanKinds.Input:
  48.         case ScanKinds.Order:
  49.         case ScanKinds.Electronic:
  50.         default:
  51.             Interlocked.Increment(ref _Others);
  52.             break;
  53.     }
  54. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作