SSE Example
创作时间:
作者:
@小白创作中心
SSE Example
引用
CSDN
1.
https://blog.csdn.net/qq_39209927/article/details/137562544
本文将介绍如何使用SSE(Server-Sent Events)结合EventBus实现消息定向推送。通过在Spring Boot项目中添加依赖、配置EventBus、定义事件类、创建SSE管理类、事件处理类以及SSE接口实现等步骤,可以实现服务器向客户端推送实时数据的功能。
一、EventBus 简介
EventBus 是一个广泛应用于开发中的轻量级事件发布/订阅框架,它的核心设计理念是简化应用程序内部各组件间的通信。通过采用发布/订阅(Publish/Subscribe)设计模式,EventBus能够有效地降低组件之间的耦合度,提高代码可读性和维护性。
1.1 EventBus 工作流程
EventBus 的工作流程如下:
- 定义事件(Event):首先,需要自定义事件类,这个类可以封装任何类型的数据,比如行为数据、数据更新或者其他需要传递的信息。
- 注册订阅者(Subscriber Registration):使用注解
@Subscribe
标记订阅者类中的某个方法,表示该方法想要接收某种类型的事件。在初始化阶段,需要将订阅者注册到 EventBus 中,这样 EventBus 就能知道哪些对象对何种事件感兴趣。 - 发布事件(Post):当有事件需要传递时,发布者通过调用
post
方法,将事件发布到 EventBus 上。 - 事件分发(Dispatching):EventBus 收到事件后,会根据事件类型找到所有注册了该事件类型处理方法的订阅者。根据事件订阅时指定的线程模式,EventBus 会选择合适的线程执行订阅者的方法。线程模型如下:
POSTING
:事件处理在发布事件所在线程中执行。MAIN
:如果不在主线程,则切换到主线程执行事件处理。MAIN_ORDERED
:类似MAIN
,但在主线程中按照事件发布的顺序逐个执行。BACKGROUND
:如果不在后台线程,则新建一个后台线程执行。ASYNC
:无论在哪种线程环境下,都会在独立的线程池中异步执行。
- 执行订阅者方法:EventBus 通过反射调用订阅者中对应注解的方法,并将发布的事件对象作为参数传递给该方法。
二、SSE 基于 EventBus 实现定向推送
由于 SSE 只能实现广播模式的消息推送,如果要实现推送数据到指定的客户端,就需要做一些改动。假设有一个场景,某一个客户端关注某一个事件,当该事件发生变动时,只把这个新时间推送给指定的客户端,这如何实现呢?
在客户端的入参中需要加入 clientId
来进行区分,新数据到来后也要有相应的标识能获取到绑定的客户端,这样就能将变动的数据推送给指定的客户端了,具体实现如下。
2.1 添加依赖
<!-- Spring boot相关依赖请自行添加 -->
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>
2.2 添加 EventBus 配置类
package cn.scf.sse.config;
import com.google.common.eventbus.EventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventBusConfig {
@Bean
public EventBus eventBus() {
return new EventBus();
}
}
2.3 定义事件类
package cn.scf.sse.event;
public class ClientEvent {
// 客户端ID
private final String clientId;
private final String data;
public ClientEvent(String clientId, String data) {
this.clientId = clientId;
this.data = data;
}
public String getClientId() {
return clientId;
}
public String getData() {
return data;
}
}
2.4 定义 SSE 的管理类
package cn.scf.sse.event;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class SSEManager {
// 存储与客户端关联的SseEmitter实例
private final Map<String, SseEmitter> clientEmitters = new ConcurrentHashMap<>();
@Autowired
private EventBus eventBus;
public void register(Object listener) {
// 注册监听器,当EventBus上有事件时,根据事件中的客户端标识发送数据
eventBus.register(listener);
}
public void postEvent(String clientId, String data) {
ClientEvent event = new ClientEvent(clientId, data);
eventBus.post(event);
}
// 关闭连接时调用
public void closeEmitter(String clientId, String exception) {
SseEmitter sseEmitter = clientEmitters.remove(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
}
log.info("关闭连接清理资源成功, ex: {}", exception);
}
public Map<String, SseEmitter> getClientEmitters() {
return clientEmitters;
}
}
2.5 事件处理类
package cn.scf.sse.event;
import com.alibaba.fastjson.JSONObject;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PostConstruct;
import java.util.Map;
@Service
@Slf4j
public class EventHandler {
@Autowired
private SSEManager sseManager;
@PostConstruct
public void init() {
// 初始化时将自己注册到 EventBus 中
sseManager.register(this);
}
@Subscribe
public void handleEvent(ClientEvent event) {
Map<String, SseEmitter> clientEmitters = sseManager.getClientEmitters();
SseEmitter emitter = clientEmitters.get(event.getClientId());
if (emitter != null) {
new Thread(() -> {
try {
String message = JSONObject.toJSONString(event);
// 发送给客户端
emitter.send(SseEmitter.event().data(message));
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
}
}
}
2.6 SSE 接口实现
@RestController
public class SseController {
@Autowired
private SSEManager sseManager;
@CrossOrigin
@GetMapping(value = "/sse/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSseRequest(@PathVariable String clientId) {
// 设置超时时间, 如果在指定的时间内没有向客户端发送任何数据,则连接将自动关闭。
SseEmitter emitter = new SseEmitter(TimeUnit.MINUTES.toMillis(10));
Map<String, SseEmitter> clientEmitters = sseManager.getClientEmitters();
clientEmitters.put(clientId, emitter);
// 当SSE连接关闭时,从管理器中移除
emitter.onCompletion(() -> sseManager.closeEmitter(clientId, null));
// 监听链接错误
emitter.onError((ex) -> sseManager.closeEmitter(clientId, ex.getMessage()));
return emitter;
}
// 此接口用来模拟有新数据到来时发布事件,将新数据推送给指定客户端,实际中这里可能是三方数据推送,也可能是数据库数据变化
@GetMapping(value = "/pub/event")
public String pubEvent(@RequestParam String clientId) {
sseManager.postEvent(clientId, "发送事件客户端ID:" + clientId + "-" +UUID.randomUUID().toString());
return "ok";
}
}
2.7 H5 调用 SSE 接口
<!DOCTYPE html>
<html>
<head>
<title>SSE Example</title>
<script type="text/javascript">
document.addEventListener('DOMContentLoaded', function() {
let clientId = 'some-client-id'; // 假设已经获取到客户端标识
let source = new EventSource(`http://localhost:8080/sse/${clientId}`);
source.onmessage = function(event) {
let data = event.data;
console.log(data);
// 更新页面上的某些内容,例如:
document.getElementById('sse').innerHTML += data + '<br>';
};
source.onerror = function(event) {
console.error("EventSource failed.");
};
// source.close();
});
</script>
</head>
<body>
<h1>Server Sent Events</h1>
<div id="sse"></div>
<!-- 页面内容根据接收到的数据更新 -->
</body>
</html>
当服务启动时,EventHandler
就将自己注册到了 EventBus
中,并通过 @Subscribe
标记想要监听的事件,在方法内部取出客户端与 SseEmitter
的关系,判断是否有对应的 SseEmitter
,如果存在,就向客户端推送新的数据。
2.8 使用 SSE 注意事项
使用Server-Sent Events(SSE)时,以下是一些值得注意的关键事项:
- 浏览器兼容性:SSE 是 HTML5 的一项功能,所以并非所有浏览器都支持。在使用之前,应检查目标浏览器是否支持
EventSource
API。 - 连接管理:SSE 通过单个 HTTP 连接进行数据推送,这意味着浏览器会维持一个长连接至服务器。确保服务器端正确处理连接的生命周期,包括维持连接、处理空闲连接、以及在连接断开时自动重新连接。
- 并发限制:浏览器可能对同一域名下的并发 SSE 连接有所限制,通常每个浏览器标签页共享一个最大连接数,超过这个数量的 SSE 连接可能无法建立。
- 资源管理:由于连接长期存在,需要考虑服务器资源消耗和客户端内存占用。在服务器端确保及时释放不再使用的资源,客户端也需要适当管理
EventSource
对象,比如在页面卸载时取消注册事件源。
热门推荐
5~6个月宝宝辅食全攻略!宝宝辅食菜单【超详细图解】
认知行为疗法(CBT)
相机照片储存格式有哪些
加拿大衣物去污洗涤攻略 - 洗衣标志解读,去血渍油渍技巧,衣物保养!
维生素C什么时候吃最好?告诉你维生素C功效、摄入量以及谁不适合吃
胃痞病是什么病?主要症状有哪些?
美国宪法日历史:从“我是美国人日”到宪法教育纪念日
马晓河:为什么货币供给持续增加,物价没有全面上涨?
什么是USB线?USB线类型、USB Type A和USB Type C
右侧眼睛以及太阳穴疼痛的原因是什么
如何优化游戏测试流程和测试方法以提升玩家体验?
咳嗽可以吃香蕉吗,会不会更咳嗽
咳嗽时可以吃香蕉吗?医生的专业解答来了
眼睛肿怎么办?调整枕头高度等实用解决方案
Win11系统中如何管理用户账户,如何设置Windows用户权限?
如何在一台电脑设置两个账户
72年前,毛泽东写下了12个字……
经常把“苏打水”当水喝,对身体到底有什么影响
踏入南京美龄宫:一段民国风情的邂逅
渭水之盟:唐朝与突厥的和解及其后续
鸡蛋瘦肉粥
产品经理如何精准定义用户需求?项目管理中的7个技巧
西甲联盟首次成功起诉种族歧视:三名球迷被判入狱,两年内禁止观赛
福建晋江:传统产业焕新升级 新兴产业成型成势
哪里可以获取智慧物流管理系统的培训资料?
爱情的成长:从恋爱到婚姻的情感转变
今天歌词表达了什么情感
新西兰签证办理指南:留学签证与工作签证详解
运动饮料不是谁都适合喝,这3类人要谨慎饮用
吃高蛋白食物拉肚子小便泡沫多