电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Springboot RocketMq實(shí)現(xiàn)過(guò)程詳解

瀏覽:125日期:2023-05-16 13:09:35

首先,在虛擬機(jī)上安裝rocketmq和rocketMq可視化控制,安裝不做描述。

1、pom.xml文件添加依賴(lài)

mq的版本與連接的rocketmq版本保持一致

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </dependency>

2、yml文件添加rocketmq配置

apache: rocketmq: #消費(fèi)者的配置 consumer: pushConsumer: myConsumer #生產(chǎn)者的配置 producer: producerGroup: myGroup namesrvAddr: 192.168.233.128:9876

3、生產(chǎn)者類(lèi)RocketProducer

package com.zp.springbootdemo.rocketmq;import com.alibaba.fastjson.JSONObject;import com.sun.org.apache.xpath.internal.objects.XString;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.apache.rocketmq.remoting.exception.RemotingException;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import org.springframework.util.StopWatch;import javax.annotation.PostConstruct;import java.io.UnsupportedEncodingException;/** * @Author zp * @Description rocketmq生產(chǎn)者 * @Date 22:06 2020/5/22 * @Param * @return **/@Componentpublic class RocketProducer { /** * 生產(chǎn)者的組名 */ @Value('${apache.rocketmq.producer.producerGroup}') private String producerGroup; /** * NameServer 地址 */ @Value('${apache.rocketmq.namesrvAddr}') private String namesrvAddr; private DefaultMQProducer defaultMQProducer; @PostConstruct public void defaultMQProducer(){ //生產(chǎn)者的組名 defaultMQProducer = new DefaultMQProducer(producerGroup); defaultMQProducer.setNamesrvAddr(namesrvAddr); defaultMQProducer.setVipChannelEnabled(false); try { defaultMQProducer.start(); System.out.println('producer啟動(dòng)了。。。'); } catch (MQClientException e) { e.printStackTrace(); } } public String send(String topic,String tags,String body) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(topic,tags,body.getBytes(RemotingHelper.DEFAULT_CHARSET)); StopWatch stop = new StopWatch(); stop.start(); SendResult result = defaultMQProducer.send(message); System.out.println('發(fā)送響應(yīng):MsgId:' + result.getMsgId() + ',發(fā)送狀態(tài):' + result.getSendStatus()); JSONObject jsonObject = new JSONObject(); jsonObject.put('msgId',result.getMsgId()); jsonObject.put('sendStatus',result.getSendStatus()); stop.stop(); return jsonObject.toJSONString(); }}

4、消費(fèi)者類(lèi)RocketConsumer

package com.zp.springbootdemo.rocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.CommandCustomHeader;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;/** * @Author zp * @Description rocketmq消費(fèi)者 * @Date 22:33 2020/5/22 * @Param * @return **/@Componentpublic class RockerConsumer implements CommandLineRunner { /** * 消費(fèi)者 */ @Value('${apache.rocketmq.consumer.pushConsumer}') private String pushConsumer; //myConsumer /** * NameServer 地址 */ @Value('${apache.rocketmq.namesrvAddr}') private String namesrvAddr; /** * 初始化RocketMq的監(jiān)聽(tīng)信息,渠道信息 */ public void messageListener(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(pushConsumer); consumer.setNamesrvAddr(namesrvAddr); try { // 訂閱PushTopic下Tag為push的消息,都訂閱消息 consumer.subscribe('firstTopic','push'); // 程序第一次啟動(dòng)從消息隊(duì)列頭獲取數(shù)據(jù) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //可以修改每次消費(fèi)消息的數(shù)量,默認(rèn)設(shè)置是每次消費(fèi)一條 consumer.setConsumeMessageBatchMaxSize(1); //在此監(jiān)聽(tīng)中消費(fèi)信息,并返回消費(fèi)的狀態(tài)信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs,context)->{// 會(huì)把不同的消息分別放置到不同的隊(duì)列中for (Message msg:msgs){ System.out.println('接收到了消息:'+new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } /** * Callback used to run the bean. * * @param args incoming main method arguments * @throws Exception on error */ @Override public void run(String... args) throws Exception { this.messageListener(); }}

5、controller中編寫(xiě)發(fā)送消息

package com.zp.springbootdemo.rocketmq;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.remoting.exception.RemotingException;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.io.UnsupportedEncodingException;@RestController@RequestMapping('/rocketMq')public class MQController { @Autowired private RocketProducer producer; @RequestMapping('/myFirstProducer') public String pushMsg(String msg){ try { System.out.println('======'+msg); return producer.send('firstTopic','push',msg); } catch (InterruptedException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return 'ERROR'; }}

6.測(cè)試

請(qǐng)求地址:http://127.0.0.1:8080/rocketMq/myFirstProducer?msg=hello

響應(yīng):{'msgId':'C0A8010E1A3818B4AAC2711E8CD50000','sendStatus':'SEND_OK'}

通過(guò)rocketMq可視化控制查看:

Springboot RocketMq實(shí)現(xiàn)過(guò)程詳解

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Spring
相關(guān)文章:
主站蜘蛛池模板: 复合土工膜厂家|hdpe防渗土工膜|复合防渗土工布|玻璃纤维|双向塑料土工格栅-安徽路建新材料有限公司 | 水平垂直燃烧试验仪-灼热丝试验仪-漏电起痕试验仪-针焰试验仪-塑料材料燃烧检测设备-IP防水试验机 | 诗词大全-古诗名句 - 古诗词赏析| 西门子气候补偿器,锅炉气候补偿器-陕西沃信机电工程有限公司 | 承插管件_不锈钢承插管件_锻钢高压管件-温州科正阀门管件有限公司 | 挨踢网-大家的导航!| 锤式粉碎机,医药粉碎机,锥式粉碎机-无锡市迪麦森机械制造有限公司 | 山东氧化铁红,山东铁红-淄博科瑞化工有限公司 | 广东机电安装工程_中央空调工程_东莞装饰装修-广东粤标建设有限公司 | 工业铝型材-铝合金电机壳-铝排-气动执行器-山东永恒能源集团有限公司 | 六维力传感器_三维力传感器_二维力传感器-南京神源生智能科技有限公司 | 小型手持气象站-空气负氧离子监测站-多要素微气象传感器-山东天合环境科技有限公司 | 学生作文网_中小学生作文大全与写作指导| 防爆电机生产厂家,YBK3电动机,YBX3系列防爆电机,YBX4节防爆电机--河南省南洋防爆电机有限公司 | 工控机,嵌入式主板,工业主板,arm主板,图像采集卡,poe网卡,朗锐智科 | 润东方环保空调,冷风机,厂房车间降温设备-20年深圳环保空调生产厂家 | 脑钠肽-白介素4|白介素8试剂盒-研域(上海)化学试剂有限公司 | 胶原检测试剂盒,弹性蛋白检测试剂盒,类克ELISA试剂盒,阿达木单抗ELISA试剂盒-北京群晓科苑生物技术有限公司 | 玻璃瓶厂家_酱菜瓶厂家_饮料瓶厂家_酒瓶厂家_玻璃杯厂家_徐州东明玻璃制品有限公司 | 拉力机-万能试验机-材料拉伸试验机-电子拉力机-拉力试验机厂家-冲击试验机-苏州皖仪实验仪器有限公司 | 双舌接地线-PC68数字式高阻计-ZC36|苏海百科 | 深圳诚暄fpc首页-柔性线路板,fpc柔性线路板打样生产厂家 | 安全阀_弹簧式安全阀_美标安全阀_工业冷冻安全阀厂家-中国·阿司米阀门有限公司 | 圆窗水平仪|伊莉莎冈特elesa+ganter | 齿轮减速马达一体式_蜗轮蜗杆减速机配电机-德国BOSERL齿轮减速电动机生产厂家 | 定做大型恒温循环水浴槽-工业用不锈钢恒温水箱-大容量低温恒温水槽-常州精达仪器 | 精密交叉滚子轴承厂家,转盘轴承,YRT转台轴承-洛阳千协轴承 | 内窥镜-工业内窥镜厂家【上海修远仪器仪表有限公司】 | 无菌水质袋-NASCO食品无菌袋-Whirl-Pak无菌采样袋-深圳市慧普德贸易有限公司 | 模具ERP_模具管理系统_模具mes_模具进度管理_东莞市精纬软件有限公司 | 快干水泥|桥梁伸缩缝止水胶|伸缩缝装置生产厂家-广东广航交通科技有限公司 | 粉丝机械,粉丝烘干机,粉丝生产线-招远市远东粉丝机械有限公司 | 北京亦庄厂房出租_经开区产业园招商信息平台 | 粉末冶金注射成型厂家|MIM厂家|粉末冶金齿轮|MIM零件-深圳市新泰兴精密科技 | 视觉检测设备_自动化检测设备_CCD视觉检测机_外观缺陷检测-瑞智光电 | 篮球地板厂家_舞台木地板品牌_体育运动地板厂家_凯洁地板 | 学校用栓剂模,玻璃瓶轧盖钳,小型安瓿熔封机,实验室安瓿熔封机-长沙中亚制药设备有限公司 | 陕西高职单招-陕西高职分类考试网 | 定量包装秤,吨袋包装称,伸缩溜管,全自动包装秤,码垛机器人,无锡市邦尧机械工程有限公司 | 深圳货架厂家_金丽声精品货架_广东金丽声展示设备有限公司官网 | 深圳成考网-深圳成人高考报名网 深圳工程师职称评定条件及流程_深圳职称评审_职称评审-职称网 |