电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼

瀏覽:2日期:2023-04-22 13:14:29

一.導(dǎo)入Netty依賴

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>

二.搭建websocket服務(wù)器

@Componentpublic class WebSocketServer { /** * 主線程池 */ private EventLoopGroup bossGroup; /** * 工作線程池 */ private EventLoopGroup workerGroup; /** * 服務(wù)器 */ private ServerBootstrap server; /** * 回調(diào) */ private ChannelFuture future; public void start() { future = server.bind(9001); System.out.println('netty server - 啟動(dòng)成功'); } public WebSocketServer() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketInitializer()); }}

三.初始化Websocket

public class WebsocketInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // ------------------ // 用于支持Http協(xié)議 // ------------------ // websocket基于http協(xié)議,需要有http的編解碼器 pipeline.addLast(new HttpServerCodec()); // 對(duì)寫大數(shù)據(jù)流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加對(duì)HTTP請(qǐng)求和響應(yīng)的聚合器:只要使用Netty進(jìn)行Http編程都需要使用 //設(shè)置單次請(qǐng)求的文件的大小 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //webSocket 服務(wù)器處理的協(xié)議,用于指定給客戶端連接訪問(wèn)的路由 :/ws pipeline.addLast(new WebSocketServerProtocolHandler('/ws')); // 添加Netty空閑超時(shí)檢查的支持 // 1. 讀空閑超時(shí)(超過(guò)一定的時(shí)間會(huì)發(fā)送對(duì)應(yīng)的事件消息) // 2. 寫空閑超時(shí) // 3. 讀寫空閑超時(shí) pipeline.addLast(new IdleStateHandler(4, 8, 12)); //添加心跳處理 pipeline.addLast(new HearBeatHandler()); // 添加自定義的handler pipeline.addLast(new ChatHandler()); }}

四.創(chuàng)建Netty監(jiān)聽器

@Componentpublic class NettyListener implements ApplicationListener<ContextRefreshedEvent> { @Resource private WebSocketServer websocketServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null) { try { websocketServer.start(); } catch (Exception e) { e.printStackTrace(); } } }}

五.建立消息通道

public class UserChannelMap { /** * 用戶保存用戶id與通道的Map對(duì)象 */// private static Map<String, Channel> userChannelMap; /* static { userChannelMap = new HashMap<String, Channel>(); }*/ /** * 定義一個(gè)channel組,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器,是一個(gè)單例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用戶與Chanel的對(duì)應(yīng)信息,用于給指定用戶發(fā)送消息 */ private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>(); private UserChannelMap(){} /** * 添加用戶id與channel的關(guān)聯(lián) * @param userNum * @param channel */ public static void put(String userNum, Channel channel) { userChannelMap.put(userNum, channel); } /** * 根據(jù)用戶id移除用戶id與channel的關(guān)聯(lián) * @param userNum */ public static void remove(String userNum) { userChannelMap.remove(userNum); } /** * 根據(jù)通道id移除用戶與channel的關(guān)聯(lián) * @param channelId 通道的id */ public static void removeByChannelId(String channelId) { if(!StringUtils.isNotBlank(channelId)) { return; } for (String s : userChannelMap.keySet()) { Channel channel = userChannelMap.get(s); if(channelId.equals(channel.id().asLongText())) { System.out.println('客戶端連接斷開,取消用戶' + s + '與通道' + channelId + '的關(guān)聯(lián)'); userChannelMap.remove(s); UserService userService = SpringUtil.getBean(UserService.class); userService.logout(s); break; } } } /** * 打印所有的用戶與通道的關(guān)聯(lián)數(shù)據(jù) */ public static void print() { for (String s : userChannelMap.keySet()) { System.out.println('用戶id:' + s + ' 通道:' + userChannelMap.get(s).id()); } } /** * 根據(jù)好友id獲取對(duì)應(yīng)的通道 * @param receiverNum 接收人編號(hào) * @return Netty通道 */ public static Channel get(String receiverNum) { return userChannelMap.get(receiverNum); } /** * 獲取channel組 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 獲取用戶channel map * @return */ public static ConcurrentHashMap<String,Channel> getUserChannelMap(){ return userChannelMap; }}

六.自定義消息類型

public class Message { /** * 消息類型 */ private Integer type; /** * 聊天消息 */ private String message; /** * 擴(kuò)展消息字段 */ private Object ext; public Integer getType() { return type; } public void setType(Integer type) { this.type = type; } public MarketChatRecord getChatRecord() { return marketChatRecord; } public void setChatRecord(MarketChatRecord chatRecord) { this.marketChatRecord = chatRecord; } public Object getExt() { return ext; } public void setExt(Object ext) { this.ext = ext; } @Override public String toString() { return 'Message{' + 'type=' + type + ', marketChatRecord=' + marketChatRecord + ', ext=' + ext + ’}’; }}

七.創(chuàng)建處理消息的handler

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); /** * 用來(lái)保存所有的客戶端連接 */ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *當(dāng)Channel中有新的事件消息會(huì)自動(dòng)調(diào)用 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 當(dāng)接收到數(shù)據(jù)后會(huì)自動(dòng)調(diào)用 // 獲取客戶端發(fā)送過(guò)來(lái)的文本消息 Gson gson = new Gson(); log.info('服務(wù)器收到消息:{}',msg.text()); System.out.println('接收到消息數(shù)據(jù)為:' + msg.text()); Message message = gson.fromJson(msg.text(), Message.class); //根據(jù)業(yè)務(wù)要求進(jìn)行消息處理 switch (message.getType()) { // 處理客戶端連接的消息 case 0: // 建立用戶與通道的關(guān)聯(lián) // 處理客戶端發(fā)送好友消息 break; case 1: // 處理客戶端的簽收消息 break; case 2: // 將消息記錄設(shè)置為已讀 break; case 3: // 接收心跳消息 break; default: break; } } // 當(dāng)有新的客戶端連接服務(wù)器之后,會(huì)自動(dòng)調(diào)用這個(gè)方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info('handlerAdded 被調(diào)用'+ctx.channel().id().asLongText()); // 添加到channelGroup 通道組 UserChannelMap.getChannelGroup().add(ctx.channel());// clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info('{異常:}'+cause.getMessage()); // 刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info('handlerRemoved 被調(diào)用'+ctx.channel().id().asLongText()); //刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); }}

八.處理心跳

public class HearBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent)evt; if(idleStateEvent.state() == IdleState.READER_IDLE) { System.out.println('讀空閑事件觸發(fā)...'); } else if(idleStateEvent.state() == IdleState.WRITER_IDLE) { System.out.println('寫空閑事件觸發(fā)...'); } else if(idleStateEvent.state() == IdleState.ALL_IDLE) { System.out.println('---------------'); System.out.println('讀寫空閑事件觸發(fā)'); System.out.println('關(guān)閉通道資源'); ctx.channel().close(); } } }}

搭建完成后調(diào)用測(cè)試

1.頁(yè)面訪問(wèn)http://localhost:9001/ws 2.端口號(hào)9001和訪問(wèn)路徑ws都是我們?cè)谏线吪渲玫模缓髠魅胛覀冏远x的消息message類型。3.大概流程:消息發(fā)送 :用戶1先連接通道,然后發(fā)送消息給用戶2,用戶2若是在線直接可以發(fā)送給用戶,若沒(méi)在線可以將消息暫存在redis或者通道里,用戶2鏈接通道的話,兩者可以直接通訊。消息推送 :用戶1連接通道,根據(jù)通道id查詢要推送的人是否在線,或者推送給所有人,這里我只推送給指定的人。

到此這篇關(guān)于SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Netty WebSocket消息發(fā)送內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Spring
相關(guān)文章:
主站蜘蛛池模板: 【铜排折弯机,钢丝折弯成型机,汽车发泡钢丝折弯机,线材折弯机厂家,线材成型机,铁线折弯机】贝朗折弯机厂家_东莞市贝朗自动化设备有限公司 | 冷水机-工业冷水机-冷水机组-欧科隆品牌保障 | 切铝机-数控切割机-型材切割机-铝型材切割机-【昆山邓氏精密机械有限公司】 | 吸音板,隔音板,吸音材料,吸音板价格,声学材料 - 佛山诺声吸音板厂家 | bng防爆挠性连接管-定做金属防爆挠性管-依客思防爆科技 | 公交驾校-北京公交驾校欢迎您! 工作心得_读书心得_学习心得_找心得体会范文就上学道文库 | 超声波分散机-均质机-萃取仪-超声波涂料分散设备-杭州精浩 | 挤出熔体泵_高温熔体泵_熔体出料泵_郑州海科熔体泵有限公司 | 透平油真空滤油机-变压器油板框滤油机-滤油车-华之源过滤设备 | 薪动-人力资源公司-灵活用工薪资代发-费用结算-残保金优化-北京秒付科技有限公司 | 建筑消防设施检测系统检测箱-电梯**检测仪器箱-北京宇成伟业科技有限责任公司 | 南京精锋制刀有限公司-纵剪机刀片_滚剪机刀片_合金刀片厂家 | 移动厕所租赁|移动卫生间|上海移动厕所租赁-家瑞租赁 | 全自动包衣机-无菌分装隔离器-浙江迦南科技股份有限公司 | 金属清洗剂,防锈油,切削液,磨削液-青岛朗力防锈材料有限公司 | 旋转滴界面张力仪(张力测定仪器)-百科| 恒温油槽-恒温水槽-低温恒温槽厂家-宁波科麦仪器有限公司 | 北京三友信电子科技有限公司-ETC高速自动栏杆机|ETC机柜|激光车辆轮廓测量仪|嵌入式车道控制器 | 一路商机网-品牌招商加盟优选平台-加盟店排行榜平台 | 一点车讯-汽车网站,每天一点最新车讯! | 手板_手板模型制作_cnc手板加工厂-东莞天泓 | 环氧乙烷灭菌器_压力蒸汽灭菌器_低温等离子过氧化氢灭菌器 _低温蒸汽甲醛灭菌器_清洗工作站_医用干燥柜_灭菌耗材-环氧乙烷灭菌器_脉动真空压力蒸汽灭菌器_低温等离子灭菌设备_河南省三强医疗器械有限责任公司 | 楼承板-开闭口楼承板-无锡海逵楼承板 | 齿轮减速机_齿轮减速电机-VEMT蜗轮蜗杆减速机马达生产厂家瓦玛特传动瑞环机电 | 活性炭厂家-蜂窝活性炭-粉状/柱状/果壳/椰壳活性炭-大千净化-活性炭 | 广东青藤环境科技有限公司-水质检测 | 海外仓系统|国际货代系统|退货换标系统|WMS仓储系统|海豚云 | 便携式XPDM露点仪-在线式防爆露点仪-增强型烟气分析仪-约克仪器 冰雕-冰雪世界-大型冰雕展制作公司-赛北冰雕官网 | 上海软件开发-上海软件公司-软件外包-企业软件定制开发公司-咏熠科技 | 酒店厨房设计_中央厨房设计_北京商用厨房设计公司-奇能商厨 | 滑石粉,滑石粉厂家,超细滑石粉-莱州圣凯滑石有限公司 | 胶水,胶粘剂,AB胶,环氧胶,UV胶水,高温胶,快干胶,密封胶,结构胶,电子胶,厌氧胶,高温胶水,电子胶水-东莞聚力-聚厉胶粘 | 间苯二酚,间苯二酚厂家-淄博双和化工| 橡胶接头_橡胶软接头_可曲挠橡胶接头-巩义市创伟机械制造有限公司 | 热风机_工业热风机生产厂家上海冠顶公司提供专业热风机图片价格实惠 | 液氮罐(生物液氮罐)百科-无锡爱思科| 东莞海恒试验仪器设备有限公司| 泰来华顿液氮罐,美国MVE液氮罐,自增压液氮罐,定制液氮生物容器,进口杜瓦瓶-上海京灿精密机械有限公司 | 雷冲击高压发生器-水内冷直流高压发生器-串联谐振分压器-武汉特高压电力科技有限公司 | 交变/复合盐雾试验箱-高低温冲击试验箱_安奈设备产品供应杭州/江苏南京/安徽马鞍山合肥等全国各地 | Honsberg流量计-Greisinger真空表-气压计-上海欧臻机电设备有限公司 |