DeepSeek+SpringAI


核心技术业务功能
- 本地部署大模型DeepSeek
- Ollama脚本Api使用、角色定制
- SpringBoot +Spring AI
- 代码调用本地模型编程
- 模型会话数据记录持久化
DeepSeek

DeepSeek-R1 发布,性能对标 OpenAI o1 正式版 | DeepSeek API Docs

V3 论文:https://github.com/deepseek-ai/DeepSeek-V3/blob/main/DeepSeek_V3.pdf
一、Ollama与DeepSeek
1.Ollama部署DeepSeek
- 运行模型
ollama run deepseek-r1:8b
- 查看模型列表
ollama list
设置ollama 下载模型到其他盘符
安装时使用命令行指定位置
OllamaSetup.exe /DIR=G:\ollama
- 系统环境变量添加参数
OLLAMA_MODELS
G:\ollama\models
Path中添加: G:\ollama
然后重启电脑
Ollama 常用命令
1. 启动Ollama服务 ollama serve 2. 从模型文件创建模型 ollama create 3. 显示模型信息 ollama show 4. 运行模型 ollama run 模型名称 5. 从注册表中拉去模型 ollama pull 模型名称 6. 将模型推送到注册表 ollama push 7. 列出模型 ollama list 8. 复制模型 ollama cp 9. 删除模型 ollama rm 模型名称 10. 获取有关Ollama任何命令的帮助信息 ollama help
2. **PageAssist **
安装插件:必须谷歌浏览器使用魔法


3.Chatbox AI
Ollma设置:http://127.0.0.1:11434


4.CherryStudio集成硅基流动满血DeepSeek Api
线上方案

二、Java环境创建(忽略)
1.安装JDK21与Maven
参考(如果已配置可以忽略,非初学者都不用)
[JDK21下载+安装+环境配置教程(Windows系统)-CSDN博客](https://blog.csdn.net/m0_69750058/article/details/131628553#:~:text=前言:甲骨文公司与2023.9发布JDK21%2CJDK21将是一个 长期支持 (LTS)版本,JDK20目前可以从官网下载使用。 1,搜索 Oracle 官网找到JDK21 Java,Downloads | Oracle 2,切换 Windows 系统,然后点击下载(初学者下载X64 Installer就可) 3,下载完成后点击安装向导(本贴选择放在D盘目录下))
Maven安装与配置(详细步骤)_maven安装及配置教程-CSDN博客
2.使用IDEA配置基础项目环境
Springboot3.4.2
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>javao.cn</groupId>
<artifactId>deepseek-doctor</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>deepseek-doctor</name>
<description>deepseek-doctor</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<!-- 核心-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter-test</artifactId>
<version>3.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- aop切面-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- 配置文件相关-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-jdbc</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.mybatis.spring.boot</groupId>-->
<!-- <artifactId>mybatis-spring-boot-starter</artifactId>-->
<!-- <version>3.0.4</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.mysql</groupId>-->
<!-- <artifactId>mysql-connector-j</artifactId>-->
<!-- <scope>runtime</scope>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、集成DeepSeek
1.调用ollama API
Ollma官方文档:文档 -- Ollama 中文文档|Ollama官方文档
2.SpringAI
官网文档:Spring AI 中文文档 _ Spring官方文档中文翻译 _ Spring AI中文翻译|Spring 教程 —— CADN开发者文档中心
SpringAI集成ollama实现deepseek同步调用 Spring boot版本配合3.3.8
<!-- 引入SpringAI-->
<dependency>
<groupId>io.springboot.ai</groupId>
<artifactId>spring-ai-ollama</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>io.springboot.ai</groupId>
<artifactId>spring-ai-ollama-spring-boot-starter</artifactId>
<version>1.0.3</version>
</dependency>
接口示例
package javao.cn.controller;
import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import org.springframework.ai.ollama.OllamaChatClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description Ollama 测试Controller
* @Author Admin小闫
* @Date 2025/3/13
*/
@Tag(name = "Ollama")
@ApiSupport(author = "Admin小闫", order = 1)
@RestController
@RequestMapping("/ollama")
public class OllamaController {
@Resource
private OllamaChatClient ollamaChatClient;
@Operation(summary = "测试请求")
@GetMapping("/ai/chat")
@Parameter(name = "msg",description = "消息正文",required = true,in= ParameterIn.QUERY)
public String chat(@RequestParam String msg){
return ollamaChatClient.call(msg);
}
}
3.knife4j
官方文档:快速开始 | Knife4j
依赖:
<!-- knife4j 引入-->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
<version>4.4.0</version>
</dependency>
配置文件
# springdoc-openapi项目配置
springdoc:
swagger-ui:
path: /swagger-ui.html
tags-sorter: alpha
operations-sorter: alpha
api-docs:
path: /v3/api-docs
group-configs:
- group: 'Ollama'
paths-to-match: '/**'
packages-to-scan: javao.cn.controller
knife4j:
# 开启增强配置
enable: true
# 开启生产环境屏蔽(如果是生产环境,需要把下面配置设置true)
# production: true
setting:
language: zh_cn
# swagger-model-name: 实体类列表
basic: # 开始授权认证
enable: true
username: admin
password: 123456
配置文件
package javao.cn.config;
import io.swagger.v3.oas.models.Components;
import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import io.swagger.v3.oas.models.info.License;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/***
* 创建Swagger配置
* @since:knife4j-springdoc-openapi-demo 1.0
* @author <a href="mailto:xiaoymin@foxmail.com">xiaoymin@foxmail.com</a>
* 2020/03/15 20:40
*/
@Configuration
public class SwaggerConfig {
@Bean
public OpenAPI customOpenAPI() {
Contact contact = new Contact();
contact.setEmail("yanpeng@niliuapp.work");
contact.setName("闫鹏");
contact.setUrl("http://doc.xiaominfo.com");
return new OpenAPI()
// 增加swagger授权请求头配置
// .components(new Components().addSecuritySchemes(CommonConstant.X_ACCESS_TOKEN,
// new SecurityScheme().type(SecurityScheme.Type.HTTP).scheme(CommonConstant.X_ACCESS_TOKEN)))
.info(new Info()
.title("ollama 后台服务API接口文档")
.version("1.0")
.contact(contact)
.description("Knife4j集成springdoc-openapi示例")
.termsOfService("http://doc.xiaominfo.com")
.license(new License().name("Apache 2.0")
.url("http://www.apache.org/licenses/LICENSE-2.0.html")));
}
}
4.流式stream调用Ollama
接口示例
@Operation(summary = "流试调用DeepSeek")
@GetMapping("/ai/streamChat")
@Parameter(name = "msg",description = "消息正文",required = true,in= ParameterIn.QUERY)
public List<String> streamChat(@RequestParam String msg){
Prompt prompt = new Prompt(new UserMessage(msg));
List<String> collect = ollamaChatClient.stream(prompt).toStream().map(chatResponse -> {
String content = chatResponse.getResult().getOutput().getContent();
log.info(content);
return content;
}).collect(Collectors.toList());
return collect;
}
四.SSE服务端推送*
1.原理机制解析
服务端发送事件 Service Sent Event

与WebSocket的主要区别是单向流输出,不可逆。服务的-客户端。
2.SSE构建
package javao.cn.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* @Description SSE服务器构建
* @Author Admin小闫
* @Date 2025/3/13
*/
public class SSEServer {
private static final Logger log = LoggerFactory.getLogger(SSEServer.class);
/**
* 使用map对象关联用户ID与SSE对象
*/
private static Map<String, SseEmitter> sseClientMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId) {
//设置默认超时时间,0代表永不过期;默认超时时间为30秒;
SseEmitter sseEmitter = new SseEmitter(0L);
//设置完成回调,注册SSE的回调事件
sseEmitter.onCompletion(completionCallback(userId));
sseEmitter.onError(errorCallback(userId));
sseEmitter.onTimeout(timeoutCallback(userId));
sseClientMap.put(userId, sseEmitter);
log.info("当前用户id为{}连接成功!", userId);
return sseEmitter;
}
/**
* 当用户连接完成并关闭时的回调
*
* @param userId 用户ID,用于标识和管理用户连接
* @return Runnable 一个实现了Runnable接口的匿名内部类实例,用于执行连接完成后的逻辑
*/
private static Runnable completionCallback(String userId) {
return () -> {
// 记录用户连接完成并关闭的信息
log.info("用户id为{}连接完成并关闭!", userId);
// 从映射中移除用户的连接
sseClientMap.remove(userId);
};
}
/**
* 当用户连接发生错误时的回调
*
* @param userId 用户ID,用于标识和管理用户连接
* @return Runnable 一个实现了Runnable接口的匿名内部类实例,用于执行连接错误时的逻辑
*/
private static Consumer<Throwable> errorCallback(String userId) {
return Throwable -> {
// 记录用户连接错误断开的信息
log.info("用户id为{}连接发生错误!", userId);
// 从映射中移除用户的连接
sseClientMap.remove(userId);
};
}
/**
* 当用户连接超时时的回调
*
* @param userId 用户ID,用于标识和管理用户连接
* @return Runnable 一个实现了Runnable接口的匿名内部类实例,用于执行连接超时时的逻辑
*/
private static Runnable timeoutCallback(String userId) {
return () -> {
// 记录用户连接超时的信息
log.info("用户id为{}连接超时!", userId);
// 从映射中移除用户的连接
sseClientMap.remove(userId);
};
}
}
package javao.cn.controller;
import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import javao.cn.utils.SSEServer;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.awt.*;
/**
* @Description
* @Author Admin小闫
* @Date 2025/3/13
*/
@Tag(name = "SSE")
@ApiSupport(author = "Admin小闫", order = 1)
@RestController
@RequestMapping("/sse")
public class SSEController {
/**
* 链接SSE服务
* @param userId
* @return
*/
@Operation(summary = "建立SSE链接")
@GetMapping(path = "/connect",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
@Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
public SseEmitter connect(@RequestParam String userId) {
return SSEServer.connect(userId);
}
}
开发基于SSE的H5客户端
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE 客户端示例</title>
</head>
<body>
<h1>SSE 客户端示例1</h1>
<div id="messages"></div>
</body>
</html>
<script>
let source = null;
let userId = Math.random().toString(36).substr(2, 9); // 生成一个随机字符串作为 userId
if (window.EventSource) {
// 创建一个EventSource对象,建立SSE连接
source = new EventSource('http://localhost:8080/sse/connect?userId=' + userId);
// 建立连接成功,则会触发open事件
source.addEventListener('open', function(e) {
console.log('建立连接成功...');
var text = document.getElementById('messages').innerHTML;
text += '<br> 建立连接成功...';
document.getElementById('messages').innerHTML = text;
}, false);
// 客户端接收到服务器发来的消息时,会触发message事件
source.addEventListener('message', function(e) {
var msg = e.data;
var text = document.getElementById('messages').innerHTML;
text += '<br>' + msg;
document.getElementById('messages').innerHTML = text;
}, false);
// add 事件,stream 流式推送
source.addEventListener('add', function(e) {
var msg = e.data;
var text = document.getElementById('messages').innerHTML;
text += '<br>' + msg;
document.getElementById('messages').innerHTML = text;
}, false);
// 用户自定义事件
source.addEventListener('custom', function(e) {
var msg = e.data;
var text = document.getElementById('messages').innerHTML;
text += '<br>' + "自定义事件:" + msg;
document.getElementById('messages').innerHTML = text;
}, false);
// SSE完成时调用的事件
source.addEventListener('finish', function(e) {
console.log('当前SSE事件推送完毕...');
}, false);
// 连接发生错误时,会触发error事件
source.addEventListener('error', function(e) {
if (e.readyState == EventSource.CLOSED) {
console.log('连接已关闭...');
var text = document.getElementById('messages').innerHTML;
text += '<br> 连接已关闭...';
document.getElementById('messages').innerHTML = text;
} else {
console.log('发生错误...');
}
})
} else {
console.log('您的浏览器不支持SSE');
// closeSSE();
}
function closeSSE() {
source.close();
}
</script>
链接SSE服务发送单条消息
SSEServer
/**
* 给SSE发送单条消息
* @param userId 用户ID
* @param message 消息内容
* @param sseMstType 消息类型
*/
public static void sendMessage(String userId, String message,SSEMstType sseMstType) {
if (CollectionUtils.isEmpty(sseClientMap)){
return;
}
if (sseClientMap.containsKey(userId)){
SseEmitter sseEmitter = sseClientMap.get(userId);
sendEmitterMessage(sseEmitter,userId, message, sseMstType);
}
}
/**
* 使用SSEEmitter发送消息
* @param sseEmitter
* @param userId
* @param message
* @param messageType
*/
public static void sendEmitterMessage(SseEmitter sseEmitter,String userId, Object message,SSEMstType messageType) {
SseEmitter.SseEventBuilder msg = SseEmitter.event()
.id(userId)
.name(messageType.type)
.data(message);
try {
sseEmitter.send(msg);
} catch (IOException e) {
log.error("用户id为{}发送消息时发生异常!", userId, e);
sseClientMap.remove(userId);
}
;
}
SSEMstType
package javao.cn.utils;
/**
* @Description 发送SSE的消息类型
* @Author Admin小闫
* @Date 2025/3/13
*/
public enum SSEMstType {
MESSAGE("message","单次发送的普通消息!"),
ADD("add","消息追加,用于流试stream推送"),
FINISH("finish","消息完成"),
DONE("done","消息完成");
public final String type;
public final String value;
SSEMstType(String type, String value) {
this.type = type;
this.value = value;
}
}
SSEController
package javao.cn.controller;
import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import javao.cn.utils.SSEMstType;
import javao.cn.utils.SSEServer;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.awt.*;
/**
* @Description
* @Author Admin小闫
* @Date 2025/3/13
*/
@Tag(name = "SSE")
@ApiSupport(author = "Admin小闫", order = 1)
@RestController
@RequestMapping("/sse")
public class SSEController {
/**
* 链接SSE服务
* @param userId
* @return
*/
@Operation(summary = "建立SSE链接")
@GetMapping(path = "/connect",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
@Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
public SseEmitter connect(@RequestParam String userId) {
return SSEServer.connect(userId);
}
@Operation(summary = "发送单条消息到SSE客户端")
@GetMapping("/sendMessage")
@Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
@Parameter(name = "userId", description = "用户ID", required = true, in = ParameterIn.QUERY)
public String asyncChat(@RequestParam String userId, @RequestParam String msg) {
SSEServer.sendMessage(userId,msg, SSEMstType.MESSAGE);
return "OK";
}
}
跨域问题
CorsConfig
package javao.cn.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @Description
* @Author Admin小闫
* @Date 2025/3/13
*/
@Configuration
public class CorsConfig implements WebMvcConfigurer {
// @Value("${website.domain}")
// private String domain;
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
// .allowedOrigins(domain)
.allowedOriginPatterns("*")
.allowedMethods("*")
.allowedHeaders("*")
.allowCredentials(true)
.maxAge(60 * 60);
}
}
SSE服务推送消息给所有客户端
SSEController
@Operation(summary = "给所有用户发送消息到客户端")
@GetMapping("/sendMessageAllUsers")
@Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
public String sendMessageAllUsers( @RequestParam String msg) {
SSEServer.sendMessageAllUsers(msg);
return "OK";
}
SSEServer
/**
* 给所有用户发送消息
* @param message 消息内容
*/
public static void sendMessageAllUsers(String message) {
if (CollectionUtils.isEmpty(sseClientMap)){
return;
}
sseClientMap.forEach((userId, sseEmitter) -> {
sendEmitterMessage(sseEmitter,userId, message, SSEMstType.MESSAGE);
});
}
ADD事件批量消息流式推送与自动重连机制
@Operation(summary = "发送单条消息到SSE客户端:Add事件")
@GetMapping("/sendMessageAdd")
@Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
@Parameter(name = "userId", description = "用户ID", required = true, in = ParameterIn.QUERY)
public String sendMessageAdd(@RequestParam String userId, @RequestParam String msg) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread.sleep(2000);
SSEServer.sendMessage(userId, msg + "_" + i, SSEMstType.ADD);
}
SSEServer.sendMessage(userId, msg, SSEMstType.ADD);
return "OK";
}
主动断开与服务链接
优势与特性,相对于WebSocket与Netty相比,当链接断开时可以主动的进行链接重连。
/**
* 主动切断用户与服务的链接
* @param userId 用户ID
*/
public static void stopService(String userId) {
if (CollectionUtils.isEmpty(sseClientMap)){
return;
}
SseEmitter sseEmitter = sseClientMap.get(userId);
if (sseEmitter!=null){
//表示执行完毕断开链接
sseEmitter.complete();
log.info("用户id为{}主动断开链接!", userId);
}else {
log.warn("用户id为{}不存在!", userId);
}
}
@Operation(summary = "主动断开链接")
@GetMapping("/stop")
@Parameter(name = "userId", description = "用户ID", required = true, in = ParameterIn.QUERY)
public String sendMessageAdd(@RequestParam String userId) {
SSEServer.stopService(userId);
return "OK";
}
SSE服务同时在线人数统计
最后整合的完整代码
package javao.cn.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* @ClassName SSEServer
* @Author 闫鹏
* @Version 1.0
* @Description SSEServer
**/
public class SSEServer {
private static final Logger log = LoggerFactory.getLogger(SSEServer.class);
/**
* 使用map对象,关联用户id和sse的服务连接
* 进阶提问1:SseEmitter 能不能放在Redis中和userId进行关联?
* 进阶提问2:SseEmitter 如何在集群SpringBoot中存在
*/
private static Map<String, SseEmitter> sseClients = new ConcurrentHashMap<>();
/**
* 用于统计当前总在线人数
*/
private static AtomicInteger onlineCounts = new AtomicInteger(0);
public static SseEmitter connect(String userId) {
// 设置超时时间,0代表永不过期;默认30秒,超时未完成任务则会抛出异常
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册SSE的回调方法
sseEmitter.onCompletion(completionCallback(userId));
sseEmitter.onError(errorCallback(userId));
sseEmitter.onTimeout(timeoutCallback(userId));
sseClients.put(userId, sseEmitter);
log.info("当前创建新的SSE连接,用户ID为: {}", userId);
onlineCounts.getAndIncrement();
return sseEmitter;
}
/**
* @Description: 发送单条消息
* @Author 闫鹏
* @param userId
* @param message
* @param msgType
*/
public static void sendMessage(String userId, String message, SSEMsgType msgType) {
if (CollectionUtils.isEmpty(sseClients)) {
return;
}
if (sseClients.containsKey(userId)) {
SseEmitter sseEmitter = sseClients.get(userId);
sendEmitterMessage(sseEmitter, userId, message, msgType);
}
}
/**
* @Description: 发送消息给所有人
* @Author 闫鹏
* @param message
*/
public static void sendMessageToAllUsers(String message) {
if (CollectionUtils.isEmpty(sseClients)) {
return;
}
sseClients.forEach((userId, sseEmitter) -> {
sendEmitterMessage(sseEmitter, userId, message, SSEMsgType.MESSAGE);
}
);
}
/**
* @Description: 使用SseEmitter推送消息
* @Author 闫鹏
* @param sseEmitter
* @param userId
* @param message
* @param msgType
*/
public static void sendEmitterMessage(SseEmitter sseEmitter,
String userId,
String message,
SSEMsgType msgType) {
try {
SseEmitter.SseEventBuilder msg = SseEmitter.event()
.id(userId)
.name(msgType.type)
.data(message);
sseEmitter.send(msg);
} catch (IOException e) {
log.error("用户[{}]的消息推送发生异常!", userId);
removeConnection(userId);
}
}
/**
* @Description: 主动切断,停止sse服务和客户端的连接
* @Author 闫鹏
* @param userId
*/
public static void stopServer(String userId) {
if (CollectionUtils.isEmpty(sseClients)) {
return;
}
SseEmitter sseEmitter = sseClients.get(userId);
if (sseEmitter != null) {
// complete 表示执行完毕,断开连接
sseEmitter.complete();
removeConnection(userId);
log.info("连接关闭成功,被关闭的用户为 {}", userId);
} else {
log.warn("当前连接无需关闭,请不要重复操作");
}
}
/**
* @Description: SSE连接完成后的回调方法(关闭连接的时候调用)
* @Author 闫鹏
* @param userId
* @return Runnable
*/
private static Runnable completionCallback(String userId) {
return () -> {
log.info("SSE连接完成并结束,用户ID为: {}", userId);
removeConnection(userId);
};
}
/**
* @Description: SSE连接超时的时候进行调用
* @Author 闫鹏
* @param userId
* @return Runnable
*/
private static Runnable timeoutCallback(String userId) {
return () -> {
log.info("SSE连接超时,用户ID为: {}", userId);
removeConnection(userId);
};
}
/**
* @Description: SSE连接发生错误的时候进行调用
* @Author 闫鹏
* @param userId
* @return Runnable
*/
private static Consumer<Throwable> errorCallback(String userId) {
return Throwable -> {
log.info("SSE连接发生错误,用户ID为: {}", userId);
removeConnection(userId);
};
}
/**
* @Description: 从整个SSE服务中移除用户连接
* @Author 闫鹏
* @param userId
*/
public static void removeConnection(String userId) {
sseClients.remove(userId);
log.info("SSE连接被移除,移除的用户ID为: {}", userId);
onlineCounts.getAndDecrement();
}
/**
* @Description: 获得当前所有的会话总连接数(在线人数)
* @Author 闫鹏
* @param
* @return int
*/
public static int getOnlineCounts() {
return onlineCounts.intValue();
}
}
