一、SSE核心机制解析
1.1 通信流程图示
sequenceDiagram
Client->>Server: 1. 建立SSE连接(Content-Type:text/event-stream)
Server->>Client: 2. 保持TCP连接
loop 数据推送
Server->>Client: 3. 发送事件数据格式
end
Client->>Server: 4. 主动断开/超时断开
![图片[1]_Spring Boot整合SSE全场景实战指南_知途无界](https://zhituwujie.com/wp-content/uploads/2025/07/d2b5ca33bd20250721190405.png)
1.2 协议规范详解
// 标准事件格式
event: stockUpdate
id: 12345
retry: 5000
data: {"symbol":"AAPL","price":182.73}
// 多行数据示例
data: 第一行内容
data: 第二行内容
// 注释心跳包
: keep-alive
二、基础整合实现
2.1 控制器实现
@RestController
@RequestMapping("/sse")
public class SseController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@GetMapping("/subscribe/{clientId}")
public SseEmitter subscribe(@PathVariable String clientId) {
SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时
emitters.put(clientId, emitter);
emitter.onCompletion(() -> emitters.remove(clientId));
emitter.onTimeout(() -> emitters.remove(clientId));
emitter.onError(e -> {
log.error("SSE error", e);
emitters.remove(clientId);
});
return emitter;
}
@PostMapping("/push/{clientId}")
public void pushEvent(@PathVariable String clientId,
@RequestBody String message) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.id(UUID.randomUUID().toString())
.name("custom-event")
.data(message));
} catch (IOException e) {
emitter.completeWithError(e);
}
}
}
}
2.2 前端连接示例
const eventSource = new EventSource('/sse/subscribe/user123');
eventSource.onmessage = (e) => {
console.log('原始数据:', e.data);
};
eventSource.addEventListener('custom-event', (e) => {
console.log('自定义事件:', JSON.parse(e.data));
});
eventSource.onerror = (e) => {
console.error('连接异常', e);
};
三、高级功能扩展
3.1 心跳保活机制
@Scheduled(fixedRate = 15_000)
public void sendHeartbeat() {
emitters.forEach((id, emitter) -> {
try {
emitter.send(SseEmitter.event()
.comment("heartbeat")
.reconnectTime(5000));
} catch (IOException e) {
emitter.complete();
}
});
}
3.2 断线重连策略
// 前端增强实现
function connectSSE() {
const es = new EventSource('/sse/subscribe/user123');
es.addEventListener('error', () => {
es.close();
setTimeout(connectSSE, 5000); // 5秒后重连
});
}
四、生产级解决方案
4.1 连接管理器
@Component
public class SseConnectionManager {
private final Map<String, SseEmitter> connections = new ConcurrentHashMap<>();
public SseEmitter createConnection(String clientId) {
SseEmitter emitter = new SseEmitter(60_000L);
connections.put(clientId, emitter);
emitter.onCompletion(() -> remove(clientId));
emitter.onTimeout(() -> remove(clientId));
return emitter;
}
public void sendToClient(String clientId, Object data) {
// 实现发送逻辑...
}
public void broadcast(Object data) {
connections.forEach((id, emitter) -> {
try {
emitter.send(data);
} catch (IOException e) {
remove(id);
}
});
}
private void remove(String clientId) {
connections.remove(clientId);
}
}
4.2 异常处理增强
@ControllerAdvice
public class SseExceptionHandler {
@ExceptionHandler(SseException.class)
public ResponseEntity<String> handleSseError(SseException ex) {
return ResponseEntity.status(503)
.header("Retry-After", "10")
.body(ex.getMessage());
}
@ExceptionHandler(AsyncRequestTimeoutException.class)
public void handleTimeout(AsyncRequestTimeoutException ex) {
// 忽略SSE超时异常
}
}
五、性能优化方案
5.1 连接池配置
# application.properties
spring.mvc.async.request-timeout=60000
spring.task.execution.pool.core-size=20
spring.task.execution.pool.max-size=100
spring.task.execution.pool.queue-capacity=500
5.2 背压控制
@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<String>builder()
.id(String.valueOf(seq))
.event("periodic-event")
.data("Data " + seq)
.build())
.onBackpressureBuffer(50); // 缓冲50个元素
}
六、安全防护措施
6.1 安全配置
@Configuration
public class SseSecurityConfig {
@Bean
public CorsFilter corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
CorsConfiguration config = new CorsConfiguration();
config.addAllowedOrigin("https://trusted.com");
config.addAllowedHeader("*");
config.addAllowedMethod("GET");
source.registerCorsConfiguration("/sse/**", config);
return new CorsFilter(source);
}
}
6.2 认证集成
@GetMapping("/secure-stream")
public SseEmitter secureStream(@AuthenticationPrincipal User user) {
if (!user.hasRole("SSE_ACCESS")) {
throw new AccessDeniedException("SSE access denied");
}
return sseManager.createConnection(user.getId());
}
七、集群环境方案
7.1 Redis广播实现
@Configuration
public class SseClusterConfig {
@Bean
public RedisMessageListenerContainer redisContainer(
RedisConnectionFactory factory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(listenerAdapter, new PatternTopic("sse.*"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(SseMessageReceiver receiver) {
return new MessageListenerAdapter(receiver, "handleMessage");
}
}
7.2 消息接收器
@Component
public class SseMessageReceiver {
@Autowired
private SseConnectionManager manager;
public void handleMessage(String message, String channel) {
String clientId = channel.substring(4); // sse.user123
manager.sendToClient(clientId, message);
}
}
八、监控与诊断
8.1 健康检查端点
@RestControllerEndpoint(id = "sse")
public class SseEndpoint {
@Autowired
private SseConnectionManager manager;
@GetMapping("/stats")
public Map<String, Object> getStats() {
return Map.of(
"activeConnections", manager.getConnectionCount(),
"lastError", manager.getLastErrorTime(),
"throughput", manager.getMessagesSent()
);
}
}
8.2 Prometheus监控
@Bean
public MeterRegistryCustomizer<MeterRegistry> sseMetrics() {
return registry -> {
Gauge.builder("sse.connections",
() -> manager.getConnectionCount())
.register(registry);
Counter.builder("sse.messages.sent")
.tag("type", "broadcast")
.register(registry);
};
}
九、测试验证方案
9.1 单元测试
@Test
void shouldCreateEmitter() {
SseEmitter emitter = controller.subscribe("test1");
assertNotNull(emitter);
assertEquals(1, manager.getConnectionCount());
}
@Test
void shouldPushEvent() throws Exception {
SseEmitter emitter = mock(SseEmitter.class);
when(emitter.send(any())).thenReturn(null);
manager.addConnection("test2", emitter);
controller.pushEvent("test2", "test message");
verify(emitter).send(any(SseEmitter.SseEventBuilder.class));
}
9.2 集成测试
@SpringBootTest
class SseIntegrationTest {
@Autowired
private TestRestTemplate restTemplate;
@Test
void shouldReceiveEvents() {
ResponseEntity<String> response = restTemplate.getForEntity(
"/sse/subscribe/test3", String.class);
assertEquals(200, response.getStatusCodeValue());
assertEquals("text/event-stream",
response.getHeaders().getContentType().toString());
}
}
十、完整案例演示
10.1 股票行情推送
@GetMapping("/stocks/{symbol}")
public SseEmitter streamStock(@PathVariable String symbol) {
SseEmitter emitter = new SseEmitter();
stockService.subscribe(symbol, price -> {
try {
emitter.send(StockEvent.of(symbol, price));
} catch (IOException e) {
stockService.unsubscribe(symbol);
}
});
return emitter;
}
// 事件封装类
public record StockEvent(String symbol, BigDecimal price) {
public static SseEmitter event() {
return SseEmitter.event()
.name("stockUpdate")
.data(this);
}
}
10.2 聊天室应用
@PostMapping("/chat")
public void sendMessage(@RequestBody ChatMessage message) {
sseManager.broadcast(
SseEmitter.event()
.name("newMessage")
.data(message)
.build());
}
// 前端处理
eventSource.addEventListener("newMessage", e => {
const msg = JSON.parse(e.data);
addMessageToUI(msg.user, msg.text);
});
最佳实践总结:
- 连接管理:必须实现超时、错误和完成回调
- 心跳机制:保持连接活跃,15-30秒间隔为宜
- 背压控制:防止生产者速度过快压垮消费者
- 安全防护:CORS限制+认证授权缺一不可
- 监控指标:连接数、消息量、错误率必须监控
性能数据参考:
- 单机可维持约1万并发SSE连接(4核8G配置)
- 每条消息传输延迟<50ms(局域网环境)
- 内存占用约2KB/连接(无消息积压时)
扩展方向:
- 结合WebFlux实现响应式SSE
- 使用RSocket替代SSE实现双向通信
- 集成消息队列实现可靠事件投递
- 开发管理界面实时查看连接状态
通过本方案,您可以构建:
- 实时股票/加密货币行情系统
- 跨平台即时通讯应用
- 物联网设备状态监控
- 长任务执行进度反馈
- 实时协同编辑系统
关键优势:
- 比WebSocket更简单的单向通信方案
- 原生支持断线重连
- 兼容HTTP协议栈无需特殊配置
- 浏览器原生支持无需额外库
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END

























暂无评论内容