SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼
一.導(dǎo)入Netty依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
二.搭建websocket服務(wù)器
@Componentpublic class WebSocketServer { /** * 主線程池 */ private EventLoopGroup bossGroup; /** * 工作線程池 */ private EventLoopGroup workerGroup; /** * 服務(wù)器 */ private ServerBootstrap server; /** * 回調(diào) */ private ChannelFuture future; public void start() { future = server.bind(9001); System.out.println('netty server - 啟動(dòng)成功'); } public WebSocketServer() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketInitializer()); }}
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // ------------------ // 用于支持Http協(xié)議 // ------------------ // websocket基于http協(xié)議,需要有http的編解碼器 pipeline.addLast(new HttpServerCodec()); // 對(duì)寫大數(shù)據(jù)流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加對(duì)HTTP請(qǐng)求和響應(yīng)的聚合器:只要使用Netty進(jìn)行Http編程都需要使用 //設(shè)置單次請(qǐng)求的文件的大小 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //webSocket 服務(wù)器處理的協(xié)議,用于指定給客戶端連接訪問(wèn)的路由 :/ws pipeline.addLast(new WebSocketServerProtocolHandler('/ws')); // 添加Netty空閑超時(shí)檢查的支持 // 1. 讀空閑超時(shí)(超過(guò)一定的時(shí)間會(huì)發(fā)送對(duì)應(yīng)的事件消息) // 2. 寫空閑超時(shí) // 3. 讀寫空閑超時(shí) pipeline.addLast(new IdleStateHandler(4, 8, 12)); //添加心跳處理 pipeline.addLast(new HearBeatHandler()); // 添加自定義的handler pipeline.addLast(new ChatHandler()); }}
四.創(chuàng)建Netty監(jiān)聽器
@Componentpublic class NettyListener implements ApplicationListener<ContextRefreshedEvent> { @Resource private WebSocketServer websocketServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null) { try { websocketServer.start(); } catch (Exception e) { e.printStackTrace(); } } }}
五.建立消息通道
public class UserChannelMap { /** * 用戶保存用戶id與通道的Map對(duì)象 */// private static Map<String, Channel> userChannelMap; /* static { userChannelMap = new HashMap<String, Channel>(); }*/ /** * 定義一個(gè)channel組,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器,是一個(gè)單例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用戶與Chanel的對(duì)應(yīng)信息,用于給指定用戶發(fā)送消息 */ private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>(); private UserChannelMap(){} /** * 添加用戶id與channel的關(guān)聯(lián) * @param userNum * @param channel */ public static void put(String userNum, Channel channel) { userChannelMap.put(userNum, channel); } /** * 根據(jù)用戶id移除用戶id與channel的關(guān)聯(lián) * @param userNum */ public static void remove(String userNum) { userChannelMap.remove(userNum); } /** * 根據(jù)通道id移除用戶與channel的關(guān)聯(lián) * @param channelId 通道的id */ public static void removeByChannelId(String channelId) { if(!StringUtils.isNotBlank(channelId)) { return; } for (String s : userChannelMap.keySet()) { Channel channel = userChannelMap.get(s); if(channelId.equals(channel.id().asLongText())) { System.out.println('客戶端連接斷開,取消用戶' + s + '與通道' + channelId + '的關(guān)聯(lián)'); userChannelMap.remove(s); UserService userService = SpringUtil.getBean(UserService.class); userService.logout(s); break; } } } /** * 打印所有的用戶與通道的關(guān)聯(lián)數(shù)據(jù) */ public static void print() { for (String s : userChannelMap.keySet()) { System.out.println('用戶id:' + s + ' 通道:' + userChannelMap.get(s).id()); } } /** * 根據(jù)好友id獲取對(duì)應(yīng)的通道 * @param receiverNum 接收人編號(hào) * @return Netty通道 */ public static Channel get(String receiverNum) { return userChannelMap.get(receiverNum); } /** * 獲取channel組 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 獲取用戶channel map * @return */ public static ConcurrentHashMap<String,Channel> getUserChannelMap(){ return userChannelMap; }}
六.自定義消息類型
public class Message { /** * 消息類型 */ private Integer type; /** * 聊天消息 */ private String message; /** * 擴(kuò)展消息字段 */ private Object ext; public Integer getType() { return type; } public void setType(Integer type) { this.type = type; } public MarketChatRecord getChatRecord() { return marketChatRecord; } public void setChatRecord(MarketChatRecord chatRecord) { this.marketChatRecord = chatRecord; } public Object getExt() { return ext; } public void setExt(Object ext) { this.ext = ext; } @Override public String toString() { return 'Message{' + 'type=' + type + ', marketChatRecord=' + marketChatRecord + ', ext=' + ext + ’}’; }}
七.創(chuàng)建處理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); /** * 用來(lái)保存所有的客戶端連接 */ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *當(dāng)Channel中有新的事件消息會(huì)自動(dòng)調(diào)用 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 當(dāng)接收到數(shù)據(jù)后會(huì)自動(dòng)調(diào)用 // 獲取客戶端發(fā)送過(guò)來(lái)的文本消息 Gson gson = new Gson(); log.info('服務(wù)器收到消息:{}',msg.text()); System.out.println('接收到消息數(shù)據(jù)為:' + msg.text()); Message message = gson.fromJson(msg.text(), Message.class); //根據(jù)業(yè)務(wù)要求進(jìn)行消息處理 switch (message.getType()) { // 處理客戶端連接的消息 case 0: // 建立用戶與通道的關(guān)聯(lián) // 處理客戶端發(fā)送好友消息 break; case 1: // 處理客戶端的簽收消息 break; case 2: // 將消息記錄設(shè)置為已讀 break; case 3: // 接收心跳消息 break; default: break; } } // 當(dāng)有新的客戶端連接服務(wù)器之后,會(huì)自動(dòng)調(diào)用這個(gè)方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info('handlerAdded 被調(diào)用'+ctx.channel().id().asLongText()); // 添加到channelGroup 通道組 UserChannelMap.getChannelGroup().add(ctx.channel());// clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info('{異常:}'+cause.getMessage()); // 刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info('handlerRemoved 被調(diào)用'+ctx.channel().id().asLongText()); //刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); }}
八.處理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent)evt; if(idleStateEvent.state() == IdleState.READER_IDLE) { System.out.println('讀空閑事件觸發(fā)...'); } else if(idleStateEvent.state() == IdleState.WRITER_IDLE) { System.out.println('寫空閑事件觸發(fā)...'); } else if(idleStateEvent.state() == IdleState.ALL_IDLE) { System.out.println('---------------'); System.out.println('讀寫空閑事件觸發(fā)'); System.out.println('關(guān)閉通道資源'); ctx.channel().close(); } } }}
搭建完成后調(diào)用測(cè)試
1.頁(yè)面訪問(wèn)http://localhost:9001/ws 2.端口號(hào)9001和訪問(wèn)路徑ws都是我們?cè)谏线吪渲玫模缓髠魅胛覀冏远x的消息message類型。3.大概流程:消息發(fā)送 :用戶1先連接通道,然后發(fā)送消息給用戶2,用戶2若是在線直接可以發(fā)送給用戶,若沒(méi)在線可以將消息暫存在redis或者通道里,用戶2鏈接通道的話,兩者可以直接通訊。消息推送 :用戶1連接通道,根據(jù)通道id查詢要推送的人是否在線,或者推送給所有人,這里我只推送給指定的人。
到此這篇關(guān)于SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Netty WebSocket消息發(fā)送內(nèi)容請(qǐng)搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
1. chat.asp聊天程序的編寫方法2. JSP之表單提交get和post的區(qū)別詳解及實(shí)例3. jsp cookie+session實(shí)現(xiàn)簡(jiǎn)易自動(dòng)登錄4. PHP循環(huán)與分支知識(shí)點(diǎn)梳理5. 利用FastReport傳遞圖片參數(shù)在報(bào)表上展示簽名信息的實(shí)現(xiàn)方法6. JSP+Servlet實(shí)現(xiàn)文件上傳到服務(wù)器功能7. jsp實(shí)現(xiàn)textarea中的文字保存換行空格存到數(shù)據(jù)庫(kù)的方法8. ASP中格式化時(shí)間短日期補(bǔ)0變兩位長(zhǎng)日期的方法9. JavaWeb Servlet中url-pattern的使用10. jsp EL表達(dá)式詳解
