电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

如何通過Python實現RabbitMQ延遲隊列

瀏覽:4日期:2022-07-03 18:32:40

最近在做一任務時,遇到需要延遲處理的數據,最開始的做法是現將數據存儲在數據庫,然后寫個腳本,隔五分鐘掃描數據表再處理數據,實際效果并不好。因為系統本身一直在用RabbitMQ做異步處理任務的中間件,所以想到是否可以利用RabbitMQ實現延遲隊列。功夫不負有心人,RabbitMQ雖然沒有現成可用的延遲隊列,但是可以利用其兩個重要特性來實現之:1、Time To Live(TTL)消息超時機制;2、Dead Letter Exchanges(DLX)死信隊列。下面將具體描述實現原理以及實現代

延遲隊列的基礎原理Time To Live(TTL)

RabbitMQ可以針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設置以最先到期的時間為準),則消息變為dead letter(死信)RabbitMQ消息的過期時間有兩種方法設置。

通過隊列(Queue)的屬性設置,隊列中所有的消息都有相同的過期時間。(本次延遲隊列采用的方案)對消息單獨設置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。

x-dead-letter-exchange:出現死信(dead letter)之后將dead letter重新發送到指定exchange x-dead-letter-routing-key:出現死信(dead letter)之后將dead letter重新按照指定的routing-key發送

隊列中出現死信(dead letter)的情況有:

消息或者隊列的TTL過期。(延遲隊列利用的特性) 隊列達到最大長度 消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false

綜合上面兩個特性,將隊列設置TTL規則,隊列TTL過期后消息會變成死信,然后利用DLX特性將其轉發到另外的交換機和隊列就可以被重新消費,達到延遲消費效果。

如何通過Python實現RabbitMQ延遲隊列

延遲隊列設計及實現(Python)

從上面描述,延遲隊列的實現大致分為兩步:

產生死信,有兩種方式Per-Message TTL和 Queue TTL,因為我的需求中是所有的消息延遲處理時間相同,所以本實現中采用 Queue TTL設置隊列的TTL,如果需要將隊列中的消息設置不同的延遲處理時間,則設置Per-Message TTL(官方文檔)

設置死信的轉發規則,Dead Letter Exchanges設置方法(官方文檔)

完整代碼如下:

'''Created on Fri Aug 3 17:00:44 2018@author: Bge'''import pika,json,loggingclass RabbitMQClient: def __init__(self, conn_str=’amqp://user:pwd@host:port/%2F’): self.exchange_type = 'direct' self.connection_string = conn_str self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string)) self.channel = self.connection.channel() self._declare_retry_queue() #RetryQueue and RetryExchange logging.debug('connection established') def close_connection(self): self.connection.close() logging.debug('connection closed') def declare_exchange(self, exchange): self.channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type, durable=True) def declare_queue(self, queue): self.channel.queue_declare(queue=queue, durable=True,) def declare_delay_queue(self, queue,DLX=’RetryExchange’,TTL=60000): ''' 創建延遲隊列 :param TTL: ttl的單位是us,ttl=60000 表示 60s :param queue: :param DLX:死信轉發的exchange :return: ''' arguments={} if DLX: #設置死信轉發的exchange arguments[ ’x-dead-letter-exchange’]=DLX if TTL: arguments[’x-message-ttl’]=TTL print(arguments) self.channel.queue_declare(queue=queue, durable=True, arguments=arguments) def _declare_retry_queue(self): ''' 創建異常交換器和隊列,用于存放沒有正常處理的消息。 :return: ''' self.channel.exchange_declare(exchange=’RetryExchange’, exchange_type=’fanout’, durable=True) self.channel.queue_declare(queue=’RetryQueue’, durable=True) self.channel.queue_bind(’RetryQueue’, ’RetryExchange’,’RetryQueue’) def publish_message(self,routing_key, msg,exchange=’’,delay=0,TTL=None): ''' 發送消息到指定的交換器 :param exchange: RabbitMQ交換器 :param msg: 消息實體,是一個序列化的JSON字符串 :return: ''' if delay==0: self.declare_queue(routing_key) else: self.declare_delay_queue(routing_key,TTL=TTL) if exchange!=’’: self.declare_exchange(exchange) self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg, properties=pika.BasicProperties( delivery_mode=2, type=exchange )) self.close_connection() print('message send out to %s' % exchange) logging.debug('message send out to %s' % exchange) def start_consume(self,callback,queue=’#’,delay=1): ''' 啟動消費者,開始消費RabbitMQ中的消息 :return: ''' if delay==1: queue=’RetryQueue’ else: self.declare_queue(queue) self.channel.basic_qos(prefetch_count=1) try: self.channel.basic_consume( # 消費消息callback, # 如果收到消息,就調用callback函數來處理消息queue=queue, # 你要從那個隊列里收消息 ) self.channel.start_consuming() except KeyboardInterrupt: self.stop_consuming() def stop_consuming(self): self.channel.stop_consuming() self.close_connection() def message_handle_successfully(channel, method): ''' 如果消息處理正常完成,必須調用此方法, 否則RabbitMQ會認為消息處理不成功,重新將消息放回待執行隊列中 :param channel: 回調函數的channel參數 :param method: 回調函數的method參數 :return: ''' channel.basic_ack(delivery_tag=method.delivery_tag) def message_handle_failed(channel, method): ''' 如果消息處理失敗,應該調用此方法,會自動將消息放入異常隊列 :param channel: 回調函數的channel參數 :param method: 回調函數的method參數 :return: ''' channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

發布消息代碼如下:

from MQ.RabbitMQ import RabbitMQClientprint('start program')client = RabbitMQClient()msg1 = ’{'key':'value'}’client.publish_message(’test-delay’,msg1,delay=1,TTL=10000)print('message send out')

消費者代碼如下:

from MQ.RabbitMQ import RabbitMQClientimport jsonprint('start program')client = RabbitMQClient()def callback(ch, method, properties, body): msg = body.decode() print(msg) # 如果處理成功,則調用此消息回復ack,表示消息成功處理完成。 RabbitMQClient.message_handle_successfully(ch, method)queue_name = 'RetryQueue'client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 断桥铝破碎机_铝合金破碎机_废铁金属破碎机-河南鑫世昌机械制造有限公司 | 【直乐】河北石家庄脊柱侧弯医院_治疗椎间盘突出哪家医院好_骨科脊柱外科专业医院_治疗抽动症/关节病骨伤权威医院|排行-直乐矫形中医医院 | 阀门智能定位器_电液动执行器_气动执行机构-赫尔法流体技术(北京)有限公司 | 山东风淋室_201/304不锈钢风淋室净化设备厂家-盛之源风淋室厂家 翻斗式矿车|固定式矿车|曲轨侧卸式矿车|梭式矿车|矿车配件-山东卓力矿车生产厂家 | 脉冲除尘器,除尘器厂家-淄博机械 | 隧道窑炉,隧道窑炉厂家-山东艾瑶国际贸易| 海外仓系统|国际货代系统|退货换标系统|WMS仓储系统|海豚云 | 气动球阀_衬氟蝶阀_调节阀_电动截止阀_上海沃托阀门有限公司 | 玻纤土工格栅_钢塑格栅_PP焊接_单双向塑料土工格栅_复合防裂布厂家_山东大庚工程材料科技有限公司 | 二手光谱仪维修-德国OBLF光谱仪|进口斯派克光谱仪-热电ARL光谱仪-意大利GNR光谱仪-永晖检测 | 【365公司转让网】公司求购|转让|资质买卖_股权转让交易平台 | 翻斗式矿车|固定式矿车|曲轨侧卸式矿车|梭式矿车|矿车配件-山东卓力矿车生产厂家 | 仓储货架_南京货架_钢制托盘_仓储笼_隔离网_环球零件盒_诺力液压车_货架-南京一品仓储设备制造公司 | 烟台游艇培训,威海游艇培训-烟台市邮轮游艇行业协会 | 青岛球场围网,青岛车间隔离网,青岛机器人围栏,青岛水源地围网,青岛围网,青岛隔离栅-青岛晟腾金属制品有限公司 | 欧美日韩国产一区二区三区不_久久久久国产精品无码不卡_亚洲欧洲美洲无码精品AV_精品一区美女视频_日韩黄色性爱一级视频_日本五十路人妻斩_国产99视频免费精品是看4_亚洲中文字幕无码一二三四区_国产小萍萍挤奶喷奶水_亚洲另类精品无码在线一区 | 电镀标牌_电铸标牌_金属标贴_不锈钢标牌厂家_深圳市宝利丰精密科技有限公司 | 在线PH计-氧化锆分析仪-在线浊度仪-在线溶氧仪- 无锡朝达 | 全国冰箱|空调|洗衣机|热水器|燃气灶维修服务平台-百修家电 | 进口消泡剂-道康宁消泡剂-陶氏消泡剂-大洋消泡剂 | 烘箱-工业烘箱-工业电炉-实验室干燥箱 - 苏州华洁烘箱制造有限公司 | 洛阳网站建设_洛阳网站优化_网站建设平台_洛阳香河网络科技有限公司 | 压接机|高精度压接机|手动压接机|昆明可耐特科技有限公司[官网] 胶泥瓷砖胶,轻质粉刷石膏,嵌缝石膏厂家,腻子粉批发,永康家德兴,永康市家德兴建材厂 | 砖机托板价格|免烧砖托板|空心砖托板厂家_山东宏升砖机托板厂 | 不锈钢法兰-碳钢法兰-法兰盘生产加工厂家-[鼎捷峰]-不锈钢法兰-碳钢法兰-法兰盘生产加工厂家-[鼎捷峰] | 金蝶帐无忧|云代账软件|智能财税软件|会计代账公司专用软件 | 仿真茅草_人造茅草瓦价格_仿真茅草厂家_仿真茅草供应-深圳市科佰工贸有限公司 | 全自动定氮仪-半自动凯氏定氮仪厂家-祎鸿仪器 | 佛山市钱丰金属不锈钢蜂窝板定制厂家|不锈钢装饰线条|不锈钢屏风| 电梯装饰板|不锈钢蜂窝板不锈钢工艺板材厂家佛山市钱丰金属制品有限公司 | 「安徽双凯」自动售货机-无人售货机-成人用品-自动饮料食品零食售货机 | 铝镁锰板厂家_进口钛锌板_铝镁锰波浪板_铝镁锰墙面板_铝镁锰屋面-杭州军晟金属建筑材料 | arch电源_SINPRO_开关电源_模块电源_医疗电源-东佑源 | 煤矿人员精确定位系统_矿用无线通信系统_煤矿广播系统 | 成都LED显示屏丨室内户外全彩led屏厂家方案报价_四川诺显科技 | 卷筒电缆-拖链电缆-特种柔性扁平电缆定制厂家「上海缆胜」 | 硬度计,金相磨抛机_厂家-莱州华煜众信试验仪器有限公司 | Maneurop/美优乐压缩机,活塞压缩机,型号规格,技术参数,尺寸图片,价格经销商 | 转向助力泵/水泵/发电机皮带轮生产厂家-锦州华一精工有限公司 | 泰国试管婴儿_泰国第三代试管婴儿费用|成功率|医院—新生代海外医疗 | LED投光灯-工矿灯-led路灯头-工业灯具 - 山东普瑞斯照明科技有限公司 | 净气型药品柜-试剂柜-无管道净气型通风柜-苏州毕恩思 |