电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

SpringBoot+RabbitMQ方式收發消息的實現示例

瀏覽:4日期:2023-04-25 17:03:38

本篇會和SpringBoot做整合,采用自動配置的方式進行開發,我們只需要聲明RabbitMQ地址就可以了,關于各種創建連接關閉連接的事都由Spring幫我們了~

交給Spring幫我們管理連接可以讓我們專注于業務邏輯,就像聲明式事務一樣易用,方便又高效。

祝有好收獲,先贊后看,快樂無限。

本文代碼:

https://gitee.com/he-erduo/spring-boot-learning-demo

https://github.com/he-erduo/spring-boot-learning-demo

1. 環境配置

第一節我們先來搞一下環境的配置,上一篇中我們已經引入了自動配置的包,我們既然使用了自動配置的方式,那RabbitMQ的連接信息我們直接放在配置文件中就行了,就像我們需要用到JDBC連接的時候去配置一下DataSource一樣。

SpringBoot+RabbitMQ方式收發消息的實現示例

如圖所示,我們只需要指明一下連接的IP+端口號和用戶名密碼就行了,這里我用的是默認的用戶名與密碼,不寫的話默認也都是guest,端口號也是默認5672。

主要我們需要看一下手動確認消息的配置,需要配置成manual才是手動確認,日后還會有其他的配置項,眼下我們配置這一個就可以了。

接下來我們要配置一個Queue,上一篇中我們往一個名叫erduo的隊列中發送消息,當時是我們手動定義的此隊列,這里我們也需要手動配置,聲明一個Bean就可以了。

@Configuration public class RabbitmqConfig { @Bean public Queue erduo() { // 其三個參數:durable exclusive autoDelete // 一般只設置一下持久化即可 return new Queue('erduo',true); } }

就這么簡單聲明一下就可以了,當然了RabbitMQ畢竟是一個獨立的組件,如果你在RabbitMQ中通過其他方式已經創建過一個名叫erduo的隊列了,你這里也可以不聲明,這里起到的一個效果就是如果你沒有這個隊列,會按照你聲明的方式幫你創建這個隊列。

配置完環境之后,我們就可以以SpringBoot的方式來編寫生產者和消費者了。

2. 生產者與RabbitTemplate

和上一篇的節奏一樣,我們先來編寫生產者,不過這次我要引入一個新的工具:RabbitTemplate。

聽它的這個名字就知道,又是一個拿來即用的工具類,Spring家族這點就很舒服,什么東西都給你封裝一遍,讓你用起來更方便更順手。

RabbitTemplate實現了標準AmqpTemplate接口,功能大致可以分為發送消息和接受消息。

我們這里是在生產者中來用,主要就是使用它的發送消息功能:send和convertAndSend方法。

// 發送消息到默認的Exchange,使用默認的routing key void send(Message message) throws AmqpException; // 使用指定的routing key發送消息到默認的exchange void send(String routingKey, Message message) throws AmqpException; // 使用指定的routing key發送消息到指定的exchange void send(String exchange, String routingKey, Message message) throws AmqpException;

send方法是發送byte數組的數據的模式,這里代表消息內容的對象是Message對象,它的構造方法就是傳入byte數組數據,所以我們需要把我們的數據轉成byte數組然后構造成一個Message對象再進行發送。

// Object類型,可以傳入POJO void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

convertAndSend方法是可以傳入POJO對象作為參數,底層是有一個MessageConverter幫我們自動將數據轉換成byte類型或String或序列化類型。

所以這里支持的傳入對象也只有三種:byte類型,String類型和實現了Serializable接口的POJO。

介紹完了,我們可以看一下代碼:

@Slf4j @Component('rabbitProduce') public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String message = 'Hello 我是作者和耳朵,歡迎關注我。' + LocalDateTime.now().toString(); System.out.println('Message content : ' + message); // 指定消息類型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println('消息發送完畢。'); } public void convertAndSend() { User user = new User(); System.out.println('Message content : ' + user); rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println('消息發送完畢。'); } }

這里我特意寫明了兩個例子,一個用來測試send,另一個用來測試convertAndSend。

send方法里我們看下來和之前的代碼是幾乎一樣的,定義一個消息,然后直接send,但是這個構造消息的構造方法可能比我們想的要多一個參數,我們原來說的只要把數據轉成二進制數組放進去即可,現在看來還要多放一個參數了。

MessageProperties,是的我們需要多放一個MessageProperties對象,從他的名字我們也可以看出它的功能就是附帶一些參數,但是某些參數是少不了的,不帶不行。

比如我的代碼這里就是設置了一下消息的類型,消息的類型有很多種可以是二進制類型,文本類型,或者序列化類型,JSON類型,我這里設置的就是文本類型,指定類型是必須的,也可以為我們拿到消息之后要將消息轉換成什么樣的對象提供一個參考。

convertAndSend方法就要簡單太多,這里我放了一個User對象拿來測試用,直接指定隊列然后放入這個對象即可。

Tips:User必須實現Serializable接口,不然的話調用此方法的時候會拋出IllegalArgumentException異常。

代碼完成之后我們就可以調用了,這里我寫一個測試類進行調用:

@SpringBootTest public class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce; @Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); } }

效果如下圖~

SpringBoot+RabbitMQ方式收發消息的實現示例

同時在控制臺使用命令rabbitmqctl.bat list_queues查看隊列-erduo現在的情況:

如此一來,我們的生產者測試就算完成了,現在消息隊列里兩條消息了,而且消息類型肯定不一樣,一個是我們設置的文本類型,一個是自動設置的序列化類型。

3. 消費者與RabbitListener

既然隊列里面已經有消息了,接下來我們就要看我們該如何通過新的方式拿到消息并消費與確認了。

消費者這里我們要用到@RabbitListener來幫我們拿到指定隊列消息,它的用法很簡單也很復雜,我們可以先來說簡單的方式,直接放到方法上,指定監聽的隊列就行了。

@Slf4j @Component('rabbitConsumer') public class RabbitConsumer { @RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println('Message content : ' + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println('消息已確認'); } }

這段代碼就代表onMessage方法會處理erduo(Producer.QUEUE_NAME是常量字符串'erduo')隊列中的消息。

我們可以看到這個方法里面有兩個參數,Message和Channel,如果用不到Channel可以不寫此參數,但是Message消息一定是要的,它代表了消息本身。

我們可以想想,我們的程序從RabbitMQ之中拉回一條條消息之后,要以怎么樣的方式展示給我們呢?

沒錯,就是封裝為一個個Message對象,這里面放入了一條消息的所有信息,數據結構是什么樣一會我一run你就能看到了。

同時這里我們使用Channel做一個消息確認的操作,這里的DeliveryTag代表的是這個消息在隊列中的序號,這個信息存放在MessageProperties中。

4. SpringBoot 啟動!

編寫完生產者和消費者,同時已經運行過生產者往消息隊列里面放了兩條信息,接下來我們可以直接啟動消息,查看消費情況:

SpringBoot+RabbitMQ方式收發消息的實現示例

在我紅色框線標記的地方可以看到,因為我們有了消費者所以項目啟動后先和RabbitMQ建立了一個連接進行監聽隊列。

隨后就開始消費我們隊列中的兩條消息:

第一條信息是contentType=text/plain類型,所以直接就在控制臺上打印出了具體內容。

第二條信息是contentType=application/x-java-serialized-object,在打印的時候只打印了一個內存地址+字節大小。

不管怎么說,數據我們是拿到了,也就是代表我們的消費是沒有問題的,同時也都進行了消息確認操作,從數據上看,整個消息可以分為兩部分:body和MessageProperties。

我們可以單獨使用一個注解拿到這個body的內容 - @Payload

@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println('Message content : ' + body); }

也可以單獨使用一個注解拿到MessageProperties的headers屬性,headers屬性在截圖里也可以看到,只不過是個空的 - @Headers。

@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception { System.out.println('Message content : ' + body); System.out.println('Message headers : ' + headers); }

這兩個注解都算是擴展知識,我還是更喜歡直接拿到全部,全都要!!!

上面我們已經完成了消息的發送與消費,整個過程我們可以再次回想一下,一切都和我畫的這張圖上一樣的軌跡:

SpringBoot+RabbitMQ方式收發消息的實現示例

只不過我們一直沒有指定Exchage一直使用的默認路由,希望大家好好記住這張圖。

5. @RabbitListener與@RabbitHandler

下面再來補一些知識點,有關@RabbitListener與@RabbitHandler。

@RabbitListener上面我們已經簡單的進行了使用,稍微擴展一下它其實是可以監聽多個隊列的,就像這樣:

@RabbitListener(queues = { 'queue1', 'queue2' }) public void onMessage(Message message, Channel channel) throws Exception { System.out.println('Message content : ' + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false) System.out.println('消息已確認'); }

還有一些其他的特性如綁定之類的,這里不再贅述因為太硬編碼了一般用不上。

下面來說說這節要主要講的一個特性:@RabbitListener和@RabbitHandler的搭配使用。

前面我們沒有提到,@RabbitListener注解其實是可以注解在類上的,這個注解在類上標志著這個類監聽某個隊列或某些隊列。

這兩個注解的搭配使用就要讓@RabbitListener注解在類上,然后用@RabbitHandler注解在方法上,根據方法參數的不同自動識別并去消費,寫個例子給大家看一看更直觀一些。

@Slf4j @Component('rabbitConsumer') @RabbitListener(queues = Producer.QUEUE_NAME) public class RabbitConsumer { @RabbitHandler public void onMessage(@Payload String message){ System.out.println('Message content : ' + message); } @RabbitHandler public void onMessage(@Payload User user) { System.out.println('Message content : ' + user); } }

大家可以看看這個例子,我們先用@RabbitListener監聽erduo隊列中的消息,然后使用@RabbitHandler注解了兩個方法。

第一個方法的body類型是String類型,這就代表著這個方法只能處理文本類型的消息。 第二個方法的body類型是User類型,這就代表著這個方法只能處理序列化類型且為User類型的消息。

這兩個方法正好對應著我們第二節中測試類會發送的兩種消息,所以我們往RabbitMQ中發送兩條測試消息,用來測試這段代碼,看看效果:

SpringBoot+RabbitMQ方式收發消息的實現示例

都在控制臺上如常打印了,如果@RabbitHandler注解的方法中沒有一個的類型可以和你消息的類型對的上,比如消息都是byte數組類型,這里沒有對應的方法去接收,系統就會在控制臺不斷的報錯,如果你出現這個情況就證明你類型寫的不正確。

假設你的erduo隊列中會出現三種類型的消息:byte,文本和序列化,那你就必須要有對應的處理這三種消息的方法,不然消息發過來的時候就會因為無法正確轉換而報錯。

而且使用了@RabbitHandler注解之后就不能再和之前一樣使用Message做接收類型。

@RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.out.println('Message content : ' + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println('消息已確認'); }

這樣寫的話會報類型轉換異常的,所以二者選其一。

同時上文我的@RabbitHandler沒有進行消息確認,大家可以自己試一下進行消息確認。

6. 消息的序列化轉換

通過上文我們已經知道,能被自動轉換的對象只有byte[]、String、java序列化對象(實現了Serializable接口的對象),但是并不是所有的Java對象都會去實現Serializable接口,而且序列化的過程中使用的是JDK自帶的序列化方法,效率低下。

所以我們更普遍的做法是:使用Jackson先將數據轉換成JSON格式發送給RabbitMQ,再接收消息的時候再用Jackson將數據反序列化出來。

這樣做可以完美解決上面的痛點:消息對象既不必再去實現Serializable接口,也有比較高的效率(Jackson序列化效率業界應該是最好的了)。

默認的消息轉換方案是消息轉換頂層接口-MessageConverter的一個子類:SimpleMessageConverter,我們如果要換到另一個消息轉換器只需要替換掉這個轉換器就行了。

SpringBoot+RabbitMQ方式收發消息的實現示例

上圖是MessageConverter結構樹的結構樹,可以看到除了SimpleMessageConverter之外還有一個Jackson2JsonMessageConverter,我們只需要將它定義為Bean,就可以直接使用這個轉換器了。

@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }

這樣就可以了,這里的jacksonObjectMapper可以不傳入,但是默認的ObjectMapper方案對JDK8的時間日期序列化會不太友好,具體可以參考我的上一篇文章:從LocalDateTime序列化探討全局一致性序列化,總的來說就是定義了自己的ObjectMapper。

同時為了接下來測試方便,我又定義了一個專門測試JSON序列化的隊列:

@Bean public Queue erduoJson() { // 其三個參數:durable exclusive autoDelete // 一般只設置一下持久化即可 return new Queue('erduo_json',true); }

如此之后就可以進行測試了,先是生產者代碼:

public void sendObject() { Client client = new Client(); System.out.println('Message content : ' + client); rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println('消息發送完畢。'); }

我又重新定義了一個Client對象,它和之前測試使用的User對象成員變量都是一樣的,不一樣的是它沒有實現Serializable接口。

同時為了保留之前的測試代碼,我又新建了一個RabbitJsonConsumer,用于測試JSON序列化的相關消費代碼,里面定義了一個靜態變量:JSON_QUEUE = 'erduo_json';

所以這段代碼是將Client對象作為消息發送到'erduo_json'隊列中去,隨后我們在測試類中run一下進行一次發送。

緊著是消費者代碼:

@Slf4j @Component('rabbitJsonConsumer') @RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE) public class RabbitJsonConsumer { public static final String JSON_QUEUE = 'erduo_json'; @RabbitHandler public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception { System.out.println('Message content : ' + client); System.out.println('Message headers : ' + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println('消息已確認'); } }

有了上文的經驗之后,這段代碼理解起來也是很簡單了吧,同時給出了上一節沒寫的如何在@RabbitHandler模式下進行消息簽收。

我們直接來看看效果:

SpringBoot+RabbitMQ方式收發消息的實現示例

SpringBoot+RabbitMQ方式收發消息的實現示例

在打印的Headers里面,往后翻可以看到contentType=application/json,這個contentType是表明了消息的類型,這里正是說明我們新的消息轉換器生效了,將所有消息都轉換成了JSON類型。

后記

這兩篇講完了RabbitMQ的基本收發消息,包括手動配置和自動配置的兩種方式,這些大家仔細研讀之后應該會對RabbitMQ收發消息沒什么疑問了~

不過我們一直以來發消息時都是使用默認的交換機,下篇將會講述一下RabbitMQ的幾種交換機類型,以及其使用方式。

到此這篇關于SpringBoot+RabbitMQ方式收發消息的實現示例的文章就介紹到這了,更多相關SpringBoot RabbitMQ 收發消息內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
主站蜘蛛池模板: 美国PARKER齿轮泵,美国PARKER柱塞泵,美国PARKER叶片泵,美国PARKER电磁阀,美国PARKER比例阀-上海维特锐实业发展有限公司二部 | ERP企业管理系统永久免费版_在线ERP系统_OA办公_云版软件官网 | 纯化水设备-纯水设备-超纯水设备-[大鹏水处理]纯水设备一站式服务商-东莞市大鹏水处理科技有限公司 | 有源电力滤波装置-电力有源滤波器-低压穿排电流互感器|安科瑞 | 低温等离子清洗机(双气路进口)-嘉润万丰| 运动木地板厂家_体育木地板安装_篮球木地板选购_实木运动地板价格 | 质检报告_CE认证_FCC认证_SRRC认证_PSE认证_第三方检测机构-深圳市环测威检测技术有限公司 | 菏泽知彼网络科技有限公司 | 净化工程_无尘车间_无尘车间装修-广州科凌净化工程有限公司 | 手持气象站_便携式气象站_农业气象站_负氧离子监测站-山东万象环境 | 立式壁挂广告机厂家-红外电容触摸一体机价格-华邦瀛 | 不锈钢螺丝,不锈钢螺栓,不锈钢标准件-江苏百德特种合金有限公司 交变/复合盐雾试验箱-高低温冲击试验箱_安奈设备产品供应杭州/江苏南京/安徽马鞍山合肥等全国各地 | 浙江栓钉_焊钉_剪力钉厂家批发_杭州八建五金制造有限公司 | CE认证_FCC认证_CCC认证_MFI认证_UN38.3认证-微测检测 CNAS实验室 | 呼末二氧化碳|ETCO2模块采样管_气体干燥管_气体过滤器-湖南纳雄医疗器械有限公司 | 家德利门业,家居安全门,别墅大门 - 安徽家德利门业有限公司 | 玻璃钢罐_玻璃钢储罐_盐酸罐厂家-河北华盛节能设备有限公司 | 单机除尘器 骨架-脉冲除尘器设备生产厂家-润天环保设备 | 沧州友城管业有限公司-内外涂塑钢管-大口径螺旋钢管-涂塑螺旋管-保温钢管生产厂家 | 变色龙PPT-国内原创PPT模板交易平台 - PPT贰零 - 西安聚讯网络科技有限公司 | 真空上料机(一种真空输送机)-百科 | Akribis直线电机_直线模组_力矩电机_直线电机平台|雅科贝思Akribis-杭州摩森机电科技有限公司 | 热风机_工业热风机生产厂家上海冠顶公司提供专业热风机图片价格实惠 | 包装机_厂家_价格-山东包装机有限公司 | 光纤测温-荧光光纤测温系统-福州华光天锐光电科技有限公司 | 威海防火彩钢板,威海岩棉复合板,威海彩钢瓦-文登区九龙岩棉复合板厂 | 等离子表面处理机-等离子表面活化机-真空等离子清洗机-深圳市东信高科自动化设备有限公司 | 今日娱乐圈——影视剧集_八卦娱乐_明星八卦_最新娱乐八卦新闻 | 培训中心-翰香原香酥板栗饼加盟店总部-正宗板栗酥饼技术 | 臭氧实验装置_实验室臭氧发生器-北京同林臭氧装置网 | 首页_欧瑞传动官方网站--主营变频器、伺服系统、新能源、软起动器、PLC、HMI | 东莞压铸厂_精密压铸_锌合金压铸_铝合金压铸_压铸件加工_东莞祥宇金属制品 | 北京公积金代办/租房发票/租房备案-北京金鼎源公积金提取服务中心 | 酒水灌装机-白酒灌装机-酒精果酒酱油醋灌装设备_青州惠联灌装机械 | 北京模型公司-军事模型-工业模型制作-北京百艺模型沙盘公司 | 上海单片机培训|重庆曙海培训分支机构—CortexM3+uC/OS培训班,北京linux培训,Windows驱动开发培训|上海IC版图设计,西安linux培训,北京汽车电子EMC培训,ARM培训,MTK培训,Android培训 | 回转炉,外热式回转窑,回转窑炉-淄博圣元窑炉工程有限公司 | 上海宿田自动化设备有限公司-双面/平面/单面贴标机 | 手术示教系统-数字化手术室系统-林之硕医疗云智能视频平台 | 软瓷_柔性面砖_软瓷砖_柔性石材_MCM软瓷厂家_湖北博悦佳软瓷 | 英国雷迪地下管线探测仪-雷迪RD8100管线仪-多功能数字听漏仪-北京迪瑞进创科技有限公司 |