问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Server-Sent Events向前端消息推送技术

创作时间:
作者:
@小白创作中心

Server-Sent Events向前端消息推送技术

引用
简书
1.
https://www.jianshu.com/p/942e1b5223b3

什么是SSE技术

Server-Sent Events (SSE) 是一种用于服务端向客户端单向实时通信的Web技术。它适用于消息通知的场景,能够实现实时数据推送。

SSE技术优缺点

优点

  • 实时性
  • SSE 一般只用来传送文本,二进制数据需要编码后传送
  • SSE 使用 HTTP 协议,只是content-type标识数据类型为 text/event-stream

缺点

  • 单向通信
  • 老旧浏览器无法兼容
  • 无法跨域

SSE通信本质

  1. 前端向后端发起连接请求,建立连接后会一直保持连接,后端会一直发送数据流到前端。前端关闭连接,就断开连接。
  2. 前端添加事件监听。
  3. 前端就可以接收到后端消息推送。

Spring Boot实现SSE技术

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.6.13</version>
</dependency>

代码

/**
 * 消息模型
 */
public class MessageInfo {

    private String userNo;
    private String message;
    private String sendTime;
}

/**
 * SSE服务实现
 */
public class SseEmitterServer {
    /**
     * 统计SSE在线连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);
    /**
     *  客户标识符和消息推送映射关系
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * 建立连接
     * @param userNo
     * @return
     */
    public static SseEmitter connect(String userNo) {
        // 设置超时日期,0表示不过期
        SseEmitter sseEmitter = new SseEmitter(0L);

        sseEmitter.onCompletion(() -> {
            log.info("结束连接 userNo = " + userNo);
        });

        sseEmitter.onError(throwable -> {
            log.info("连接异常 userNo = " + userNo);
            removeUser(userNo);
        });

        sseEmitter.onTimeout(() -> {
            log.info("连接超时 userNo = " + userNo);
            removeUser(userNo);
        });
        sseEmitterMap.put(userNo, sseEmitter);
        count.getAndIncrement();
        log.info("创建新SSE连接,userNo = " + userNo);
        return sseEmitter;
    }

    /**
     * 移出用户
     * @param userNo
     */
    public static void removeUser(String userNo){
        sseEmitterMap.remove(userNo);
        count.getAndDecrement();
        log.info("移除的 userNo = " + userNo);
        log.info("剩余的 userNo集合 = " + sseEmitterMap.keySet());
    }

    /**
     * 向指定的userNo发送消息message
     * @param userNo
     * @param message
     */
    public static void sendMessageToUserNo(String userNo, String message) {
        if (!sseEmitterMap.containsKey(userNo)) {
            connect(userNo);
        }

        try {
            sseEmitterMap.get(userNo).send(message);
            log.info("向 userNo = " + userNo + "发送消息,内容如下:" + message);
        } catch (IOException e) {
            log.error("向 userNo = " + userNo + "发送消息出错,错误内容如下:" + e.getMessage());
            e.printStackTrace();
        }
    }
}

@Controller
@RequestMapping("/sse")
public class SseController {

    /**
     * 接收连接
     * @param userNo
     * @return
     */
    @GetMapping("/connect/{userNo}")
    public SseEmitter connect( String userNo){
        SseEmitter sseEmitter = SseEmitterServer.connect(userNo);
        return sseEmitter;
    }

    /**
     * 关闭连接
     * @param userNo
     * @return
     */
    @GetMapping("/close/{userNo}")
    public String closeSse( String userNo) {
        SseEmitterServer.removeUser(userNo);
        return "关闭 userNo = " + userNo + "连接";
    }

    /**
     * 发送消息
     * @param userNo
     * @param messageInfo
     * @return
     */
    @PostMapping("/send/{userNo}")
    public String sendMessageToUserNo( String userNo, @RequestBody MessageInfo messageInfo) {
        if (Objects.isNull(messageInfo) || StrUtil.isBlank(messageInfo.getMessage())) {
            log.error("消息内容不能为空!");
            return "消息内容不能为空!";
        }

        messageInfo.setUserNo(userNo);
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        String nowTime = LocalDateTime.now().format(dtf);
        messageInfo.setSendTime(nowTime);

        // 向SSE前端发送消息
        String jsonStr = JSONUtil.toJsonStr(messageInfo);
        SseEmitterServer.sendMessageToUserNo(userNo, jsonStr);

        return "向 userNo = " + userNo + "发送消息,内容为:" + jsonStr;
    }
}

前端实现SSE技术

前端封装sse的js代码:

let eventSource = null;
export function createConnection() {
    let url = "http://localhost:3831/sse/connect/123456";
    let headers = {};
    eventSource = new EventSource(url, headers);
}

export function addListener() {
    eventSource.onopen = () => {
        console.log("eventSource.onopen 连接畅通");
    }

    eventSource.onmessage = (event) => {
        console.log("eventSource.onmessage 收到消息 event.data = " + event.data);

        eventSource.close();
        console.log("收到消息后关闭连接");
    }

    eventSource.onerror = (err) => {
        console.log("eventSource.onerror 发生错误 err = " + err);
    }
}

前端页面代码:

<template>
  <div class="hello">
    <el-row style="margin-top:10px;">
      <el-col :span="12">
        <el-button type="primary" @click="openAndRecvForSSE">点击</el-button>
      </el-col>
    </el-row>
  </div>
</template>
  
<script>
import {createConnection, addListener} from "@/utils/sse.js";
  export default {
    name: 'HelloWorld',
    data () {
      return {
      }
    },
    watch: {},
    mounted() {},
    created() { },
    methods: {
      openAndRecvForSSE() {
        createConnection();
        addListener();
      }
    }
  }
  </script>
  <!-- Add "scoped" attribute to limit CSS to this component only -->
  <style scoped></style>

测试结果

  1. 点击按钮,建立连接,前端添加监听事件,后端与前端成功建立连接。
  2. 使用postman发送消息给Controller,其中使用SSE消息通知到前端。

抓包查看SSE

参考

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号