电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

Springboot Websocket Stomp 消息訂閱推送

瀏覽:2日期:2023-02-26 13:46:21
目錄需求背景websocket協議stomp協議需求背景

閑話不扯,直奔主題。需要和web前端建立長鏈接,互相實時通訊,因此想到了websocket,后面隨著需求的變更,需要用戶訂閱主題,實現消息的精準推送,發布訂閱等,則想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的簡單文本協議。

websocket協議

想到了之前寫的一個websocket長鏈接的demo,也貼上代碼供大家參考。

pom文件直接引入spring-boot-starter-websocket即可。

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency>

聲明websocket endpoint

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * @ClassName WebSocketConfig * @Author scott * @Date 2021/6/16 * @Version V1.0 **/@Configurationpublic class WebSocketConfig { /** * 注入一個ServerEndpointExporter,該Bean會自動注冊使用@ServerEndpoint注解申明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter(); }}

websocket實現類,其中通過注解監聽了各種事件,實現了推送消息等相關邏輯

import com.google.common.cache.Cache;import com.google.common.cache.CacheBuilder;import com.ruoyi.common.core.domain.AjaxResult;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import java.util.Objects;import java.util.Set;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * @ClassName: DataTypePushWebSocket * @Author: scott * @Date: 2021/6/16**/@ServerEndpoint(value = '/ws/dataType/push/{token}')@Componentpublic class DataTypePushWebSocket { private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class); /** * 記錄當前在線連接數 */ private static AtomicInteger onlineCount = new AtomicInteger(0); private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder() .initialCapacity(10) .maximumSize(300) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); /** * 連接建立成功調用的方法 */ @OnOpen public void onOpen(Session session, @PathParam('token')String token) {String sessionId = session.getId();onlineCount.incrementAndGet(); // 在線數加1this.sendMessage('sessionId:' + sessionId +',已經和server建立連接', session);SESSION_CACHE.put(sessionId,session);log.info('有新連接加入:{},當前在線連接數為:{}', session.getId(), onlineCount.get()); } /** * 連接關閉調用的方法 */ @OnClose public void onClose(Session session,@PathParam('token')String token) {onlineCount.decrementAndGet(); // 在線數減1SESSION_CACHE.invalidate(session.getId());log.info('有一連接關閉:{},當前在線連接數為:{}', session.getId(), onlineCount.get()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息 */ @OnMessage public void onMessage(String message, Session session,@PathParam('token')String token) {log.info('服務端收到客戶端[{}]的消息:{}', session.getId(), message);this.sendMessage('服務端已收到推送消息:' + message, session); } @OnError public void onError(Session session, Throwable error) {log.error('發生錯誤');error.printStackTrace(); } /** * 服務端發送消息給客戶端 */ private static void sendMessage(String message, Session toSession) {try { log.info('服務端給客戶端[{}]發送消息{}', toSession.getId(), message); toSession.getBasicRemote().sendText(message);} catch (Exception e) { log.error('服務端發送消息給客戶端失敗:{}', e);} } public static AjaxResult sendMessage(String message, String sessionId){Session session = SESSION_CACHE.getIfPresent(sessionId);if(Objects.isNull(session)){ return AjaxResult.error('token已失效');}sendMessage(message,session);return AjaxResult.success(); } public static AjaxResult sendBroadcast(String message){long size = SESSION_CACHE.size();if(size <=0){ return AjaxResult.error('當前沒有在線客戶端,無法推送消息');}ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();Set<String> keys = sessionConcurrentMap.keySet();for (String key : keys) { Session session = SESSION_CACHE.getIfPresent(key); DataTypePushWebSocket.sendMessage(message,session);}return AjaxResult.success(); }}

至此websocket服務端代碼已經完成。

stomp協議

前端代碼.這個是在某個vue工程中寫的js,各位大佬自己動手改改即可。其中Settings.wsPath是后端定義的ws地址例如ws://localhost:9003/ws

import Stomp from ’stompjs’import Settings from ’@/settings.js’export default { // 是否啟用日志 默認啟用 debug:true, // 客戶端連接信息 stompClient:{}, // 初始化 init(callBack){ this.stompClient = Stomp.client(Settings.wsPath) this.stompClient.hasDebug = this.debug this.stompClient.connect({},suce =>{ this.console('連接成功,信息如下 ↓') this.console(this.stompClient) if(callBack){callBack() } },err => { if(err) {this.console('連接失敗,信息如下 ↓')this.console(err) } }) }, // 訂閱 sub(address,callBack){ if(!this.stompClient.connected){ this.console('沒有連接,無法訂閱') return } // 生成 id let timestamp= new Date().getTime() + address this.console('訂閱成功 -> '+address) this.stompClient.subscribe(address,message => { this.console(address+' 訂閱消息通知,信息如下 ↓') this.console(message) let data = message.body callBack(data) },{ id: timestamp }) }, unSub(address){ if(!this.stompClient.connected){ this.console('沒有連接,無法取消訂閱 -> '+address) return } let id = '' for(let item in this.stompClient.subscriptions){ if(item.endsWith(address)){id = itembreak } } this.stompClient.unsubscribe(id) this.console('取消訂閱成功 -> id:'+ id + ' address:'+address) }, // 斷開連接 disconnect(callBack){ if(!this.stompClient.connected){ this.console('沒有連接,無法斷開連接') return } this.stompClient.disconnect(() =>{ console.log('斷開成功') if(callBack){callBack() } }) }, // 單位 秒 reconnect(time){ setInterval(() =>{ if(!this.stompClient.connected){this.console('重新連接中...')this.init() } },time * 1000) }, console(msg){ if(this.debug){ console.log(msg) } }, // 向訂閱發送消息 send(address,msg) { this.stompClient.send(address,{},msg) }}

后端stomp config,里面都有注釋,寫的很詳細,并且我加入了和前端的心跳ping pong。

package com.cn.scott.config;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;import org.springframework.web.socket.config.annotation.StompEndpointRegistry;import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;/** * @ClassName: WebSocketStompConfig * @Author: scott * @Date: 2021/7/8**/@Configuration@EnableWebSocketMessageBrokerpublic class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { private static long HEART_BEAT=10000; @Override public void registerStompEndpoints(StompEndpointRegistry registry) {//允許使用socketJs方式訪問,訪問點為webSocket,允許跨域//在網頁上我們就可以通過這個鏈接//ws://127.0.0.1:port/ws來和服務器的WebSocket連接registry.addEndpoint('/ws').setAllowedOrigins('*'); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) {ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();te.setPoolSize(1);te.setThreadNamePrefix('wss-heartbeat-thread-');te.initialize();//基于內存的STOMP消息代理來代替mq的消息代理//訂閱Broker名稱,/user代表點對點即發指定用戶,/topic代表發布廣播即群發//setHeartbeatValue 設置心跳及心跳時間registry.enableSimpleBroker('/user', '/topic').setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);//點對點使用的訂閱前綴,不設置的話,默認也是/user/registry.setUserDestinationPrefix('/user/'); }}

后端stomp協議接受、訂閱等動作通知

package com.cn.scott.ws;import com.alibaba.fastjson.JSON;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.DestinationVariable;import org.springframework.messaging.handler.annotation.MessageMapping;import org.springframework.messaging.simp.SimpMessagingTemplate;import org.springframework.messaging.simp.annotation.SubscribeMapping;import org.springframework.web.bind.annotation.RestController;/** * @ClassName StompSocketHandler * @Author scott * @Date 2021/6/30 * @Version V1.0 **/@RestControllerpublic class StompSocketHandler { @Autowired private SimpMessagingTemplate simpMessagingTemplate; /** * @MethodName: subscribeMapping * @Description: 訂閱成功通知 * @Param: [id] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ @SubscribeMapping('/user/{id}/listener') public void subscribeMapping(@DestinationVariable('id') final long id) {System.out.println('>>>>>>用戶:'+id +',已訂閱');SubscribeMsg param = new SubscribeMsg(id,String.format('用戶【%s】已訂閱成功', id));sendToUser(param); } /** * @MethodName: test * @Description: 接收訂閱topic消息 * @Param: [id, msg] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ @MessageMapping(value = '/user/{id}/listener') public void UserSubListener(@DestinationVariable long id, String msg) {System.out.println('收到客戶端:' +id+',的消息');SubscribeMsg param = new SubscribeMsg(id,String.format('已收到用戶【%s】發送消息【%s】', id,msg));sendToUser(param); } @GetMapping('/refresh/{userId}') public void refresh(@PathVariable Long userId, String msg) {StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format('服務端向用戶【%s】發送消息【%s】', userId,msg));sendToUser(param); } /** * @MethodName: sendToUser * @Description: 推送消息給訂閱用戶 * @Param: [userId] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ public void sendToUser(SubscribeMsg screenChangeMsg){//這里可以控制權限等。。。simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),'/listener', JSON.toJSONString(screenChangeMsg)); } /** * @MethodName: sendBroadCast * @Description: 發送廣播,需要用戶事先訂閱廣播 * @Param: [topic, msg] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ public void sendBroadCast(String topic,String msg){simpMessagingTemplate.convertAndSend(topic,msg); } /** * @ClassName: SubMsg * @Author: scott * @Date: 2021/6/30 **/ public static class SubscribeMsg {private Long userId;private String msg;public SubscribeMsg(Long UserId, String msg){ this.userId = UserId; this.msg = msg;}public Long getUserId() { return userId;}public String getMsg() { return msg;} }}

連接展示

建立連接成功,這里可以看出是基于websocket協議

Springboot Websocket Stomp 消息訂閱推送

連接信息

Springboot Websocket Stomp 消息訂閱推送

ping pong

Springboot Websocket Stomp 消息訂閱推送

調用接口向訂閱用戶1發送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客戶端控制臺查看已經收到了消息。這個時候不同用戶通過自己的userId可以區分訂閱的主題,可以做到通過userId精準的往客戶端推送消息。

Springboot Websocket Stomp 消息訂閱推送

還記得我們在后端配置的時候還指定了廣播的訂閱主題/topic,這時我們前端通過js只要訂閱了這個主題,那么后端在像這個主題推送消息時,所有訂閱的客戶端都能收到,感興趣的小伙伴可以自己試試,api我都寫好了。

Springboot Websocket Stomp 消息訂閱推送

至此,實戰完畢,喜歡的小伙伴麻煩關注加點贊。

springboot + stomp后端源碼地址:https://gitee.com/ErGouGeSiBaKe/stomp-server

到此這篇關于Springboot Websocket Stomp 消息訂閱推送的文章就介紹到這了,更多相關Springboot Websocket Stomp 消息訂閱推送內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Spring
相關文章:
主站蜘蛛池模板: 欧美日韩国产一区二区三区不_久久久久国产精品无码不卡_亚洲欧洲美洲无码精品AV_精品一区美女视频_日韩黄色性爱一级视频_日本五十路人妻斩_国产99视频免费精品是看4_亚洲中文字幕无码一二三四区_国产小萍萍挤奶喷奶水_亚洲另类精品无码在线一区 | LCD3D打印机|教育|桌面|光固化|FDM3D打印机|3D打印设备-广州造维科技有限公司 | 消泡剂-水处理消泡剂-涂料消泡剂-切削液消泡剂价格-东莞德丰消泡剂厂家 | 团建-拓展-拓展培训-拓展训练-户外拓展训练基地[无锡劲途] | 凝胶成像仪,化学发光凝胶成像系统,凝胶成像分析系统-上海培清科技有限公司 | 微水泥_硅藻泥_艺术涂料_艺术漆_艺术漆加盟-青岛泥之韵环保壁材 武汉EPS线条_EPS装饰线条_EPS构件_湖北博欧EPS线条厂家 | 智能垃圾箱|垃圾房|垃圾分类亭|垃圾分类箱专业生产厂家定做-宿迁市传宇环保设备有限公司 | 河南卓美创业科技有限公司-河南卓美防雷公司-防雷接地-防雷工程-重庆避雷针-避雷器-防雷检测-避雷带-避雷针-避雷塔、机房防雷、古建筑防雷等-山西防雷公司 | 康明斯发电机,上柴柴油发电机,玉柴柴油发电机组_海南重康电力官网 | 浙江自考_浙江自学考试网| 医学动画公司-制作3d医学动画视频-医疗医学演示动画制作-医学三维动画制作公司 | 门禁卡_智能IC卡_滴胶卡制作_硅胶腕带-卡立方rfid定制厂家 | 石膏基自流平砂浆厂家-高强石膏基保温隔声自流平-轻质抹灰石膏粉砂浆批发-永康市汇利建设有限公司 | 回转支承-转盘轴承-回转驱动生产厂家-洛阳隆达轴承有限公司 | 筛分机|振动筛分机|气流筛分机|筛分机厂家-新乡市大汉振动机械有限公司 | SF6环境监测系统-接地环流在线监测装置-瑟恩实业 | 橡胶电子拉力机-塑料-微电脑电子拉力试验机厂家-江苏天源 | 单电机制砂机,BHS制砂机,制沙机设备,制砂机价格-正升制砂机厂家 单级/双级旋片式真空泵厂家,2xz旋片真空泵-浙江台州求精真空泵有限公司 | 立刷【微电签pos机】-嘉联支付立刷运营中心| 塑料造粒机「厂家直销」-莱州鑫瑞迪机械有限公司 | 烟气换热器_GGH烟气换热器_空气预热器_高温气气换热器-青岛康景辉 | 纯化水设备-EDI-制药-实验室-二级反渗透-高纯水|超纯水设备 | 武汉刮刮奖_刮刮卡印刷厂_为企业提供门票印刷_武汉合格证印刷_现金劵代金券印刷制作 - 武汉泽雅印刷有限公司 | 全自动面膜机_面膜折叠机价格_面膜灌装机定制_高速折棉机厂家-深圳市益豪科技有限公司 | 混合生育酚_醋酸生育酚粉_琥珀酸生育酚-山东新元素生物科技 | 万师讲师网-优质讲师培训师供应商,讲师认证,找讲师来万师 | 湖南印刷厂|长沙印刷公司|画册印刷|挂历印刷|台历印刷|杂志印刷-乐成印刷 | 沈阳液压泵_沈阳液压阀_沈阳液压站-沈阳海德太科液压设备有限公司 | 南京展台搭建-南京展会设计-南京展览设计公司-南京展厅展示设计-南京汇雅展览工程有限公司 | 武汉印刷厂-不干胶标签印刷厂-武汉不干胶印刷-武汉标签印刷厂-武汉标签制作 - 善进特种标签印刷厂 | 蜂窝块状沸石分子筛-吸附脱硫分子筛-萍乡市捷龙环保科技有限公司 | 首页 - 军军小站|张军博客| 金联宇电缆|广东金联宇电缆厂家_广东金联宇电缆实业有限公司 | 分轨 | 上传文件,即刻分离人声和伴奏 | 蒸汽热收缩机_蒸汽发生器_塑封机_包膜机_封切收缩机_热收缩包装机_真空机_全自动打包机_捆扎机_封箱机-东莞市中堡智能科技有限公司 | 转子泵_凸轮泵_凸轮转子泵厂家-青岛罗德通用机械设备有限公司 | 刹车盘机床-刹车盘生产线-龙口亨嘉智能装备| 一体化隔油提升设备-餐饮油水分离器-餐厨垃圾处理设备-隔油池-盐城金球环保产业发展有限公司 | 二维运动混料机,加热型混料机,干粉混料机-南京腾阳干燥设备厂 | 激光内雕_led玻璃_发光玻璃_内雕玻璃_导光玻璃-石家庄明晨三维科技有限公司 激光内雕-内雕玻璃-发光玻璃 | 电动葫芦|环链电动葫芦-北京凌鹰名优起重葫芦 |