当前位置 博文首页 > 文章内容

    SpringBoot2.0集成WebSocket实现后台向前端推送信息

    作者:shunshunshun18 栏目:未分类 时间:2021-01-24 14:42:56

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    什么是WebSocket?

    这里写图片描述

    WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

    为什么需要 WebSocket?

    初次接触 WebSocket 的人,都会问同样的问题:我们已经有了 HTTP 协议,为什么还需要另一个协议?它能带来什么好处?

    答案很简单,因为 HTTP 协议有一个缺陷:通信只能由客户端发起,HTTP 协议做不到服务器主动向客户端推送信息。

    这里写图片描述

    举例来说,我们想要查询当前的排队情况,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源(因为必须不停连接,或者 HTTP 连接始终打开)。因此WebSocket 就是这样发明的。 前言

    2020-10-20 教程补充:

    • 补充关于@Component@ServerEndpoint关于是否单例模式等的解答,感谢大家热心提问和研究。
    • Vue版本的websocket连接方法

    2020-01-05 教程补充:

    • 整合了IM相关的优化
    • 优化开启/关闭连接的处理
    • 上传到开源项目spring-cloud-study-websocket,方便大家下载代码。

    感谢大家的支持和留言,14W访问量是满满的动力!接下来还会有websocket+redis集群优化篇针对多ws服务器做简单优化处理,敬请期待!

    话不多说,马上进入干货时刻。

    maven依赖

    SpringBoot2.0对WebSocket的支持简直太棒了,直接就有包可以引入

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * 开启WebSocket支持
     * @author zhengkai.blog.csdn.net
     */
    @Configuration 
    public class WebSocketConfig { 
    	
     @Bean 
     public ServerEndpointExporter serverEndpointExporter() { 
     return new ServerEndpointExporter(); 
     } 
     
    } 

    WebSocketConfig

    启用WebSocket的支持也是很简单,几句代码搞定

    import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * 开启WebSocket支持 * @author zhengkai.blog.csdn.net */@Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 

    WebSocketServer

    这就是重点了,核心都在这里。

    • 因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
    • 直接@ServerEndpoint("/imserver/{userId}")@Component启用即可,然后在里面实现@OnOpen开启连接,@onClose关闭连接,@onMessage接收消息等方法。
    • 新建一个ConcurrentHashMap webSocketMap 用于接收当前userId的WebSocket,方便IM之间对userId进行推送消息。单机版实现到这里就可以。
    • 集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可。
    package com.softdev.system.demo.config;
    
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Component;
    import cn.hutool.log.Log;
    import cn.hutool.log.LogFactory;
    
    
    /**
     * @author zhengkai.blog.csdn.net
     */
    @ServerEndpoint("/imserver/{userId}")
    @Component
    public class WebSocketServer {
    
     static Log log=LogFactory.get(WebSocketServer.class);
     /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
     private static int onlineCount = 0;
     /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
     private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
     /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
     private Session session;
     /**接收userId*/
     private String userId="";
    
     /**
     * 连接建立成功调用的方法*/
     @OnOpen
     public void onOpen(Session session,@PathParam("userId") String userId) {
     this.session = session;
     this.userId=userId;
     if(webSocketMap.containsKey(userId)){
     webSocketMap.remove(userId);
     webSocketMap.put(userId,this);
     //加入set中
     }else{
     webSocketMap.put(userId,this);
     //加入set中
     addOnlineCount();
     //在线数加1
     }
    
     log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());
    
     try {
     sendMessage("连接成功");
     } catch (IOException e) {
     log.error("用户:"+userId+",网络异常!!!!!!");
     }
     }
    
     /**
     * 连接关闭调用的方法
     */
     @OnClose
     public void onClose() {
     if(webSocketMap.containsKey(userId)){
     webSocketMap.remove(userId);
     //从set中删除
     subOnlineCount();
     }
     log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
     }
    
     /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
     @OnMessage
     public void onMessage(String message, Session session) {
     log.info("用户消息:"+userId+",报文:"+message);
     //可以群发消息
     //消息保存到数据库、redis
     if(StringUtils.isNotBlank(message)){
     try {
     //解析发送的报文
     JSONObject jsonObject = JSON.parseObject(message);
     //追加发送人(防止串改)
     jsonObject.put("fromUserId",this.userId);
     String toUserId=jsonObject.getString("toUserId");
     //传送给对应toUserId用户的websocket
     if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
     webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
     }else{
     log.error("请求的userId:"+toUserId+"不在该服务器上");
     //否则不在这个服务器上,发送到mysql或者redis
     }
     }catch (Exception e){
     e.printStackTrace();
     }
     }
     }
    
     /**
     *
     * @param session
     * @param error
     */
     @OnError
     public void onError(Session session, Throwable error) {
     log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
     error.printStackTrace();
     }
     /**
     * 实现服务器主动推送
     */
     public void sendMessage(String message) throws IOException {
     this.session.getBasicRemote().sendText(message);
     }
    
    
     /**
     * 发送自定义消息
     * */
     public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
     log.info("发送消息到:"+userId+",报文:"+message);
     if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
     webSocketMap.get(userId).sendMessage(message);
     }else{
     log.error("用户"+userId+",不在线!");
     }
     }
    
     public static synchronized int getOnlineCount() {
     return onlineCount;
     }
    
     public static synchronized void addOnlineCount() {
     WebSocketServer.onlineCount++;
     }
    
     public static synchronized void subOnlineCount() {
     WebSocketServer.onlineCount--;
     }
    }

    消息推送

    至于推送新信息,可以再自己的Controller写个方法调用WebSocketServer.sendInfo();即可

    import com.softdev.system.demo.config.WebSocketServer;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.ModelAndView;
    import java.io.IOException;
    
    /**
     * WebSocketController
     * @author zhengkai.blog.csdn.net
     */
    @RestController
    public class DemoController {
    
     @GetMapping("index")
     public ResponseEntity<String> index(){
     return ResponseEntity.ok("请求成功");
     }
    
     @GetMapping("page")
     public ModelAndView page(){
     return new ModelAndView("websocket");
     }
    
     @RequestMapping("/push/{toUserId}")
     public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {
     WebSocketServer.sendInfo(message,toUserId);
     return ResponseEntity.ok("MSG SEND SUCCESS");
     }
    }

    页面发起

    页面用js代码调用websocket,当然,太古老的浏览器是不行的,一般新的浏览器或者谷歌浏览器是没问题的。还有一点,记得协议是ws的,如果使用了一些路径类,可以replace(“http”,“ws”)来替换协议。

    <!DOCTYPE html>
    <html>
    <head>
     <meta charset="utf-8">
     <title>websocket通讯</title>
    </head>
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
    <script>
     var socket;
     function openSocket() {
     if(typeof(WebSocket) == "undefined") {
     console.log("您的浏览器不支持WebSocket");
     }else{
     console.log("您的浏览器支持WebSocket");
     //实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
     //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
     //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
     var socketUrl="http://blog.iis7.com/' defined in class path resource [com/xxx/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

    感谢@来了老弟儿 的反馈:

    如果tomcat部署一直报这个错,请移除 WebSocketConfig@Bean ServerEndpointExporter 的注入 。

    ServerEndpointExporter 是由Spring官方提供的标准实现,用于扫描ServerEndpointConfig配置类和@ServerEndpoint注解实例。使用规则也很简单:

    如果使用默认的嵌入式容器 比如Tomcat 则必须手工在上下文提供ServerEndpointExporter。如果使用外部容器部署war包,则不需要提供提供ServerEndpointExporter,因为此时SpringBoot默认将扫描服务端的行为交给外部容器处理,所以线上部署的时候要把WebSocketConfig中这段注入bean的代码注掉。 正式项目的前端WebSocket框架 GoEasy

    感谢kkatrina的补充,正式的项目中,一般是用第三方websocket框架来做,稳定性、实时性有保证的多,也会包括一些心跳、重连机制。

    GoEasy专注于服务器与浏览器,浏览器与浏览器之间消息推送,完美兼容世界上的绝大多数浏览器,包括IE6, IE7之类的非常古老的浏览器。支持Uniapp,各种小程序,react,vue等所有主流Web前端技术。
    GoEasy采用 发布/订阅 的消息模式,帮助您非常轻松的实现一对一,一对多的通信。
    https://www.goeasy.io/cn/doc/

    @Component@ServerEndpoint关于是否单例模式,能否使用static Map等一些问题的解答

    看到大家都在热心的讨论关于是否单例模式这个问题,请大家相信自己的直接,如果websocket是单例模式,还怎么服务这么多session呢。

    • websocket是原型模式@ServerEndpoint每次建立双向通信的时候都会创建一个实例,区别于spring的单例模式。Spring的@Component默认是单例模式,请注意,默认 而已,是可以被改变的。
    • 这里的@Component仅仅为了支持@Autowired依赖注入使用,如果不加则不能注入任何东西,为了方便。
    • 什么是prototype 原型模式? 基本就是你需要从A的实例得到一份与A内容相同,但是又互不干扰的实例B的话,就需要使用原型模式。关于在原型模式下使用static 的webSocketMap,请注意这是ConcurrentHashMap ,也就是线程安全/线程同步的,而且已经是静态变量作为全局调用,这种情况下是ok的,或者大家如果有顾虑或者更好的想法的化,可以进行改进。
    • 例如使用一个中间类来接收和存放session。为什么每次都@OnOpen都要检查webSocketMap.containsKey(userId) ,首先了为了代码强壮性考虑,假设代码以及机制没有问题,那么肯定这个逻辑是废的对吧。
    • 但是实际使用的时候发现偶尔会出现重连失败或者其他原因导致之前的session还存在,这里就做了一个清除旧session,迎接新session的功能。

    Vue版本的websocket连接

    感谢**@GzrStudy**的贡献,供大家参考。

    <script>
    export default {
     data() {
     return {
     socket:null,
     userId:localStorage.getItem("ms_uuid"),
     toUserId:'2',
     content:'3'
     }
     },
     methods: {
     openSocket() {
     if (typeof WebSocket == "undefined") {
     console.log("您的浏览器不支持WebSocket");
     } else {
     console.log("您的浏览器支持WebSocket");
     //实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
     //等同于socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
     //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
     var socketUrl =
     "http://localhost:8081/imserver/" + this.userId;
     socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
     console.log(socketUrl);
     if (this.socket != null) {
     this.socket.close();
     this.socket = null;
     }
     this.socket = new WebSocket(socketUrl);
     //打开事件
     this.socket = new WebSocket(socketUrl);
     //打开事件
     this.socket.onopen = function() {
     console.log("websocket已打开");
     //socket.send("这是来自客户端的消息" + location.href + new Date());
     };
     //获得消息事件
     this.socket.onmessage = function(msg) {
     console.log(msg.data);
     //发现消息进入 开始处理前端触发逻辑
     };
     //关闭事件
     this.socket.onclose = function() {
     console.log("websocket已关闭");
     };
     //发生了错误事件
     this.socket.onerror = function() {
     console.log("websocket发生了错误");
     };
     }
     },
     sendMessage() {
     if (typeof WebSocket == "undefined") {
     console.log("您的浏览器不支持WebSocket");
     } else {
     console.log("您的浏览器支持WebSocket");
     console.log(
     '{"toUserId":"' +
     this.toUserId +
     '","contentText":"' +
     this.content +
     '"}'
     );
     this.socket.send(
     '{"toUserId":"' +
     this.toUserId +
     '","contentText":"' +
     this.content +
     '"}'
     );
     
     }
    }