SpringBoot + Vue3 整合 SSE 实现实时消息推送功能

本文将详细介绍如何使用 SpringBoot 作为后端,Vue3 作为前端,通过 Server-Sent Events (SSE) 技术实现实时消息推送功能。

图片[1]_SpringBoot + Vue3 整合 SSE 实现实时消息推送功能_知途无界

一、SSE 技术简介

Server-Sent Events (SSE) 是一种基于 HTTP 的服务器向客户端推送数据的技术,特点包括:

  • 单向通信​:服务器向客户端推送数据
  • 基于 HTTP​:使用标准的 HTTP 协议
  • 自动重连​:浏览器内置重连机制
  • 轻量级​:相比 WebSocket 更简单,适合单向数据流场景

二、SpringBoot 后端实现

1. 添加依赖

<!-- pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 如果需要消息队列,可以添加 Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

2. 创建 SSE 控制器

// SseController.java
package com.example.sse.controller;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/api/sse")
public class SseController {

    // 存储所有连接的客户端
    private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();
    
    // 定时任务线程池,用于模拟消息推送
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /**
     * 建立 SSE 连接
     */
    @GetMapping(value = "/connect/{clientId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(@PathVariable String clientId) {
        // 设置超时时间(毫秒),0 表示永不超时
        SseEmitter emitter = new SseEmitter(0L);
        
        // 注册到映射表中
        sseEmitters.put(clientId, emitter);
        
        System.out.println("客户端连接: " + clientId + ", 当前在线数: " + sseEmitters.size());
        
        // 连接成功回调
        emitter.onCompletion(() -> {
            System.out.println("客户端连接完成: " + clientId);
            sseEmitters.remove(clientId);
        });
        
        emitter.onTimeout(() -> {
            System.out.println("客户端连接超时: " + clientId);
            sseEmitters.remove(clientId);
        });
        
        emitter.onError((ex) -> {
            System.out.println("客户端连接错误: " + clientId + ", 错误: " + ex.getMessage());
            sseEmitters.remove(clientId);
        });
        
        // 发送欢迎消息
        try {
            sendMessage(emitter, "connected", "连接成功,您的客户端ID: " + clientId);
        } catch (IOException e) {
            e.printStackTrace();
        }
        
        return emitter;
    }

    /**
     * 向指定客户端发送消息
     */
    private void sendMessage(SseEmitter emitter, String eventType, Object data) throws IOException {
        SseEmitter.SseEventBuilder event = SseEmitter.event()
                .name(eventType) // 事件类型
                .id(String.valueOf(System.currentTimeMillis())) // 事件ID
                .data(data) // 数据
                .reconnectTime(3000); // 重连时间
        emitter.send(event);
    }

    /**
     * 向所有客户端广播消息
     */
    @PostMapping("/broadcast")
    public String broadcast(@RequestBody Map<String, Object> message) {
        String eventType = (String) message.get("type");
        Object data = message.get("data");
        
        int successCount = 0;
        for (Map.Entry<String, SseEmitter> entry : sseEmitters.entrySet()) {
            try {
                sendMessage(entry.getValue(), eventType, data);
                successCount++;
            } catch (IOException e) {
                System.out.println("向客户端 " + entry.getKey() + " 发送消息失败: " + e.getMessage());
                sseEmitters.remove(entry.getKey());
            }
        }
        
        return "广播消息完成,成功发送给 " + successCount + " 个客户端";
    }

    /**
     * 向指定客户端发送消息
     */
    @PostMapping("/send/{clientId}")
    public String sendToClient(@PathVariable String clientId, 
                               @RequestBody Map<String, Object> message) {
        SseEmitter emitter = sseEmitters.get(clientId);
        if (emitter != null) {
            try {
                String eventType = (String) message.get("type");
                Object data = message.get("data");
                sendMessage(emitter, eventType, data);
                return "消息发送成功";
            } catch (IOException e) {
                sseEmitters.remove(clientId);
                return "消息发送失败: " + e.getMessage();
            }
        } else {
            return "客户端不在线";
        }
    }

    /**
     * 获取在线客户端数量
     */
    @GetMapping("/online")
    public Map<String, Object> getOnlineCount() {
        return Map.of(
            "onlineCount", sseEmitters.size(),
            "clients", sseEmitters.keySet()
        );
    }

    /**
     * 关闭所有连接
     */
    @PostMapping("/close-all")
    public String closeAllConnections() {
        for (Map.Entry<String, SseEmitter> entry : sseEmitters.entrySet()) {
            try {
                entry.getValue().complete();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        sseEmitters.clear();
        return "所有连接已关闭";
    }

    /**
     * 启动定时任务,模拟实时数据推送
     */
    @PostConstruct
    public void startDemoTask() {
        // 每 5 秒向所有客户端推送一条消息
        scheduler.scheduleAtFixedRate(() -> {
            if (!sseEmitters.isEmpty()) {
                try {
                    Map<String, Object> message = Map.of(
                        "timestamp", System.currentTimeMillis(),
                        "content", "这是定时推送的消息 - " + new java.util.Date(),
                        "onlineUsers", sseEmitters.size()
                    );
                    
                    // 异步发送,避免阻塞定时任务
                    new Thread(() -> {
                        broadcast(Map.of("type", "demo", "data", message));
                    }).start();
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 0, 5, TimeUnit.SECONDS);
    }
}

3. 配置 CORS(可选,如果前后端分离)

// CorsConfig.java
package com.example.sse.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CorsConfig implements WebMvcConfigurer {
    
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/api/**")
                .allowedOrigins("http://localhost:3000", "http://127.0.0.1:3000") // Vue 开发服务器地址
                .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
                .allowedHeaders("*")
                .allowCredentials(true)
                .maxAge(3600);
    }
}

4. 创建测试 Controller(用于手动触发消息)

// TestController.java
package com.example.sse.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.Map;

@RestController
@RequestMapping("/api/test")
public class TestController {

    @Autowired
    private SseController sseController;

    @PostMapping("/send-notification")
    public String sendNotification(@RequestParam String message) {
        Map<String, Object> notification = Map.of(
            "title", "系统通知",
            "content", message,
            "level", "info"
        );
        return sseController.broadcast(Map.of("type", "notification", "data", notification));
    }

    @PostMapping("/send-to-user")
    public String sendToUser(@RequestParam String clientId, 
                           @RequestParam String message) {
        Map<String, Object> userMessage = Map.of(
            "content", message,
            "from", "admin"
        );
        return sseController.sendToClient(clientId, Map.of("type", "private", "data", userMessage));
    }
}

三、Vue3 前端实现

1. 安装依赖

npm install axios
# 或者
yarn add axios

2. 创建 SSE 服务

// src/services/sseService.js
import axios from 'axios';

class SseService {
    constructor() {
        this.eventSource = null;
        this.clientId = null;
        this.listeners = new Map();
    }

    /**
     * 建立 SSE 连接
     * @param {string} clientId 客户端ID
     * @param {function} onMessage 消息回调函数
     * @param {function} onError 错误回调函数
     * @param {function} onOpen 连接打开回调函数
     */
    connect(clientId, onMessage, onError, onOpen) {
        this.clientId = clientId;
        
        // 创建 EventSource 连接
        const url = `http://localhost:8080/api/sse/connect/${clientId}`;
        this.eventSource = new EventSource(url);

        // 监听消息事件
        this.eventSource.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                if (onMessage) onMessage(data);
            } catch (error) {
                console.error('解析消息失败:', error);
                if (onMessage) onMessage({ type: 'raw', data: event.data });
            }
        };

        // 监听自定义事件类型
        this.eventSource.addEventListener('connected', (event) => {
            const data = JSON.parse(event.data);
            console.log('连接成功:', data);
            if (onOpen) onOpen(data);
        });

        this.eventSource.addEventListener('demo', (event) => {
            const data = JSON.parse(event.data);
            console.log('收到演示消息:', data);
            if (onMessage) onMessage({ type: 'demo', data });
        });

        this.eventSource.addEventListener('notification', (event) => {
            const data = JSON.parse(event.data);
            console.log('收到通知:', data);
            if (onMessage) onMessage({ type: 'notification', data });
        });

        this.eventSource.addEventListener('private', (event) => {
            const data = JSON.parse(event.data);
            console.log('收到私信:', data);
            if (onMessage) onMessage({ type: 'private', data });
        });

        // 监听错误
        this.eventSource.onerror = (error) => {
            console.error('SSE 连接错误:', error);
            if (onError) onError(error);
            
            // 检查连接状态并尝试重连
            if (this.eventSource.readyState === EventSource.CLOSED) {
                console.log('连接已关闭,尝试重新连接...');
                setTimeout(() => {
                    this.connect(clientId, onMessage, onError, onOpen);
                }, 3000);
            }
        };

        // 监听连接打开
        this.eventSource.onopen = (event) => {
            console.log('SSE 连接已打开');
            if (onOpen) onOpen(event);
        };
    }

    /**
     * 断开连接
     */
    disconnect() {
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
            console.log('SSE 连接已断开');
        }
    }

    /**
     * 发送消息到后端(用于测试)
     */
    async sendBroadcast(message) {
        try {
            const response = await axios.post('http://localhost:8080/api/sse/broadcast', {
                type: 'custom',
                data: { message, timestamp: Date.now() }
            });
            return response.data;
        } catch (error) {
            console.error('发送广播消息失败:', error);
            throw error;
        }
    }

    /**
     * 发送消息给指定客户端
     */
    async sendToClient(clientId, message) {
        try {
            const response = await axios.post(`http://localhost:8080/api/sse/send/${clientId}`, {
                type: 'direct',
                data: { message, timestamp: Date.now() }
            });
            return response.data;
        } catch (error) {
            console.error('发送消息失败:', error);
            throw error;
        }
    }

    /**
     * 获取在线客户端数量
     */
    async getOnlineCount() {
        try {
            const response = await axios.get('http://localhost:8080/api/sse/online');
            return response.data;
        } catch (error) {
            console.error('获取在线数量失败:', error);
            throw error;
        }
    }

    /**
     * 添加事件监听器
     */
    addEventListener(eventType, callback) {
        this.listeners.set(eventType, callback);
    }

    /**
     * 移除事件监听器
     */
    removeEventListener(eventType) {
        this.listeners.delete(eventType);
    }
}

// 创建单例实例
const sseService = new SseService();
export default sseService;

3. 创建 Vue3 组件

<!-- src/components/SSETest.vue -->
<template>
  <div class="sse-test">
    <h2>SSE 实时消息推送测试</h2>
    
    <!-- 连接控制 -->
    <div class="section">
      <h3>连接管理</h3>
      <div class="input-group">
        <input 
          v-model="clientId" 
          placeholder="请输入客户端ID"
          :disabled="isConnected"
        />
        <button 
          @click="connect" 
          :disabled="isConnected"
          class="btn btn-connect"
        >
          连接
        </button>
        <button 
          @click="disconnect" 
          :disabled="!isConnected"
          class="btn btn-disconnect"
        >
          断开连接
        </button>
      </div>
      <div class="status" :class="{ connected: isConnected }">
        连接状态: {{ isConnected ? '已连接' : '未连接' }}
      </div>
    </div>

    <!-- 在线信息 -->
    <div class="section">
      <h3>在线信息</h3>
      <button @click="getOnlineInfo" class="btn btn-info">刷新在线信息</button>
      <div v-if="onlineInfo" class="info-box">
        <p>在线人数: {{ onlineInfo.onlineCount }}</p>
        <p>客户端IDs: {{ onlineInfo.clients?.join(', ') }}</p>
      </div>
    </div>

    <!-- 消息发送 -->
    <div class="section">
      <h3>消息发送测试</h3>
      <div class="input-group">
        <input v-model="broadcastMessage" placeholder="广播消息内容" />
        <button @click="sendBroadcast" class="btn btn-send">发送广播</button>
      </div>
      
      <div class="input-group">
        <input v-model="targetClientId" placeholder="目标客户端ID" />
        <input v-model="privateMessage" placeholder="私信内容" />
        <button @click="sendPrivate" class="btn btn-send">发送私信</button>
      </div>
    </div>

    <!-- 接收到的消息 -->
    <div class="section">
      <h3>接收到的消息 ({{ messages.length }})</h3>
      <div class="messages-container">
        <div 
          v-for="(msg, index) in messages" 
          :key="index" 
          class="message-item"
          :class="msg.type"
        >
          <div class="message-header">
            <span class="message-type">{{ msg.type }}</span>
            <span class="message-time">{{ formatTime(msg.timestamp) }}</span>
          </div>
          <div class="message-content">
            <pre>{{ JSON.stringify(msg.data, null, 2) }}</pre>
          </div>
        </div>
      </div>
      <button @click="clearMessages" class="btn btn-clear">清空消息</button>
    </div>
  </div>
</template>

<script setup>
import { ref, reactive, onMounted, onUnmounted } from 'vue';
import sseService from '../services/sseService';

// 响应式数据
const clientId = ref('');
const isConnected = ref(false);
const messages = ref([]);
const onlineInfo = ref(null);
const broadcastMessage = ref('');
const targetClientId = ref('');
const privateMessage = ref('');

// 生成随机客户端ID
const generateClientId = () => {
  return 'client_' + Math.random().toString(36).substr(2, 9);
};

// 连接 SSE
const connect = () => {
  if (!clientId.value) {
    clientId.value = generateClientId();
  }
  
  sseService.connect(
    clientId.value,
    (data) => {
      // 添加时间戳
      const message = {
        ...data,
        timestamp: Date.now()
      };
      messages.value.push(message);
      
      // 自动滚动到底部
      scrollToBottom();
    },
    (error) => {
      console.error('连接错误:', error);
      isConnected.value = false;
    },
    (data) => {
      console.log('连接成功:', data);
      isConnected.value = true;
    }
  );
};

// 断开连接
const disconnect = () => {
  sseService.disconnect();
  isConnected.value = false;
};

// 获取在线信息
const getOnlineInfo = async () => {
  try {
    onlineInfo.value = await sseService.getOnlineCount();
  } catch (error) {
    console.error('获取在线信息失败:', error);
  }
};

// 发送广播消息
const sendBroadcast = async () => {
  if (!broadcastMessage.value.trim()) return;
  
  try {
    await sseService.sendBroadcast(broadcastMessage.value);
    broadcastMessage.value = '';
  } catch (error) {
    alert('发送失败: ' + error.message);
  }
};

// 发送私信
const sendPrivate = async () => {
  if (!targetClientId.value.trim() || !privateMessage.value.trim()) return;
  
  try {
    await sseService.sendToClient(targetClientId.value, privateMessage.value);
    privateMessage.value = '';
  } catch (error) {
    alert('发送失败: ' + error.message);
  }
};

// 清空消息
const clearMessages = () => {
  messages.value = [];
};

// 格式化时间
const formatTime = (timestamp) => {
  return new Date(timestamp).toLocaleTimeString();
};

// 滚动到底部
const scrollToBottom = () => {
  setTimeout(() => {
    const container = document.querySelector('.messages-container');
    if (container) {
      container.scrollTop = container.scrollHeight;
    }
  }, 100);
};

// 生命周期
onMounted(() => {
  // 页面加载时生成一个默认的客户端ID
  clientId.value = generateClientId();
});

onUnmounted(() => {
  // 组件卸载时断开连接
  disconnect();
});
</script>

<style scoped>
.sse-test {
  max-width: 1000px;
  margin: 0 auto;
  padding: 20px;
  font-family: Arial, sans-serif;
}

.section {
  margin-bottom: 30px;
  padding: 20px;
  border: 1px solid #ddd;
  border-radius: 8px;
  background-color: #f9f9f9;
}

.input-group {
  display: flex;
  gap: 10px;
  margin-bottom: 10px;
  align-items: center;
}

input {
  flex: 1;
  padding: 8px 12px;
  border: 1px solid #ccc;
  border-radius: 4px;
  font-size: 14px;
}

.btn {
  padding: 8px 16px;
  border: none;
  border-radius: 4px;
  cursor: pointer;
  font-size: 14px;
  transition: background-color 0.3s;
}

.btn:disabled {
  opacity: 0.5;
  cursor: not-allowed;
}

.btn-connect {
  background-color: #4CAF50;
  color: white;
}

.btn-connect:hover:not(:disabled) {
  background-color: #45a049;
}

.btn-disconnect {
  background-color: #f44336;
  color: white;
}

.btn-disconnect:hover:not(:disabled) {
  background-color: #da190b;
}

.btn-info {
  background-color: #2196F3;
  color: white;
}

.btn-send {
  background-color: #ff9800;
  color: white;
}

.btn-clear {
  background-color: #9e9e9e;
  color: white;
}

.status {
  margin-top: 10px;
  padding: 8px;
  border-radius: 4px;
  font-weight: bold;
}

.status.connected {
  background-color: #d4edda;
  color: #155724;
  border: 1px solid #c3e6cb;
}

.info-box {
  margin-top: 10px;
  padding: 10px;
  background-color: white;
  border-radius: 4px;
  border: 1px solid #ddd;
}

.messages-container {
  height: 400px;
  overflow-y: auto;
  border: 1px solid #ddd;
  border-radius: 4px;
  padding: 10px;
  background-color: white;
}

.message-item {
  margin-bottom: 15px;
  padding: 10px;
  border-left: 4px solid #ddd;
  background-color: #fafafa;
  border-radius: 4px;
}

.message-item.demo {
  border-left-color: #2196F3;
}

.message-item.notification {
  border-left-color: #ff9800;
}

.message-item.private {
  border-left-color: #9c27b0;
}

.message-item.connected {
  border-left-color: #4CAF50;
}

.message-header {
  display: flex;
  justify-content: space-between;
  margin-bottom: 8px;
  font-size: 12px;
}

.message-type {
  font-weight: bold;
  text-transform: uppercase;
}

.message-time {
  color: #666;
}

.message-content {
  font-size: 13px;
}

.message-content pre {
  margin: 0;
  white-space: pre-wrap;
  word-wrap: break-word;
}
</style>

4. 在主应用中使用组件

<!-- src/App.vue -->
<template>
  <div id="app">
    <SSETest />
  </div>
</template>

<script setup>
import SSETest from './components/SSETest.vue';
</script>

<style>
#app {
  font-family: Arial, sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
}

body {
  margin: 0;
  padding: 0;
  background-color: #f0f2f5;
}
</style>

四、高级特性与优化

1. 使用 Redis 实现分布式 SSE

在分布式环境中,需要使用 Redis 来共享客户端连接信息:

// RedisSseController.java
@Service
public class RedisSseService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String SSE_CLIENTS_KEY = "sse:clients";
    
    public void addClient(String clientId, String serverId) {
        redisTemplate.opsForHash().put(SSE_CLIENTS_KEY, clientId, serverId);
    }
    
    public void removeClient(String clientId) {
        redisTemplate.opsForHash().delete(SSE_CLIENTS_KEY, clientId);
    }
    
    public Map<Object, Object> getAllClients() {
        return redisTemplate.opsForHash().entries(SSE_CLIENTS_KEY);
    }
}

2. 心跳检测机制

// 在 SseController 中添加心跳
@Scheduled(fixedRate = 30000) // 每30秒发送一次心跳
public void sendHeartbeat() {
    for (Map.Entry<String, SseEmitter> entry : sseEmitters.entrySet()) {
        try {
            sendMessage(entry.getValue(), "heartbeat", Map.of("timestamp", System.currentTimeMillis()));
        } catch (IOException e) {
            sseEmitters.remove(entry.getKey());
        }
    }
}

3. 前端重连策略优化

// 在前端添加指数退避重连策略
class SseServiceWithRetry extends SseService {
    connectWithRetry(clientId, onMessage, onError, onOpen, maxRetries = 5) {
        let retryCount = 0;
        const baseDelay = 1000;
        
        const attemptConnection = () => {
            this.connect(
                clientId,
                onMessage,
                (error) => {
                    retryCount++;
                    if (retryCount <= maxRetries) {
                        const delay = baseDelay * Math.pow(2, retryCount - 1);
                        console.log(`连接失败,${delay}ms 后重试 (${retryCount}/${maxRetries})`);
                        setTimeout(attemptConnection, delay);
                    } else {
                        onError(error);
                    }
                },
                onOpen
            );
        };
        
        attemptConnection();
    }
}

五、部署与测试

1. 后端配置

application.properties 中配置:

# 服务器端口
server.port=8080

# 关闭 Thymeleaf 缓存(开发环境)
spring.thymeleaf.cache=false

# 如果需要跨域,确保 CORS 配置正确

2. 启动应用

mvn spring-boot:run

3. 测试步骤

  1. 启动 SpringBoot 应用
  2. 在浏览器中打开 http://localhost:3000(Vue 应用)
  3. 输入客户端 ID 并点击连接
  4. 观察定时推送的演示消息
  5. 测试广播消息和私信功能
  6. 打开多个浏览器标签页模拟多客户端连接

六、总结

通过本文的介绍,我们实现了:

  1. SpringBoot 后端​:完整的 SSE 控制器,支持连接管理、消息推送、广播等功能
  2. Vue3 前端​:美观的测试界面,支持连接管理、消息发送和接收显示
  3. 高级特性​:错误处理、重连机制、心跳检测、分布式扩展等
  4. 实际应用​:可用于实时通知、股票行情、聊天消息、系统监控等场景

SSE 技术相比 WebSocket 更简单,适合服务器向客户端的单向实时通信场景。在实际项目中,可以根据业务需求选择合适的实时通信技术。

© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞63 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容