电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

.NETCore基于RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩方法

瀏覽:239日期:2022-06-08 17:02:45
目錄
  • 前言
  • 實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式
    • 利用rabbitmq死信隊(duì)列x-dead-letter-exchange和x-dead-letter-routing-key
    • .NETCore實(shí)現(xiàn)方式
    • rabbitmq通過(guò)安裝插件的形式實(shí)現(xiàn)(推薦)
    • .NET Core 實(shí)現(xiàn)
  • 第一種方式的缺陷以及解決方案

    前言

    此文章用來(lái)記錄自己學(xué)習(xí)延時(shí)隊(duì)列過(guò)程的文章,并用.NET這兩種方式實(shí)現(xiàn)了簡(jiǎn)單的Demo。

    延時(shí)隊(duì)列的應(yīng)用場(chǎng)景 應(yīng)用下單后,30分鐘沒(méi)有支付的話(huà),則自動(dòng)取消訂單活動(dòng)開(kāi)始前30分鐘,提醒參賽者參加活動(dòng)。活動(dòng)結(jié)束后,30分鐘后提醒未進(jìn)行評(píng)價(jià)的參賽人員進(jìn)行評(píng)價(jià)…

    上述的場(chǎng)景都可以使用延時(shí)隊(duì)列進(jìn)行對(duì)應(yīng)的處理。

    上面的場(chǎng)景雖說(shuō)可以通過(guò)定時(shí)器也可以處理,但有點(diǎn)浪費(fèi)資源, 而上述的場(chǎng)景時(shí)間是不定的,例如有兩個(gè)活動(dòng)需要提醒參賽者參加,一個(gè)是7點(diǎn)開(kāi)始 ,另一個(gè)是8點(diǎn)開(kāi)始,那么觸發(fā)處理的一個(gè)是6點(diǎn)半,一個(gè)是7點(diǎn)半。

    實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式

    使用Rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列可以讓消息持久化,也支持分布式

    缺點(diǎn)第一種第一種方式的缺陷以及解決方案第二種這個(gè)插件的當(dāng)前設(shè)計(jì)并不真正適合具有大量延遲消息(例如成百上千或數(shù)百萬(wàn))的場(chǎng)景。詳情信息

    利用rabbitmq死信隊(duì)列x-dead-letter-exchange和x-dead-letter-routing-key

    實(shí)現(xiàn)需要?jiǎng)?chuàng)建兩對(duì)交換機(jī)和隊(duì)列,其中需要對(duì)其中一對(duì)的隊(duì)列進(jìn)行設(shè)置x-dead-letter-exchange和x-dead-letter-routing-key屬性,屬性指定轉(zhuǎn)發(fā)到另一對(duì)的交換機(jī),

    隨后實(shí)現(xiàn)流程圖如下:

    .NETCore實(shí)現(xiàn)方式

    項(xiàng)目:.NET Core 控制臺(tái)項(xiàng)目

    install-package RabbitMQ.Client

    生產(chǎn)者代碼:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    //創(chuàng)建通道    var channl = connection.CreateModel();   //指定隊(duì)列的x-dead-letter-exchange和x-dead-letter-routing-key    Dictionary<string, object> queueArgs = new Dictionary<string, object>()    {{ "x-dead-letter-exchange","exchange.business.test" },{"x-dead-letter-routing-key","businessRoutingkey" }    };    //延時(shí)的交換機(jī)和隊(duì)列綁定    channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);    channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);    channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");    //業(yè)務(wù)的交換機(jī)和隊(duì)列綁定    channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);    channl.QueueDeclare("queue.business.test", true, false, false, null);    channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);    Console.WriteLine("生產(chǎn)者開(kāi)始發(fā)送消息");    while (true)    {string message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);var properties = channl.CreateBasicProperties();properties.Persistent = true;properties.Expiration = "5000";//發(fā)送一條延時(shí)5秒的消息channl.BasicPublish("exchange.business.dlx", "", properties, body);    }

    消費(fèi)者

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //給消費(fèi)時(shí)添加一個(gè)委托    consumer.Received += (obj, ea) =>    {var message = Encoding.UTF8.GetString(ea.Body.ToArray());//打印消費(fèi)的消息Console.WriteLine(message);channel.BasicAck(ea.DeliveryTag, false);    };    //消費(fèi)queue.business.test隊(duì)列的消息    channel.BasicConsume("queue.business.test", false, consumer);    Console.ReadKey();    channel.Dispose();    connection.Close();

    實(shí)現(xiàn)效果:

    rabbitmq通過(guò)安裝插件的形式實(shí)現(xiàn)(推薦)

    使用rabbitmq_delayed_message_exchange 插件提供的x-delayed-message類(lèi)型的交換機(jī)

    下載插件的地址:https://www.rabbitmq.com/community-plugins.html
    選中rabbitmq_delayed_message_exchange插件

    該插件使用只需要聲明交換機(jī)的時(shí)候,指定x-delayed-message類(lèi)型,然后添加x-delayed-type參數(shù)即可

    .NET Core 實(shí)現(xiàn)

    生產(chǎn)者

        ConnectionFactory connectionFactory = new ConnectionFactory()    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    Dictionary<string, object> exchangeArgs = new Dictionary<string, object>()    {{"x-delayed-type","direct" }    };    //指定x-delayed-message 類(lèi)型的交換機(jī),并且添加x-delayed-type屬性    channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs);    channel.QueueDeclare("plug.delay.queue", true, false, false, null);    channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay");    var properties = channel.CreateBasicProperties();    Console.WriteLine("生產(chǎn)者開(kāi)始發(fā)送消息");    Dictionary<string, object> headers = new Dictionary<string, object>()    {{"x-delay","5000" }    };    properties.Persistent = true;    properties.Headers = headers;    while (true)    {string message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body);    }

    消費(fèi)者:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    consumer.Received += (obj, ea) =>    {var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine(message);channel.BasicAck(ea.DeliveryTag, false);    };    channel.BasicConsume("plug.delay.queue", false, consumer);    Console.ReadKey();    channel.Dispose();    connection.Close();

    實(shí)現(xiàn)效果:

    第一種方式的缺陷以及解決方案

    如果存在A、B消息進(jìn)入了隊(duì)列中,A在前,B在后,如果B消息的過(guò)期時(shí)間比A的過(guò)期時(shí)間要早,消費(fèi)的時(shí)候,并不會(huì)先消費(fèi)B,再消費(fèi)A,而是B會(huì)等A先消費(fèi),即使A要晚過(guò)期

    舉例

    生產(chǎn)者代碼修改成如下:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創(chuàng)建連接    var connection = connectionFactory.CreateConnection();    //創(chuàng)建通道    var channl = connection.CreateModel();    Dictionary<string, object> queueArgs = new Dictionary<string, object>()    {{ "x-dead-letter-exchange","exchange.business.test" },{"x-dead-letter-routing-key","businessRoutingkey" }    };    //延時(shí)的交換機(jī)和隊(duì)列綁定    channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);    channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);    channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");    //業(yè)務(wù)的交換機(jī)和隊(duì)列綁定    channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);    channl.QueueDeclare("queue.business.test", true, false, false, null);    channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);    string message1 = "Hello Word!1";    string message2 = "Hello Word!2";    var body1 = Encoding.UTF8.GetBytes(message1);    var body2 = Encoding.UTF8.GetBytes(message2);    var properties = channl.CreateBasicProperties();    properties.Persistent = true;    //先發(fā)送過(guò)期時(shí)間5秒的消息    properties.Expiration = "5000";    channl.BasicPublish("exchange.business.dlx", "", properties, body2);    //再發(fā)送過(guò)期時(shí)間3秒的消息    properties.Expiration = "3000";    channl.BasicPublish("exchange.business.dlx", "", properties, body1);

    結(jié)果:

    這里先發(fā)了延時(shí)20秒的A消息,然后又發(fā)了延時(shí)10秒的B消息,但是最終結(jié)果并不是先消費(fèi)了B消息,而是等A消息過(guò)期后,立刻再去消費(fèi)B。

    這個(gè)會(huì)影響什么業(yè)務(wù)呢?好比兩個(gè)C、D活動(dòng),C活動(dòng)開(kāi)始時(shí)間是7點(diǎn),D活動(dòng)開(kāi)始時(shí)間是5點(diǎn),那么D活動(dòng)提醒需要等到C活動(dòng)提醒后,才會(huì)立刻提醒,這明顯不符合我們的業(yè)務(wù)需求。

    解決方案 每個(gè)活動(dòng)都是單獨(dú)的創(chuàng)建自己的交換機(jī)和隊(duì)列使用第二種實(shí)現(xiàn)方式,即使用插件的形式。

    第一種不太現(xiàn)實(shí),因?yàn)槿绻顒?dòng)多的話(huà),則會(huì)創(chuàng)建很多的隊(duì)列,而且只會(huì)使用一次。

    業(yè)務(wù)上還是推薦使用插件的實(shí)現(xiàn)方式。

    第二種方式的效果

    github地址:

    https://github.com/MDZZ3/RabbitmqDelay

    到此這篇關(guān)于.NETCore基于RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩方法的文章就介紹到這了,更多相關(guān).NETCore RabbitMQ 內(nèi)容請(qǐng)搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持!

    標(biāo)簽: ASP.NET
    相關(guān)文章:
    主站蜘蛛池模板: 电销卡_北京电销卡_包月电话卡-豪付网络| 科研ELISA试剂盒,酶联免疫检测试剂盒,昆虫_植物ELISA酶免试剂盒-上海仁捷生物科技有限公司 | 蜘蛛车-高空作业平台-升降机-高空作业车租赁-臂式伸缩臂叉装车-登高车出租厂家 - 普雷斯特机械设备(北京)有限公司 | 闪蒸干燥机-喷雾干燥机-带式干燥机-桨叶干燥机-[常州佳一干燥设备] | 字典-新华字典-在线字典查字-字典趣| 噪声治理公司-噪音治理专业隔音降噪公司 | 数控车床-立式加工中心-多功能机床-小型车床-山东临沂金星机床有限公司 | Safety light curtain|Belt Sway Switches|Pull Rope Switch|ultrasonic flaw detector-Shandong Zhuoxin Machinery Co., Ltd | 大型果蔬切片机-水果冬瓜削皮机-洗菜机切菜机-肇庆市凤翔餐饮设备有限公司 | CNC机加工-数控加工-精密零件加工-ISO认证厂家-鑫创盟 | 丹佛斯变频器-丹佛斯压力开关-变送器-广州市风华机电设备有限公司 | 深圳激光打标机_激光打标机_激光焊接机_激光切割机_同体激光打标机-深圳市创想激光科技有限公司 深圳快餐店设计-餐饮设计公司-餐饮空间品牌全案设计-深圳市勤蜂装饰工程 | 换网器_自动换网器_液压换网器--郑州海科熔体泵有限公司 | 动物麻醉机-数显脑立体定位仪-北京易则佳科技有限公司 | 插针变压器-家用电器变压器-工业空调变压器-CD型电抗器-余姚市中驰电器有限公司 | 杭州成人高考_浙江省成人高考网上报名 | 代写标书-专业代做标书-商业计划书代写「深圳卓越创兴公司」 | 天津仓储物流-天津电商云仓-天津云仓一件代发-博程云仓官网 | 开平机_纵剪机厂家_开平机生产厂家|诚信互赢-泰安瑞烨精工机械制造有限公司 | 微量水分测定仪_厂家_卡尔费休微量水分测定仪-淄博库仑 | 天命文免费算命堂_自助算命_自由算命系统_长文周易 | uv固化机-丝印uv机-工业烤箱-五金蚀刻机-分拣输送机 - 保定市丰辉机械设备制造有限公司 | 高楼航空障碍灯厂家哪家好_航空障碍灯厂家_广州北斗星障碍灯有限公司 | ASA膜,ASA共挤料,篷布色母料-青岛未来化学有限公司 | 卸料器-卸灰阀-卸料阀-瑞安市天蓝环保设备有限公司 | 丝印油墨_水性油墨_环保油墨油漆厂家_37国际化工 | 佛山市钱丰金属不锈钢蜂窝板定制厂家|不锈钢装饰线条|不锈钢屏风| 电梯装饰板|不锈钢蜂窝板不锈钢工艺板材厂家佛山市钱丰金属制品有限公司 | 盘古网络技术有限公司| 废气处理设备-工业除尘器-RTO-RCO-蓄热式焚烧炉厂家-江苏天达环保设备有限公司 | 全自动在线分板机_铣刀式在线分板机_曲线分板机_PCB分板机-东莞市亿协自动化设备有限公司 | 北京包装设计_标志设计公司_包装设计公司-北京思逸品牌设计 | 除尘器布袋骨架,除尘器滤袋,除尘器骨架,电磁脉冲阀膜片,卸灰阀,螺旋输送机-泊头市天润环保机械设备有限公司 | 餐饮加盟网_特色餐饮加盟店_餐饮连锁店加盟 | 智能垃圾箱|垃圾房|垃圾分类亭|垃圾分类箱专业生产厂家定做-宿迁市传宇环保设备有限公司 | 暖气片十大品牌厂家_铜铝复合暖气片厂家_暖气片什么牌子好_欣鑫达散热器 | 深圳湾1号房价_深圳湾1号二手房源 | 视觉检测设备_自动化检测设备_CCD视觉检测机_外观缺陷检测-瑞智光电 | 冷热冲击试验箱_温度冲击试验箱价格_冷热冲击箱排名_林频厂家 | 微波萃取合成仪-电热消解器价格-北京安合美诚科学仪器有限公司 | 铆钉机|旋铆机|东莞旋铆机厂家|鸿佰专业生产气压/油压/自动铆钉机 | 武汉宣传片制作-视频拍摄-企业宣传片公司-武汉红年影视 |