电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

Spring Cloud Stream微服務消息框架原理及實例解析

瀏覽:4日期:2023-09-03 18:45:34

隨著近些年微服務在國內的盛行,消息驅動被提到的越來越多。主要原因是系統被拆分成多個模塊后,一個業務往往需要在多個服務間相互調用,不管是采用HTTP還是RPC都是同步的,不可避免快等慢的情況發生,系統性能上很容易遇到瓶頸。在這樣的背景下,將業務中實時性要求不是特別高且非主干的部分放到消息隊列中是很好的選擇,達到了異步解耦的效果。

目前消息隊列有很多優秀的中間件,目前使用較多的主要有 RabbitMQ,Kafka,RocketMQ 等,這些中間件各有優勢,有的對 AMQP(應用層標準高級消息隊列協議)支持完善,有的提供了更高的可靠性,有的對大數據支持良好,同時各種消息中間件概念不統一,使得選擇和使用一款合適的消息中間件成為難題。Spring跳出來給出了解決方案:Spring Cloud Stream,使用它可以很方便高效的操作消息中間件,程序員只要關心業務代碼即可,目前官方支持 RabbitMQ,Kafka兩大主流MQ,RocketMQ 則自己提供了相應支持。

首先看一下Spring Cloud Stream做了什么,如下圖所示,框架目前官方把消息中間件抽象成了 Binder,業務代碼通過進出管道連接 Binder,各消息中間件的差異性統一交給了框架處理,程序員只需要了解框架的抽象出來的一些統一概念即可

Binder(綁定器):RabbitMQ,Kafka等中間件服務的封裝 Channel(管道):也就是圖中的 inputs 和 outputs 所指區域,是應用程序和 Binder 的橋梁 Gourp(消費組):由于微服務會部署多實例,為了保證只被服務的一個實例消費,可以通過配置,把實例都綁到同一個消費組 Partitioning (消息分區):如果某一類消息只想指定給服務的固定實例消費,可以使用分區實現

Spring Cloud Stream微服務消息框架原理及實例解析

Spring Cloud Stream將業務代碼和消息中間件解耦,帶來的好處可以從下圖很直觀的感受到,很簡潔的代碼,我們便能從RabbitMQ中接受消息然后經過業務處理再向Kafka發送一條消息,只需要更改相關配置就能快速改變系統行為。

Spring Cloud Stream微服務消息框架原理及實例解析

細心的讀者可能會好奇,上圖的代碼只是注入了一個簡單的 Function 而已,實際上,Spring Cloud Stream3.0后集成了Spring Cloud Function框架 ,提倡函數式的風格,棄用先前版本基于注解的開發方式。Spring Cloud Function是 Serverless 和 Faas 的產物,強調面向函數編程,一份代碼各云平臺運行,和Spring Cloud Stream一樣也是解決了基礎設施的差異性問題,通過強大的自動裝配機制,可以根據配置自動暴露 HTTP 服務或者消息服務,并且同時支持命令式和響應式編程模式,可以說是很強大了。下面通過一個簡單的例子來理解下上圖的代碼和框架的使用把。

簡單案例

模擬一個簡單的下單,收到訂單之后處理完,返回成功,然后發送消息給庫存模塊,庫存模塊再發送消息給報表模塊

項目地址

springcloud-stream

項目結構

Spring Cloud Stream微服務消息框架原理及實例解析

項目依賴

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

表單

@Datapublic class OrderForm { private String productName;}

消息管道注冊

@Configuration@Slf4jpublic class MessageQueueConfig { @Bean public Function<OrderForm, OrderForm> inventory() { return orderForm -> { log.info('Inventory Received Message: ' + orderForm); return orderForm; }; } @Bean public Consumer<OrderForm> report() { return orderForm -> { log.info('Report Received Message: ' + orderForm); }; }}

Controller

@Slf4j@RestControllerpublic class OrderController { @Autowired private BeanFactoryChannelResolver resolver; @PostMapping('order') public String order(@RequestBody OrderForm orderForm) { log.info('Received Request ' + orderForm); resolver.resolveDestination('inventory-in-0').send(new GenericMessage<>(orderForm)); return 'success'; }}

配置

框架會按照中間件默認端口去連接,這里自定義了一個名為myLocalRabbit的類型是RabbitMQ的Binder配置,bindings下面 inventory-in-0 是通道名,接受inventory主題(對應RabbitMQ的ExChange)的消息,然后處理完通過 inventory-out-0 通道發送消息到 report 主題, report-in-0通道負責接受report主題的消息。

注:通道名=注冊的 function 方法名 + in或者out + 參數位置(詳見注釋)

spring: cloud: stream:# 配置消息中間件信息 binders: myLocalRabbit: type: rabbit environment: spring: rabbitmq:host: localhostport: 31003username: guestpassword: guestvirtual-host: /# 重點,如何綁定通道,這里有個約定,開頭是函數名,in表示消費消息,out表示生產消息,最后的數字是函數接受的參數的位置,destination后面為訂閱的主題# 比如Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather()# gather函數接受的第一個String參數對應 gather-in-0,第二個Integer參數對應 gather-in-1,輸出對應 gather-out-0 bindings: inventory-in-0: destination: inventory inventory-out-0: destination: report report-in-0: destination: report# 注冊聲明的三個函數 function: definition: inventory;report

測試

POST http://localhost:8080/orderContent-Type: application/json{ 'productName': '999'}

結果

POST http://localhost:8080/orderHTTP/1.1 200 Content-Type: text/plain;charset=UTF-8Content-Length: 7Date: Sat, 30 May 2020 15:27:56 GMTKeep-Alive: timeout=60Connection: keep-alivesuccessResponse code: 200; Time: 56ms; Content length: 7 bytes

后臺日志

可以看到消息成功發送到了庫存和報表服務

2020-05-30 23:27:56.956 INFO 8760 --- [nio-8080-exec-1] c.e.springcloudstream.OrderController : Received Request OrderForm(productName=999)2020-05-30 23:27:56.956 INFO 8760 --- [nio-8080-exec-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.2020-05-30 23:27:56.957 INFO 8760 --- [nio-8080-exec-1] c.e.s.MessageQueueConfig : Inventory Received Message: OrderForm(productName=999)2020-05-30 23:27:56.958 INFO 8760 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:31003]2020-05-30 23:27:56.964 INFO 8760 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#6131841e:0/SimpleConnection@192fe472 [delegate=amqp://guest@127.0.0.1:31003/, localPort= 2672]2020-05-30 23:27:56.965 INFO 8760 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (inventory.anonymous.wtaFwHlNRkql5IUh2JCNAA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.2020-05-30 23:27:56.965 INFO 8760 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (report.anonymous.SJgpJKiJQf2tudszgf623w) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.2020-05-30 23:27:56.979 INFO 8760 --- [f2tudszgf623w-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.2020-05-30 23:27:56.980 INFO 8760 --- [f2tudszgf623w-1] c.e.s.MessageQueueConfig : Report Received Message: OrderForm(productName=999)

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Spring
相關文章:
主站蜘蛛池模板: 流量检测仪-气密性检测装置-密封性试验仪-东莞市奥图自动化科技有限公司 | 山东柳店新能源科技有限公司 | 杭州可当科技有限公司—流量卡_随身WiFi_AI摄像头一站式解决方案 | HYDAC过滤器,HYDAC滤芯,现货ATOS油泵,ATOS比例阀-东莞市广联自动化科技有限公司 | 氟氨基酮、氯硝柳胺、2-氟苯甲酸、异香兰素-新晨化工 | 爱德华真空泵油/罗茨泵维修,爱发科-比其尔产品供应东莞/杭州/上海等全国各地 | 不锈钢水管-不锈钢燃气管-卫生级不锈钢管件-不锈钢食品级水管-广东双兴新材料集团有限公司 | 低浓度恒温恒湿称量系统,强光光照培养箱-上海三腾仪器有限公司 | Maneurop/美优乐压缩机,活塞压缩机,型号规格,技术参数,尺寸图片,价格经销商 | 连续油炸机,全自动油炸机,花生米油炸机-烟台茂源食品机械制造有限公司 | 国产液相色谱仪-超高效液相色谱仪厂家-上海伍丰科学仪器有限公司 | 污水/卧式/潜水/钻井/矿用/大型/小型/泥浆泵,价格,参数,型号,厂家 - 安平县鼎千泵业制造厂 | 气象监测系统_气象传感器_微型气象仪_气象环境监测仪-山东风途物联网 | 防爆大气采样器-防爆粉尘采样器-金属粉尘及其化合物采样器-首页|盐城银河科技有限公司 | 土壤养分检测仪_肥料养分检测仪_土壤水分检测仪-山东莱恩德仪器 大型多片锯,圆木多片锯,方木多片锯,板材多片锯-祥富机械有限公司 | 科研ELISA试剂盒,酶联免疫检测试剂盒,昆虫_植物ELISA酶免试剂盒-上海仁捷生物科技有限公司 | 进口试验机价格-进口生物材料试验机-西安卡夫曼测控技术有限公司 | 北钻固控设备|石油钻采设备-石油固控设备厂家 | 贝朗斯动力商城(BRCPOWER.COM) - 买叉车蓄电池上贝朗斯商城,价格更超值,品质有保障! | 网站建设-临朐爱采购-抖音运营-山东兆通网络科技 | 球磨机,节能球磨机价格,水泥球磨机厂家,粉煤灰球磨机-吉宏机械制造有限公司 | 中式装修设计_全屋定制家具_实木仿古门窗花格厂家-喜迎门 | 全自动面膜机_面膜折叠机价格_面膜灌装机定制_高速折棉机厂家-深圳市益豪科技有限公司 | 集装袋吨袋生产厂家-噸袋廠傢-塑料编织袋-纸塑复合袋-二手吨袋-太空袋-曹县建烨包装 | POS机办理_个人POS机免费领取 - 银联POS机申请首页 | 深圳展厅设计_企业展馆设计_展厅设计公司_数字展厅设计_深圳百艺堂 | 南京泽朗生物科技有限公司 | 定制/定做衬衫厂家/公司-衬衫订做/订制价格/费用-北京圣达信 | 游动电流仪-流通式浊度分析仪-杰普仪器(上海)有限公司 | 大_小鼠elisa试剂盒-植物_人Elisa试剂盒-PCR荧光定量试剂盒-上海一研生物科技有限公司 | 碳化硅,氮化硅,冰晶石,绢云母,氟化铝,白刚玉,棕刚玉,石墨,铝粉,铁粉,金属硅粉,金属铝粉,氧化铝粉,硅微粉,蓝晶石,红柱石,莫来石,粉煤灰,三聚磷酸钠,六偏磷酸钠,硫酸镁-皓泉新材料 | 回转支承-转盘轴承-回转驱动生产厂家-洛阳隆达轴承有限公司 | 废气处理_废气处理设备_工业废气处理_江苏龙泰环保设备制造有限公司 | 手板_手板模型制作_cnc手板加工厂-东莞天泓 | 全自动贴标机-套标机-工业热风机-不干胶贴标机-上海厚冉机械 | 济南菜鸟驿站广告|青岛快递车车体|社区媒体-抖音|墙体广告-山东揽胜广告传媒有限公司 | [官网]叛逆孩子管教_戒网瘾学校_全封闭问题青少年素质教育_新起点青少年特训学校 | 厂房出租_厂房出售_产业园区招商_工业地产&nbsp;-&nbsp;中工招商网 | 上海冠顶工业设备有限公司-隧道炉,烘箱,UV固化机,涂装设备,高温炉,工业机器人生产厂家 | 防火板_饰面耐火板价格、厂家_品牌认准格林雅 | RS系列电阻器,RK_RJ启动调整电阻器,RQ_RZ电阻器-上海永上电器有限公司 |