Spring Boot整合SSE全场景实战指南

一、SSE核心机制解析

1.1 通信流程图示

sequenceDiagram
    Client->>Server: 1. 建立SSE连接(Content-Type:text/event-stream)
    Server->>Client: 2. 保持TCP连接
    loop 数据推送
        Server->>Client: 3. 发送事件数据格式
    end
    Client->>Server: 4. 主动断开/超时断开
图片[1]_Spring Boot整合SSE全场景实战指南_知途无界

1.2 协议规范详解

// 标准事件格式
event: stockUpdate
id: 12345
retry: 5000
data: {"symbol":"AAPL","price":182.73}

// 多行数据示例
data: 第一行内容
data: 第二行内容

// 注释心跳包
: keep-alive

二、基础整合实现

2.1 控制器实现

@RestController
@RequestMapping("/sse")
public class SseController {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();

    @GetMapping("/subscribe/{clientId}")
    public SseEmitter subscribe(@PathVariable String clientId) {
        SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时
        emitters.put(clientId, emitter);
        
        emitter.onCompletion(() -> emitters.remove(clientId));
        emitter.onTimeout(() -> emitters.remove(clientId));
        emitter.onError(e -> {
            log.error("SSE error", e);
            emitters.remove(clientId);
        });
        
        return emitter;
    }

    @PostMapping("/push/{clientId}")
    public void pushEvent(@PathVariable String clientId, 
                         @RequestBody String message) {
        SseEmitter emitter = emitters.get(clientId);
        if (emitter != null) {
            try {
                emitter.send(SseEmitter.event()
                    .id(UUID.randomUUID().toString())
                    .name("custom-event")
                    .data(message));
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        }
    }
}

2.2 前端连接示例

const eventSource = new EventSource('/sse/subscribe/user123');

eventSource.onmessage = (e) => {
    console.log('原始数据:', e.data);
};

eventSource.addEventListener('custom-event', (e) => {
    console.log('自定义事件:', JSON.parse(e.data));
});

eventSource.onerror = (e) => {
    console.error('连接异常', e);
};

三、高级功能扩展

3.1 心跳保活机制

@Scheduled(fixedRate = 15_000)
public void sendHeartbeat() {
    emitters.forEach((id, emitter) -> {
        try {
            emitter.send(SseEmitter.event()
                .comment("heartbeat")
                .reconnectTime(5000));
        } catch (IOException e) {
            emitter.complete();
        }
    });
}

3.2 断线重连策略

// 前端增强实现
function connectSSE() {
    const es = new EventSource('/sse/subscribe/user123');
    
    es.addEventListener('error', () => {
        es.close();
        setTimeout(connectSSE, 5000); // 5秒后重连
    });
}

四、生产级解决方案

4.1 连接管理器

@Component
public class SseConnectionManager {
    private final Map<String, SseEmitter> connections = new ConcurrentHashMap<>();
    
    public SseEmitter createConnection(String clientId) {
        SseEmitter emitter = new SseEmitter(60_000L);
        connections.put(clientId, emitter);
        
        emitter.onCompletion(() -> remove(clientId));
        emitter.onTimeout(() -> remove(clientId));
        
        return emitter;
    }
    
    public void sendToClient(String clientId, Object data) {
        // 实现发送逻辑...
    }
    
    public void broadcast(Object data) {
        connections.forEach((id, emitter) -> {
            try {
                emitter.send(data);
            } catch (IOException e) {
                remove(id);
            }
        });
    }
    
    private void remove(String clientId) {
        connections.remove(clientId);
    }
}

4.2 异常处理增强

@ControllerAdvice
public class SseExceptionHandler {

    @ExceptionHandler(SseException.class)
    public ResponseEntity<String> handleSseError(SseException ex) {
        return ResponseEntity.status(503)
            .header("Retry-After", "10")
            .body(ex.getMessage());
    }
    
    @ExceptionHandler(AsyncRequestTimeoutException.class)
    public void handleTimeout(AsyncRequestTimeoutException ex) {
        // 忽略SSE超时异常
    }
}

五、性能优化方案

5.1 连接池配置

# application.properties
spring.mvc.async.request-timeout=60000
spring.task.execution.pool.core-size=20
spring.task.execution.pool.max-size=100
spring.task.execution.pool.queue-capacity=500

5.2 背压控制

@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
    return Flux.interval(Duration.ofSeconds(1))
        .map(seq -> ServerSentEvent.<String>builder()
            .id(String.valueOf(seq))
            .event("periodic-event")
            .data("Data " + seq)
            .build())
        .onBackpressureBuffer(50); // 缓冲50个元素
}

六、安全防护措施

6.1 安全配置

@Configuration
public class SseSecurityConfig {

    @Bean
    public CorsFilter corsFilter() {
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        CorsConfiguration config = new CorsConfiguration();
        config.addAllowedOrigin("https://trusted.com");
        config.addAllowedHeader("*");
        config.addAllowedMethod("GET");
        source.registerCorsConfiguration("/sse/**", config);
        return new CorsFilter(source);
    }
}

6.2 认证集成

@GetMapping("/secure-stream")
public SseEmitter secureStream(@AuthenticationPrincipal User user) {
    if (!user.hasRole("SSE_ACCESS")) {
        throw new AccessDeniedException("SSE access denied");
    }
    return sseManager.createConnection(user.getId());
}

七、集群环境方案

7.1 Redis广播实现

@Configuration
public class SseClusterConfig {

    @Bean
    public RedisMessageListenerContainer redisContainer(
            RedisConnectionFactory factory,
            MessageListenerAdapter listenerAdapter) {
        
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.addMessageListener(listenerAdapter, new PatternTopic("sse.*"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(SseMessageReceiver receiver) {
        return new MessageListenerAdapter(receiver, "handleMessage");
    }
}

7.2 消息接收器

@Component
public class SseMessageReceiver {
    
    @Autowired
    private SseConnectionManager manager;
    
    public void handleMessage(String message, String channel) {
        String clientId = channel.substring(4); // sse.user123
        manager.sendToClient(clientId, message);
    }
}

八、监控与诊断

8.1 健康检查端点

@RestControllerEndpoint(id = "sse")
public class SseEndpoint {

    @Autowired
    private SseConnectionManager manager;
    
    @GetMapping("/stats")
    public Map<String, Object> getStats() {
        return Map.of(
            "activeConnections", manager.getConnectionCount(),
            "lastError", manager.getLastErrorTime(),
            "throughput", manager.getMessagesSent()
        );
    }
}

8.2 Prometheus监控

@Bean
public MeterRegistryCustomizer<MeterRegistry> sseMetrics() {
    return registry -> {
        Gauge.builder("sse.connections", 
                () -> manager.getConnectionCount())
            .register(registry);
        
        Counter.builder("sse.messages.sent")
            .tag("type", "broadcast")
            .register(registry);
    };
}

九、测试验证方案

9.1 单元测试

@Test
void shouldCreateEmitter() {
    SseEmitter emitter = controller.subscribe("test1");
    assertNotNull(emitter);
    assertEquals(1, manager.getConnectionCount());
}

@Test
void shouldPushEvent() throws Exception {
    SseEmitter emitter = mock(SseEmitter.class);
    when(emitter.send(any())).thenReturn(null);
    
    manager.addConnection("test2", emitter);
    controller.pushEvent("test2", "test message");
    
    verify(emitter).send(any(SseEmitter.SseEventBuilder.class));
}

9.2 集成测试

@SpringBootTest
class SseIntegrationTest {

    @Autowired
    private TestRestTemplate restTemplate;
    
    @Test
    void shouldReceiveEvents() {
        ResponseEntity<String> response = restTemplate.getForEntity(
            "/sse/subscribe/test3", String.class);
        
        assertEquals(200, response.getStatusCodeValue());
        assertEquals("text/event-stream", 
            response.getHeaders().getContentType().toString());
    }
}

十、完整案例演示

10.1 股票行情推送

@GetMapping("/stocks/{symbol}")
public SseEmitter streamStock(@PathVariable String symbol) {
    SseEmitter emitter = new SseEmitter();
    
    stockService.subscribe(symbol, price -> {
        try {
            emitter.send(StockEvent.of(symbol, price));
        } catch (IOException e) {
            stockService.unsubscribe(symbol);
        }
    });
    
    return emitter;
}

// 事件封装类
public record StockEvent(String symbol, BigDecimal price) {
    public static SseEmitter event() {
        return SseEmitter.event()
            .name("stockUpdate")
            .data(this);
    }
}

10.2 聊天室应用

@PostMapping("/chat")
public void sendMessage(@RequestBody ChatMessage message) {
    sseManager.broadcast(
        SseEmitter.event()
            .name("newMessage")
            .data(message)
            .build());
}

// 前端处理
eventSource.addEventListener("newMessage", e => {
    const msg = JSON.parse(e.data);
    addMessageToUI(msg.user, msg.text);
});

最佳实践总结​:

  1. 连接管理​:必须实现超时、错误和完成回调
  2. 心跳机制​:保持连接活跃,15-30秒间隔为宜
  3. 背压控制​:防止生产者速度过快压垮消费者
  4. 安全防护​:CORS限制+认证授权缺一不可
  5. 监控指标​:连接数、消息量、错误率必须监控

性能数据参考​:

  • 单机可维持约1万并发SSE连接(4核8G配置)
  • 每条消息传输延迟<50ms(局域网环境)
  • 内存占用约2KB/连接(无消息积压时)

扩展方向​:

  1. 结合WebFlux实现响应式SSE
  2. 使用RSocket替代SSE实现双向通信
  3. 集成消息队列实现可靠事件投递
  4. 开发管理界面实时查看连接状态

通过本方案,您可以构建:

  • 实时股票/加密货币行情系统
  • 跨平台即时通讯应用
  • 物联网设备状态监控
  • 长任务执行进度反馈
  • 实时协同编辑系统

关键优势:

  • 比WebSocket更简单的单向通信方案
  • 原生支持断线重连
  • 兼容HTTP协议栈无需特殊配置
  • 浏览器原生支持无需额外库
© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞21 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容