本文将详细介绍如何使用 SpringBoot 作为后端,Vue3 作为前端,通过 Server-Sent Events (SSE) 技术实现实时消息推送功能。
![图片[1]_SpringBoot + Vue3 整合 SSE 实现实时消息推送功能_知途无界](https://zhituwujie.com/wp-content/uploads/2025/11/d2b5ca33bd20251129103533-1024x626.png)
一、SSE 技术简介
Server-Sent Events (SSE) 是一种基于 HTTP 的服务器向客户端推送数据的技术,特点包括:
- 单向通信:服务器向客户端推送数据
- 基于 HTTP:使用标准的 HTTP 协议
- 自动重连:浏览器内置重连机制
- 轻量级:相比 WebSocket 更简单,适合单向数据流场景
二、SpringBoot 后端实现
1. 添加依赖
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 如果需要消息队列,可以添加 Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
2. 创建 SSE 控制器
// SseController.java
package com.example.sse.controller;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/api/sse")
public class SseController {
// 存储所有连接的客户端
private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
// 定时任务线程池,用于模拟消息推送
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 建立 SSE 连接
*/
@GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect(@PathVariable String clientId) {
// 设置超时时间(毫秒),0 表示永不超时
SseEmitter emitter = new SseEmitter(0L);
// 注册到映射表中
sseEmitters.put(clientId, emitter);
System.out.println("客户端连接: " + clientId + ", 当前在线数: " + sseEmitters.size());
// 连接成功回调
emitter.onCompletion(() -> {
System.out.println("客户端连接完成: " + clientId);
sseEmitters.remove(clientId);
});
emitter.onTimeout(() -> {
System.out.println("客户端连接超时: " + clientId);
sseEmitters.remove(clientId);
});
emitter.onError((ex) -> {
System.out.println("客户端连接错误: " + clientId + ", 错误: " + ex.getMessage());
sseEmitters.remove(clientId);
});
// 发送欢迎消息
try {
sendMessage(emitter, "connected", "连接成功,您的客户端ID: " + clientId);
} catch (IOException e) {
e.printStackTrace();
}
return emitter;
}
/**
* 向指定客户端发送消息
*/
private void sendMessage(SseEmitter emitter, String eventType, Object data) throws IOException {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.name(eventType) // 事件类型
.id(String.valueOf(System.currentTimeMillis())) // 事件ID
.data(data) // 数据
.reconnectTime(3000); // 重连时间
emitter.send(event);
}
/**
* 向所有客户端广播消息
*/
@PostMapping("/broadcast")
public String broadcast(@RequestBody Map<String, Object> message) {
String eventType = (String) message.get("type");
Object data = message.get("data");
int successCount = 0;
for (Map.Entry<String, SseEmitter> entry : sseEmitters.entrySet()) {
try {
sendMessage(entry.getValue(), eventType, data);
successCount++;
} catch (IOException e) {
System.out.println("向客户端 " + entry.getKey() + " 发送消息失败: " + e.getMessage());
sseEmitters.remove(entry.getKey());
}
}
return "广播消息完成,成功发送给 " + successCount + " 个客户端";
}
/**
* 向指定客户端发送消息
*/
@PostMapping("/send/{clientId}")
public String sendToClient(@PathVariable String clientId,
@RequestBody Map<String, Object> message) {
SseEmitter emitter = sseEmitters.get(clientId);
if (emitter != null) {
try {
String eventType = (String) message.get("type");
Object data = message.get("data");
sendMessage(emitter, eventType, data);
return "消息发送成功";
} catch (IOException e) {
sseEmitters.remove(clientId);
return "消息发送失败: " + e.getMessage();
}
} else {
return "客户端不在线";
}
}
/**
* 获取在线客户端数量
*/
@GetMapping("/online")
public Map<String, Object> getOnlineCount() {
return Map.of(
"onlineCount", sseEmitters.size(),
"clients", sseEmitters.keySet()
);
}
/**
* 关闭所有连接
*/
@PostMapping("/close-all")
public String closeAllConnections() {
for (Map.Entry<String, SseEmitter> entry : sseEmitters.entrySet()) {
try {
entry.getValue().complete();
} catch (Exception e) {
e.printStackTrace();
}
}
sseEmitters.clear();
return "所有连接已关闭";
}
/**
* 启动定时任务,模拟实时数据推送
*/
@PostConstruct
public void startDemoTask() {
// 每 5 秒向所有客户端推送一条消息
scheduler.scheduleAtFixedRate(() -> {
if (!sseEmitters.isEmpty()) {
try {
Map<String, Object> message = Map.of(
"timestamp", System.currentTimeMillis(),
"content", "这是定时推送的消息 - " + new java.util.Date(),
"onlineUsers", sseEmitters.size()
);
// 异步发送,避免阻塞定时任务
new Thread(() -> {
broadcast(Map.of("type", "demo", "data", message));
}).start();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, 5, TimeUnit.SECONDS);
}
}
3. 配置 CORS(可选,如果前后端分离)
// CorsConfig.java
package com.example.sse.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CorsConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/api/**")
.allowedOrigins("http://localhost:3000", "http://127.0.0.1:3000") // Vue 开发服务器地址
.allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
.allowedHeaders("*")
.allowCredentials(true)
.maxAge(3600);
}
}
4. 创建测试 Controller(用于手动触发消息)
// TestController.java
package com.example.sse.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@RestController
@RequestMapping("/api/test")
public class TestController {
@Autowired
private SseController sseController;
@PostMapping("/send-notification")
public String sendNotification(@RequestParam String message) {
Map<String, Object> notification = Map.of(
"title", "系统通知",
"content", message,
"level", "info"
);
return sseController.broadcast(Map.of("type", "notification", "data", notification));
}
@PostMapping("/send-to-user")
public String sendToUser(@RequestParam String clientId,
@RequestParam String message) {
Map<String, Object> userMessage = Map.of(
"content", message,
"from", "admin"
);
return sseController.sendToClient(clientId, Map.of("type", "private", "data", userMessage));
}
}
三、Vue3 前端实现
1. 安装依赖
npm install axios
# 或者
yarn add axios
2. 创建 SSE 服务
// src/services/sseService.js
import axios from 'axios';
class SseService {
constructor() {
this.eventSource = null;
this.clientId = null;
this.listeners = new Map();
}
/**
* 建立 SSE 连接
* @param {string} clientId 客户端ID
* @param {function} onMessage 消息回调函数
* @param {function} onError 错误回调函数
* @param {function} onOpen 连接打开回调函数
*/
connect(clientId, onMessage, onError, onOpen) {
this.clientId = clientId;
// 创建 EventSource 连接
const url = `http://localhost:8080/api/sse/connect/${clientId}`;
this.eventSource = new EventSource(url);
// 监听消息事件
this.eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (onMessage) onMessage(data);
} catch (error) {
console.error('解析消息失败:', error);
if (onMessage) onMessage({ type: 'raw', data: event.data });
}
};
// 监听自定义事件类型
this.eventSource.addEventListener('connected', (event) => {
const data = JSON.parse(event.data);
console.log('连接成功:', data);
if (onOpen) onOpen(data);
});
this.eventSource.addEventListener('demo', (event) => {
const data = JSON.parse(event.data);
console.log('收到演示消息:', data);
if (onMessage) onMessage({ type: 'demo', data });
});
this.eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('收到通知:', data);
if (onMessage) onMessage({ type: 'notification', data });
});
this.eventSource.addEventListener('private', (event) => {
const data = JSON.parse(event.data);
console.log('收到私信:', data);
if (onMessage) onMessage({ type: 'private', data });
});
// 监听错误
this.eventSource.onerror = (error) => {
console.error('SSE 连接错误:', error);
if (onError) onError(error);
// 检查连接状态并尝试重连
if (this.eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭,尝试重新连接...');
setTimeout(() => {
this.connect(clientId, onMessage, onError, onOpen);
}, 3000);
}
};
// 监听连接打开
this.eventSource.onopen = (event) => {
console.log('SSE 连接已打开');
if (onOpen) onOpen(event);
};
}
/**
* 断开连接
*/
disconnect() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
console.log('SSE 连接已断开');
}
}
/**
* 发送消息到后端(用于测试)
*/
async sendBroadcast(message) {
try {
const response = await axios.post('http://localhost:8080/api/sse/broadcast', {
type: 'custom',
data: { message, timestamp: Date.now() }
});
return response.data;
} catch (error) {
console.error('发送广播消息失败:', error);
throw error;
}
}
/**
* 发送消息给指定客户端
*/
async sendToClient(clientId, message) {
try {
const response = await axios.post(`http://localhost:8080/api/sse/send/${clientId}`, {
type: 'direct',
data: { message, timestamp: Date.now() }
});
return response.data;
} catch (error) {
console.error('发送消息失败:', error);
throw error;
}
}
/**
* 获取在线客户端数量
*/
async getOnlineCount() {
try {
const response = await axios.get('http://localhost:8080/api/sse/online');
return response.data;
} catch (error) {
console.error('获取在线数量失败:', error);
throw error;
}
}
/**
* 添加事件监听器
*/
addEventListener(eventType, callback) {
this.listeners.set(eventType, callback);
}
/**
* 移除事件监听器
*/
removeEventListener(eventType) {
this.listeners.delete(eventType);
}
}
// 创建单例实例
const sseService = new SseService();
export default sseService;
3. 创建 Vue3 组件
<!-- src/components/SSETest.vue -->
<template>
<div class="sse-test">
<h2>SSE 实时消息推送测试</h2>
<!-- 连接控制 -->
<div class="section">
<h3>连接管理</h3>
<div class="input-group">
<input
v-model="clientId"
placeholder="请输入客户端ID"
:disabled="isConnected"
/>
<button
@click="connect"
:disabled="isConnected"
class="btn btn-connect"
>
连接
</button>
<button
@click="disconnect"
:disabled="!isConnected"
class="btn btn-disconnect"
>
断开连接
</button>
</div>
<div class="status" :class="{ connected: isConnected }">
连接状态: {{ isConnected ? '已连接' : '未连接' }}
</div>
</div>
<!-- 在线信息 -->
<div class="section">
<h3>在线信息</h3>
<button @click="getOnlineInfo" class="btn btn-info">刷新在线信息</button>
<div v-if="onlineInfo" class="info-box">
<p>在线人数: {{ onlineInfo.onlineCount }}</p>
<p>客户端IDs: {{ onlineInfo.clients?.join(', ') }}</p>
</div>
</div>
<!-- 消息发送 -->
<div class="section">
<h3>消息发送测试</h3>
<div class="input-group">
<input v-model="broadcastMessage" placeholder="广播消息内容" />
<button @click="sendBroadcast" class="btn btn-send">发送广播</button>
</div>
<div class="input-group">
<input v-model="targetClientId" placeholder="目标客户端ID" />
<input v-model="privateMessage" placeholder="私信内容" />
<button @click="sendPrivate" class="btn btn-send">发送私信</button>
</div>
</div>
<!-- 接收到的消息 -->
<div class="section">
<h3>接收到的消息 ({{ messages.length }})</h3>
<div class="messages-container">
<div
v-for="(msg, index) in messages"
:key="index"
class="message-item"
:class="msg.type"
>
<div class="message-header">
<span class="message-type">{{ msg.type }}</span>
<span class="message-time">{{ formatTime(msg.timestamp) }}</span>
</div>
<div class="message-content">
<pre>{{ JSON.stringify(msg.data, null, 2) }}</pre>
</div>
</div>
</div>
<button @click="clearMessages" class="btn btn-clear">清空消息</button>
</div>
</div>
</template>
<script setup>
import { ref, reactive, onMounted, onUnmounted } from 'vue';
import sseService from '../services/sseService';
// 响应式数据
const clientId = ref('');
const isConnected = ref(false);
const messages = ref([]);
const onlineInfo = ref(null);
const broadcastMessage = ref('');
const targetClientId = ref('');
const privateMessage = ref('');
// 生成随机客户端ID
const generateClientId = () => {
return 'client_' + Math.random().toString(36).substr(2, 9);
};
// 连接 SSE
const connect = () => {
if (!clientId.value) {
clientId.value = generateClientId();
}
sseService.connect(
clientId.value,
(data) => {
// 添加时间戳
const message = {
...data,
timestamp: Date.now()
};
messages.value.push(message);
// 自动滚动到底部
scrollToBottom();
},
(error) => {
console.error('连接错误:', error);
isConnected.value = false;
},
(data) => {
console.log('连接成功:', data);
isConnected.value = true;
}
);
};
// 断开连接
const disconnect = () => {
sseService.disconnect();
isConnected.value = false;
};
// 获取在线信息
const getOnlineInfo = async () => {
try {
onlineInfo.value = await sseService.getOnlineCount();
} catch (error) {
console.error('获取在线信息失败:', error);
}
};
// 发送广播消息
const sendBroadcast = async () => {
if (!broadcastMessage.value.trim()) return;
try {
await sseService.sendBroadcast(broadcastMessage.value);
broadcastMessage.value = '';
} catch (error) {
alert('发送失败: ' + error.message);
}
};
// 发送私信
const sendPrivate = async () => {
if (!targetClientId.value.trim() || !privateMessage.value.trim()) return;
try {
await sseService.sendToClient(targetClientId.value, privateMessage.value);
privateMessage.value = '';
} catch (error) {
alert('发送失败: ' + error.message);
}
};
// 清空消息
const clearMessages = () => {
messages.value = [];
};
// 格式化时间
const formatTime = (timestamp) => {
return new Date(timestamp).toLocaleTimeString();
};
// 滚动到底部
const scrollToBottom = () => {
setTimeout(() => {
const container = document.querySelector('.messages-container');
if (container) {
container.scrollTop = container.scrollHeight;
}
}, 100);
};
// 生命周期
onMounted(() => {
// 页面加载时生成一个默认的客户端ID
clientId.value = generateClientId();
});
onUnmounted(() => {
// 组件卸载时断开连接
disconnect();
});
</script>
<style scoped>
.sse-test {
max-width: 1000px;
margin: 0 auto;
padding: 20px;
font-family: Arial, sans-serif;
}
.section {
margin-bottom: 30px;
padding: 20px;
border: 1px solid #ddd;
border-radius: 8px;
background-color: #f9f9f9;
}
.input-group {
display: flex;
gap: 10px;
margin-bottom: 10px;
align-items: center;
}
input {
flex: 1;
padding: 8px 12px;
border: 1px solid #ccc;
border-radius: 4px;
font-size: 14px;
}
.btn {
padding: 8px 16px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
transition: background-color 0.3s;
}
.btn:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.btn-connect {
background-color: #4CAF50;
color: white;
}
.btn-connect:hover:not(:disabled) {
background-color: #45a049;
}
.btn-disconnect {
background-color: #f44336;
color: white;
}
.btn-disconnect:hover:not(:disabled) {
background-color: #da190b;
}
.btn-info {
background-color: #2196F3;
color: white;
}
.btn-send {
background-color: #ff9800;
color: white;
}
.btn-clear {
background-color: #9e9e9e;
color: white;
}
.status {
margin-top: 10px;
padding: 8px;
border-radius: 4px;
font-weight: bold;
}
.status.connected {
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}
.info-box {
margin-top: 10px;
padding: 10px;
background-color: white;
border-radius: 4px;
border: 1px solid #ddd;
}
.messages-container {
height: 400px;
overflow-y: auto;
border: 1px solid #ddd;
border-radius: 4px;
padding: 10px;
background-color: white;
}
.message-item {
margin-bottom: 15px;
padding: 10px;
border-left: 4px solid #ddd;
background-color: #fafafa;
border-radius: 4px;
}
.message-item.demo {
border-left-color: #2196F3;
}
.message-item.notification {
border-left-color: #ff9800;
}
.message-item.private {
border-left-color: #9c27b0;
}
.message-item.connected {
border-left-color: #4CAF50;
}
.message-header {
display: flex;
justify-content: space-between;
margin-bottom: 8px;
font-size: 12px;
}
.message-type {
font-weight: bold;
text-transform: uppercase;
}
.message-time {
color: #666;
}
.message-content {
font-size: 13px;
}
.message-content pre {
margin: 0;
white-space: pre-wrap;
word-wrap: break-word;
}
</style>
4. 在主应用中使用组件
<!-- src/App.vue -->
<template>
<div id="app">
<SSETest />
</div>
</template>
<script setup>
import SSETest from './components/SSETest.vue';
</script>
<style>
#app {
font-family: Arial, sans-serif;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
body {
margin: 0;
padding: 0;
background-color: #f0f2f5;
}
</style>
四、高级特性与优化
1. 使用 Redis 实现分布式 SSE
在分布式环境中,需要使用 Redis 来共享客户端连接信息:
// RedisSseController.java
@Service
public class RedisSseService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String SSE_CLIENTS_KEY = "sse:clients";
public void addClient(String clientId, String serverId) {
redisTemplate.opsForHash().put(SSE_CLIENTS_KEY, clientId, serverId);
}
public void removeClient(String clientId) {
redisTemplate.opsForHash().delete(SSE_CLIENTS_KEY, clientId);
}
public Map<Object, Object> getAllClients() {
return redisTemplate.opsForHash().entries(SSE_CLIENTS_KEY);
}
}
2. 心跳检测机制
// 在 SseController 中添加心跳
@Scheduled(fixedRate = 30000) // 每30秒发送一次心跳
public void sendHeartbeat() {
for (Map.Entry<String, SseEmitter> entry : sseEmitters.entrySet()) {
try {
sendMessage(entry.getValue(), "heartbeat", Map.of("timestamp", System.currentTimeMillis()));
} catch (IOException e) {
sseEmitters.remove(entry.getKey());
}
}
}
3. 前端重连策略优化
// 在前端添加指数退避重连策略
class SseServiceWithRetry extends SseService {
connectWithRetry(clientId, onMessage, onError, onOpen, maxRetries = 5) {
let retryCount = 0;
const baseDelay = 1000;
const attemptConnection = () => {
this.connect(
clientId,
onMessage,
(error) => {
retryCount++;
if (retryCount <= maxRetries) {
const delay = baseDelay * Math.pow(2, retryCount - 1);
console.log(`连接失败,${delay}ms 后重试 (${retryCount}/${maxRetries})`);
setTimeout(attemptConnection, delay);
} else {
onError(error);
}
},
onOpen
);
};
attemptConnection();
}
}
五、部署与测试
1. 后端配置
在 application.properties 中配置:
# 服务器端口
server.port=8080
# 关闭 Thymeleaf 缓存(开发环境)
spring.thymeleaf.cache=false
# 如果需要跨域,确保 CORS 配置正确
2. 启动应用
mvn spring-boot:run
3. 测试步骤
- 启动 SpringBoot 应用
- 在浏览器中打开
http://localhost:3000(Vue 应用) - 输入客户端 ID 并点击连接
- 观察定时推送的演示消息
- 测试广播消息和私信功能
- 打开多个浏览器标签页模拟多客户端连接
六、总结
通过本文的介绍,我们实现了:
- SpringBoot 后端:完整的 SSE 控制器,支持连接管理、消息推送、广播等功能
- Vue3 前端:美观的测试界面,支持连接管理、消息发送和接收显示
- 高级特性:错误处理、重连机制、心跳检测、分布式扩展等
- 实际应用:可用于实时通知、股票行情、聊天消息、系统监控等场景
SSE 技术相比 WebSocket 更简单,适合服务器向客户端的单向实时通信场景。在实际项目中,可以根据业务需求选择合适的实时通信技术。
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END

























暂无评论内容