WikiWiki
首页
Java开发
Java面试
Linux手册
  • AI相关
  • Python Flask
  • Pytorch
  • youlo8
SEO
uniapp小程序
Vue前端
work
数据库
软件设计师
入门指南
首页
Java开发
Java面试
Linux手册
  • AI相关
  • Python Flask
  • Pytorch
  • youlo8
SEO
uniapp小程序
Vue前端
work
数据库
软件设计师
入门指南
  • API工具笔记
  • Java基础
  • SpringAI开发问答系统
  • 前后端分离架构
  • 工作流Activity7
  • 微服务架构

DeepSeek+SpringAI

image-20250311185648821

image-20250311185745929

核心技术业务功能

  • 本地部署大模型DeepSeek
  • Ollama脚本Api使用、角色定制
  • SpringBoot +Spring AI
  • 代码调用本地模型编程
  • 模型会话数据记录持久化

DeepSeek

image-20250311192203814

DeepSeek | 深度求索

DeepSeek-R1 发布,性能对标 OpenAI o1 正式版 | DeepSeek API Docs

img

V3 论文:https://github.com/deepseek-ai/DeepSeek-V3/blob/main/DeepSeek_V3.pdf

一、Ollama与DeepSeek

1.Ollama部署DeepSeek

Ollama

  • 运行模型
ollama run deepseek-r1:8b
  • 查看模型列表
ollama list
  • 设置ollama 下载模型到其他盘符

  • 安装时使用命令行指定位置

OllamaSetup.exe /DIR=G:\ollama
  • 系统环境变量添加参数
OLLAMA_MODELS
G:\ollama\models

Path中添加: G:\ollama
然后重启电脑
  • Ollama 常用命令

    1. 启动Ollama服务
    ollama serve
    
    2. 从模型文件创建模型
    ollama create
    
    3. 显示模型信息
    ollama show
    
    4. 运行模型
    ollama run 模型名称
    
    5. 从注册表中拉去模型
    ollama pull 模型名称
    
    6. 将模型推送到注册表
    ollama push
    
    7. 列出模型
    ollama list
    
    8. 复制模型
    ollama cp
    
    9. 删除模型
    ollama rm 模型名称
    
    10. 获取有关Ollama任何命令的帮助信息
    ollama help
    

2. **PageAssist **

  • 安装插件:必须谷歌浏览器使用魔法

    image-20250311204532236

image-20250311205101770

3.Chatbox AI

Ollma设置:http://127.0.0.1:11434

  • Install Windows(PC) | Chatbox

image-20250311205308037

image-20250311210021849

4.CherryStudio集成硅基流动满血DeepSeek Api

cherry-ai.com

线上方案

硅基流动统一登录

image-20250311210705504

二、Java环境创建(忽略)

1.安装JDK21与Maven

参考(如果已配置可以忽略,非初学者都不用)

[JDK21下载+安装+环境配置教程(Windows系统)-CSDN博客](https://blog.csdn.net/m0_69750058/article/details/131628553#:~:text=前言:甲骨文公司与2023.9发布JDK21%2CJDK21将是一个 长期支持 (LTS)版本,JDK20目前可以从官网下载使用。 1,搜索 Oracle 官网找到JDK21 Java,Downloads | Oracle 2,切换 Windows 系统,然后点击下载(初学者下载X64 Installer就可) 3,下载完成后点击安装向导(本贴选择放在D盘目录下))

Maven安装与配置(详细步骤)_maven安装及配置教程-CSDN博客

2.使用IDEA配置基础项目环境

Springboot3.4.2

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>javao.cn</groupId>
    <artifactId>deepseek-doctor</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>deepseek-doctor</name>
    <description>deepseek-doctor</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
<!--        核心-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter-test</artifactId>
            <version>3.0.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
<!--        aop切面-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
<!--        配置文件相关-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>


        <!--        <dependency>-->
        <!--            <groupId>org.springframework.boot</groupId>-->
        <!--            <artifactId>spring-boot-starter-jdbc</artifactId>-->
        <!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.mybatis.spring.boot</groupId>-->
<!--            <artifactId>mybatis-spring-boot-starter</artifactId>-->
<!--            <version>3.0.4</version>-->
<!--        </dependency>-->

<!--        <dependency>-->
<!--            <groupId>com.mysql</groupId>-->
<!--            <artifactId>mysql-connector-j</artifactId>-->
<!--            <scope>runtime</scope>-->
<!--        </dependency>-->

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、集成DeepSeek

1.调用ollama API

Ollma官方文档:文档 -- Ollama 中文文档|Ollama官方文档

2.SpringAI

官网文档:Spring AI 中文文档 _ Spring官方文档中文翻译 _ Spring AI中文翻译|Spring 教程 —— CADN开发者文档中心

SpringAI集成ollama实现deepseek同步调用 Spring boot版本配合3.3.8

<!--        引入SpringAI-->
<dependency>
    <groupId>io.springboot.ai</groupId>
    <artifactId>spring-ai-ollama</artifactId>
    <version>1.0.3</version>
</dependency>
<dependency>
    <groupId>io.springboot.ai</groupId>
    <artifactId>spring-ai-ollama-spring-boot-starter</artifactId>
    <version>1.0.3</version>
</dependency>

接口示例

package javao.cn.controller;

import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import org.springframework.ai.ollama.OllamaChatClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Description Ollama  测试Controller
 * @Author Admin小闫
 * @Date 2025/3/13
 */
@Tag(name = "Ollama")
@ApiSupport(author = "Admin小闫", order = 1)
@RestController
@RequestMapping("/ollama")
public class OllamaController {

    @Resource
    private OllamaChatClient ollamaChatClient;

    @Operation(summary = "测试请求")
    @GetMapping("/ai/chat")
    @Parameter(name = "msg",description = "消息正文",required = true,in= ParameterIn.QUERY)
    public String chat(@RequestParam String msg){
        return ollamaChatClient.call(msg);
    }
}

3.knife4j

官方文档:快速开始 | Knife4j

依赖:

<!--        knife4j 引入-->
<dependency>
    <groupId>com.github.xiaoymin</groupId>
    <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
    <version>4.4.0</version>
</dependency>

配置文件

# springdoc-openapi项目配置
springdoc:
  swagger-ui:
    path: /swagger-ui.html
    tags-sorter: alpha
    operations-sorter: alpha
  api-docs:
    path: /v3/api-docs
  group-configs:
    - group: 'Ollama'
      paths-to-match: '/**'
      packages-to-scan: javao.cn.controller
knife4j:
  # 开启增强配置
  enable: true
  # 开启生产环境屏蔽(如果是生产环境,需要把下面配置设置true)
  #  production: true
  setting:
    language: zh_cn
#    swagger-model-name: 实体类列表
  basic: # 开始授权认证
    enable: true
    username: admin
    password: 123456

配置文件

package javao.cn.config;

import io.swagger.v3.oas.models.Components;
import io.swagger.v3.oas.models.OpenAPI;
import io.swagger.v3.oas.models.info.Contact;
import io.swagger.v3.oas.models.info.Info;
import io.swagger.v3.oas.models.info.License;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/***
 * 创建Swagger配置
 * @since:knife4j-springdoc-openapi-demo 1.0
 * @author <a href="mailto:xiaoymin@foxmail.com">xiaoymin@foxmail.com</a>
 * 2020/03/15 20:40
 */
@Configuration
public class SwaggerConfig {

    @Bean
    public OpenAPI customOpenAPI() {
        Contact contact = new Contact();
        contact.setEmail("yanpeng@niliuapp.work");
        contact.setName("闫鹏");
        contact.setUrl("http://doc.xiaominfo.com");
        return new OpenAPI()
                // 增加swagger授权请求头配置
//                .components(new Components().addSecuritySchemes(CommonConstant.X_ACCESS_TOKEN,
//                        new SecurityScheme().type(SecurityScheme.Type.HTTP).scheme(CommonConstant.X_ACCESS_TOKEN)))
                .info(new Info()
                        .title("ollama 后台服务API接口文档")
                        .version("1.0")
                        .contact(contact)
                        .description("Knife4j集成springdoc-openapi示例")
                        .termsOfService("http://doc.xiaominfo.com")
                        .license(new License().name("Apache 2.0")
                                .url("http://www.apache.org/licenses/LICENSE-2.0.html")));
    }

}

4.流式stream调用Ollama

接口示例

    @Operation(summary = "流试调用DeepSeek")
    @GetMapping("/ai/streamChat")
    @Parameter(name = "msg",description = "消息正文",required = true,in= ParameterIn.QUERY)
    public List<String> streamChat(@RequestParam String msg){
        Prompt prompt = new Prompt(new UserMessage(msg));
        List<String> collect = ollamaChatClient.stream(prompt).toStream().map(chatResponse -> {
            String content = chatResponse.getResult().getOutput().getContent();
            log.info(content);
            return content;
        }).collect(Collectors.toList());
        return collect;
    }

四.SSE服务端推送*

1.原理机制解析

服务端发送事件 Service Sent Event

image-20250313134204582

与WebSocket的主要区别是单向流输出,不可逆。服务的-客户端。

2.SSE构建

package javao.cn.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * @Description SSE服务器构建
 * @Author Admin小闫
 * @Date 2025/3/13
 */
public class SSEServer {

    private static final Logger log = LoggerFactory.getLogger(SSEServer.class);
    /**
     * 使用map对象关联用户ID与SSE对象
     */
    private static Map<String, SseEmitter> sseClientMap = new ConcurrentHashMap<>();


    public static SseEmitter connect(String userId) {
        //设置默认超时时间,0代表永不过期;默认超时时间为30秒;
        SseEmitter sseEmitter = new SseEmitter(0L);
        //设置完成回调,注册SSE的回调事件
        sseEmitter.onCompletion(completionCallback(userId));
        sseEmitter.onError(errorCallback(userId));
        sseEmitter.onTimeout(timeoutCallback(userId));
        sseClientMap.put(userId, sseEmitter);
        log.info("当前用户id为{}连接成功!", userId);

        return sseEmitter;
    }

    /**
     * 当用户连接完成并关闭时的回调
     *
     * @param userId 用户ID,用于标识和管理用户连接
     * @return Runnable 一个实现了Runnable接口的匿名内部类实例,用于执行连接完成后的逻辑
     */
    private static Runnable completionCallback(String userId) {
        return () -> {
            // 记录用户连接完成并关闭的信息
            log.info("用户id为{}连接完成并关闭!", userId);
            // 从映射中移除用户的连接
            sseClientMap.remove(userId);
        };
    }

    /**
     * 当用户连接发生错误时的回调
     *
     * @param userId 用户ID,用于标识和管理用户连接
     * @return Runnable 一个实现了Runnable接口的匿名内部类实例,用于执行连接错误时的逻辑
     */
    private static Consumer<Throwable> errorCallback(String userId) {
        return Throwable -> {
            // 记录用户连接错误断开的信息
            log.info("用户id为{}连接发生错误!", userId);
            // 从映射中移除用户的连接
            sseClientMap.remove(userId);
        };
    }

    /**
     * 当用户连接超时时的回调
     *
     * @param userId 用户ID,用于标识和管理用户连接
     * @return Runnable 一个实现了Runnable接口的匿名内部类实例,用于执行连接超时时的逻辑
     */
    private static Runnable timeoutCallback(String userId) {
        return () -> {
            // 记录用户连接超时的信息
            log.info("用户id为{}连接超时!", userId);
            // 从映射中移除用户的连接
            sseClientMap.remove(userId);
        };
    }

}
package javao.cn.controller;

import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import javao.cn.utils.SSEServer;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.awt.*;

/**
 * @Description
 * @Author Admin小闫
 * @Date 2025/3/13
 */
@Tag(name = "SSE")
@ApiSupport(author = "Admin小闫", order = 1)
@RestController
@RequestMapping("/sse")
public class SSEController {

    /**
     * 链接SSE服务
     * @param userId
     * @return
     */
    @Operation(summary = "建立SSE链接")
    @GetMapping(path = "/connect",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    @Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
    public SseEmitter connect(@RequestParam String userId) {
        return SSEServer.connect(userId);
    }
}

开发基于SSE的H5客户端

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE 客户端示例</title>
</head>
<body>
    <h1>SSE 客户端示例1</h1>
    <div id="messages"></div>

</body>
</html>

<script>

    let source = null;
    let userId = Math.random().toString(36).substr(2, 9); // 生成一个随机字符串作为 userId
    if (window.EventSource) {
        // 创建一个EventSource对象,建立SSE连接
        source = new EventSource('http://localhost:8080/sse/connect?userId=' + userId);

        // 建立连接成功,则会触发open事件
        source.addEventListener('open', function(e) {
            console.log('建立连接成功...');

            var text = document.getElementById('messages').innerHTML;
            text += '<br> 建立连接成功...';
            document.getElementById('messages').innerHTML = text;

        }, false);

        // 客户端接收到服务器发来的消息时,会触发message事件
        source.addEventListener('message', function(e) {
            var msg = e.data;

            var text = document.getElementById('messages').innerHTML;
            text += '<br>' + msg;
            document.getElementById('messages').innerHTML = text;

        }, false);

        // add 事件,stream 流式推送
        source.addEventListener('add', function(e) {
            var msg = e.data;

            var text = document.getElementById('messages').innerHTML;
            text += '<br>' + msg;
            document.getElementById('messages').innerHTML = text;

        }, false);

        // 用户自定义事件
        source.addEventListener('custom', function(e) {
            var msg = e.data;

            var text = document.getElementById('messages').innerHTML;
            text += '<br>' + "自定义事件:" + msg;
            document.getElementById('messages').innerHTML = text;

        }, false);

        // SSE完成时调用的事件
        source.addEventListener('finish', function(e) {
            console.log('当前SSE事件推送完毕...');
        }, false);

        // 连接发生错误时,会触发error事件
        source.addEventListener('error', function(e) {
            if (e.readyState == EventSource.CLOSED) {
                console.log('连接已关闭...');

                var text = document.getElementById('messages').innerHTML;
                text += '<br> 连接已关闭...';
                document.getElementById('messages').innerHTML = text;

            } else {
                console.log('发生错误...');
            }
        })

    } else {
        console.log('您的浏览器不支持SSE');
        // closeSSE();
    }

    function closeSSE() {
        source.close();
    }

</script>

链接SSE服务发送单条消息

SSEServer


    /**
     * 给SSE发送单条消息
     * @param userId 用户ID
     * @param message 消息内容
     * @param sseMstType 消息类型
     */
    public static void sendMessage(String userId, String message,SSEMstType sseMstType) {
        if (CollectionUtils.isEmpty(sseClientMap)){
            return;
        }

        if (sseClientMap.containsKey(userId)){
            SseEmitter sseEmitter = sseClientMap.get(userId);
            sendEmitterMessage(sseEmitter,userId, message, sseMstType);
        }
    }



    /**
     * 使用SSEEmitter发送消息
     * @param sseEmitter
     * @param userId
     * @param message
     * @param messageType
     */
    public static void sendEmitterMessage(SseEmitter sseEmitter,String userId, Object message,SSEMstType messageType) {
       SseEmitter.SseEventBuilder msg = SseEmitter.event()
                .id(userId)
                .name(messageType.type)
                .data(message);

        try {
            sseEmitter.send(msg);
        } catch (IOException e) {
            log.error("用户id为{}发送消息时发生异常!", userId, e);
            sseClientMap.remove(userId);
        }
        ;
    }

SSEMstType

package javao.cn.utils;

/**
 * @Description 发送SSE的消息类型
 * @Author Admin小闫
 * @Date 2025/3/13
 */
public enum SSEMstType {
    MESSAGE("message","单次发送的普通消息!"),
    ADD("add","消息追加,用于流试stream推送"),
    FINISH("finish","消息完成"),
    DONE("done","消息完成");

    public final String type;
    public final String value;

    SSEMstType(String type, String value) {
        this.type = type;
        this.value = value;
    }

}

SSEController

package javao.cn.controller;

import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.tags.Tag;
import javao.cn.utils.SSEMstType;
import javao.cn.utils.SSEServer;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.awt.*;

/**
 * @Description
 * @Author Admin小闫
 * @Date 2025/3/13
 */
@Tag(name = "SSE")
@ApiSupport(author = "Admin小闫", order = 1)
@RestController
@RequestMapping("/sse")
public class SSEController {

    /**
     * 链接SSE服务
     * @param userId
     * @return
     */
    @Operation(summary = "建立SSE链接")
    @GetMapping(path = "/connect",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    @Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
    public SseEmitter connect(@RequestParam String userId) {
        return SSEServer.connect(userId);
    }


    @Operation(summary = "发送单条消息到SSE客户端")
    @GetMapping("/sendMessage")
    @Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
    @Parameter(name = "userId", description = "用户ID", required = true, in = ParameterIn.QUERY)
    public String asyncChat(@RequestParam String userId, @RequestParam String msg) {
        SSEServer.sendMessage(userId,msg, SSEMstType.MESSAGE);
        return "OK";
    }

}

跨域问题

CorsConfig

package javao.cn.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * @Description
 * @Author Admin小闫
 * @Date 2025/3/13
 */
@Configuration
public class CorsConfig implements WebMvcConfigurer {

//    @Value("${website.domain}")
//    private String domain;

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
//                .allowedOrigins(domain)
                .allowedOriginPatterns("*")
                .allowedMethods("*")
                .allowedHeaders("*")
                .allowCredentials(true)
                .maxAge(60 * 60);
    }
}

SSE服务推送消息给所有客户端

SSEController

@Operation(summary = "给所有用户发送消息到客户端")
@GetMapping("/sendMessageAllUsers")
@Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
public String sendMessageAllUsers( @RequestParam String msg) {
    SSEServer.sendMessageAllUsers(msg);
    return "OK";
}

SSEServer

    /**
     * 给所有用户发送消息
     * @param message 消息内容
     */
    public static void sendMessageAllUsers(String message) {
        if (CollectionUtils.isEmpty(sseClientMap)){
            return;
        }
        sseClientMap.forEach((userId, sseEmitter) -> {
            sendEmitterMessage(sseEmitter,userId, message, SSEMstType.MESSAGE);
        });
    }

ADD事件批量消息流式推送与自动重连机制

    @Operation(summary = "发送单条消息到SSE客户端:Add事件")
    @GetMapping("/sendMessageAdd")
    @Parameter(name = "msg", description = "消息正文", required = true, in = ParameterIn.QUERY)
    @Parameter(name = "userId", description = "用户ID", required = true, in = ParameterIn.QUERY)
    public String sendMessageAdd(@RequestParam String userId, @RequestParam String msg) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(2000);
            SSEServer.sendMessage(userId, msg + "_" + i, SSEMstType.ADD);
        }
        SSEServer.sendMessage(userId, msg, SSEMstType.ADD);
        return "OK";
    }

主动断开与服务链接

优势与特性,相对于WebSocket与Netty相比,当链接断开时可以主动的进行链接重连。

    /**
     * 主动切断用户与服务的链接
     * @param userId 用户ID
     */
    public static void stopService(String userId) {
        if (CollectionUtils.isEmpty(sseClientMap)){
            return;
        }
        SseEmitter sseEmitter = sseClientMap.get(userId);
        if (sseEmitter!=null){
            //表示执行完毕断开链接
            sseEmitter.complete();
            log.info("用户id为{}主动断开链接!", userId);
        }else {
            log.warn("用户id为{}不存在!", userId);
        }
    }
    @Operation(summary = "主动断开链接")
    @GetMapping("/stop")
    @Parameter(name = "userId", description = "用户ID", required = true, in = ParameterIn.QUERY)
    public String sendMessageAdd(@RequestParam String userId) {
        SSEServer.stopService(userId);
        return "OK";
    }

SSE服务同时在线人数统计

最后整合的完整代码

package javao.cn.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * @ClassName SSEServer
 * @Author 闫鹏
 * @Version 1.0
 * @Description SSEServer
 **/
public class SSEServer {

    private static final Logger log = LoggerFactory.getLogger(SSEServer.class);
    /**
     * 使用map对象,关联用户id和sse的服务连接
     * 进阶提问1:SseEmitter 能不能放在Redis中和userId进行关联?
     * 进阶提问2:SseEmitter 如何在集群SpringBoot中存在
     */
    private static Map<String, SseEmitter> sseClients = new ConcurrentHashMap<>();

    /**
     * 用于统计当前总在线人数
     */
    private static AtomicInteger onlineCounts = new AtomicInteger(0);

    public static SseEmitter connect(String userId) {
        // 设置超时时间,0代表永不过期;默认30秒,超时未完成任务则会抛出异常
        SseEmitter sseEmitter = new SseEmitter(0L);

        // 注册SSE的回调方法
        sseEmitter.onCompletion(completionCallback(userId));
        sseEmitter.onError(errorCallback(userId));
        sseEmitter.onTimeout(timeoutCallback(userId));
        sseClients.put(userId, sseEmitter);
        log.info("当前创建新的SSE连接,用户ID为: {}", userId);
        onlineCounts.getAndIncrement();

        return sseEmitter;
    }

    /**
     * @Description: 发送单条消息
     * @Author 闫鹏
     * @param userId
     * @param message
     * @param msgType
     */
    public static void sendMessage(String userId, String message, SSEMsgType msgType) {
        if (CollectionUtils.isEmpty(sseClients)) {
            return;
        }

        if (sseClients.containsKey(userId)) {
            SseEmitter sseEmitter = sseClients.get(userId);
            sendEmitterMessage(sseEmitter, userId, message, msgType);
        }
    }

    /**
     * @Description: 发送消息给所有人
     * @Author 闫鹏
     * @param message
     */
    public static void sendMessageToAllUsers(String message) {
        if (CollectionUtils.isEmpty(sseClients)) {
            return;
        }

        sseClients.forEach((userId, sseEmitter) -> {
                sendEmitterMessage(sseEmitter, userId, message, SSEMsgType.MESSAGE);
            }
        );
    }

    /**
     * @Description: 使用SseEmitter推送消息
     * @Author 闫鹏
     * @param sseEmitter
     * @param userId
     * @param message
     * @param msgType
     */
    public static void sendEmitterMessage(SseEmitter sseEmitter,
                                           String userId,
                                           String message,
                                           SSEMsgType msgType) {

        try {
            SseEmitter.SseEventBuilder msg = SseEmitter.event()
                    .id(userId)
                    .name(msgType.type)
                    .data(message);
            sseEmitter.send(msg);
        } catch (IOException e) {
            log.error("用户[{}]的消息推送发生异常!", userId);
            removeConnection(userId);
        }

    }

    /**
     * @Description: 主动切断,停止sse服务和客户端的连接
     * @Author 闫鹏
     * @param userId
     */
    public static void stopServer(String userId) {
        if (CollectionUtils.isEmpty(sseClients)) {
            return;
        }

        SseEmitter sseEmitter = sseClients.get(userId);
        if (sseEmitter != null) {
            // complete 表示执行完毕,断开连接
            sseEmitter.complete();
            removeConnection(userId);
            log.info("连接关闭成功,被关闭的用户为 {}", userId);
        } else {
            log.warn("当前连接无需关闭,请不要重复操作");
        }

    }

    /**
     * @Description: SSE连接完成后的回调方法(关闭连接的时候调用)
     * @Author 闫鹏
     * @param userId
     * @return Runnable
     */
    private static Runnable completionCallback(String userId) {
        return () -> {
            log.info("SSE连接完成并结束,用户ID为: {}", userId);
            removeConnection(userId);
        };
    }
    /**
     * @Description: SSE连接超时的时候进行调用
     * @Author 闫鹏
     * @param userId
     * @return Runnable
     */
    private static Runnable timeoutCallback(String userId) {
        return () -> {
            log.info("SSE连接超时,用户ID为: {}", userId);
            removeConnection(userId);
        };
    }
    /**
     * @Description: SSE连接发生错误的时候进行调用
     * @Author 闫鹏
     * @param userId
     * @return Runnable
     */
    private static Consumer<Throwable> errorCallback(String userId) {
        return Throwable -> {
            log.info("SSE连接发生错误,用户ID为: {}", userId);
            removeConnection(userId);
        };
    }

    /**
     * @Description: 从整个SSE服务中移除用户连接
     * @Author 闫鹏
     * @param userId
     */
    public static void removeConnection(String userId) {
        sseClients.remove(userId);
        log.info("SSE连接被移除,移除的用户ID为: {}", userId);

        onlineCounts.getAndDecrement();
    }

    /**
     * @Description: 获得当前所有的会话总连接数(在线人数)
     * @Author 闫鹏
     * @param
     * @return int
     */
    public static int getOnlineCounts() {
        return onlineCounts.intValue();
    }

}

ollama定制deepseek角色的提示词引擎

最近更新:: 2025/8/22 15:05
Contributors: yanpeng_
Prev
Java基础
Next
前后端分离架构