Browse Source

重写SSE推送数据的代码

dzk 1 year ago
parent
commit
207c679bf8

+ 2 - 2
.run/ruoyi-monitor-admin.run.xml

@@ -1,5 +1,5 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="ruoyi-monitor-admin" type="docker-deploy" factoryName="dockerfile" server-name="lbjzys">
+  <configuration default="false" name="ruoyi-monitor-admin" type="docker-deploy" factoryName="dockerfile">
     <deployment type="dockerfile">
       <settings>
         <option name="imageTag" value="ruoyi/ruoyi-monitor-admin:4.8.0" />
@@ -26,4 +26,4 @@
     </deployment>
     <method v="2" />
   </configuration>
-</component>
+</component>

+ 2 - 2
.run/ruoyi-xxl-job-admin.run.xml

@@ -1,5 +1,5 @@
 <component name="ProjectRunConfigurationManager">
-  <configuration default="false" name="ruoyi-xxl-job-admin" type="docker-deploy" factoryName="dockerfile" server-name="lbjzys">
+  <configuration default="false" name="ruoyi-xxl-job-admin" type="docker-deploy" factoryName="dockerfile">
     <deployment type="dockerfile">
       <settings>
         <option name="imageTag" value="ruoyi/ruoyi-xxl-job-admin:4.8.0" />
@@ -26,4 +26,4 @@
     </deployment>
     <method v="2" />
   </configuration>
-</component>
+</component>

+ 27 - 34
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/TopologicalCommonController.java

@@ -1,6 +1,10 @@
 package com.ruoyi.demo.controller;
 
+import cn.dev33.satoken.annotation.SaIgnore;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.PageUtil;
 import cn.hutool.json.JSONString;
+import cn.hutool.json.JSONUtil;
 import com.ruoyi.common.annotation.Log;
 import com.ruoyi.common.core.domain.R;
 import com.ruoyi.common.helper.LoginHelper;
@@ -17,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.geo.Point;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -45,11 +50,6 @@ public class TopologicalCommonController {
     @Autowired
     RedisTemplate redisTemplate;
 
-    /**
-     * SSE连接传输
-     */
-    private static Map<Long, SseEmitter> sseCache = new ConcurrentHashMap<>();
-
     /**
      * 搜索栏——前缀查询
      * @param searchText 搜索关键字
@@ -81,14 +81,15 @@ public class TopologicalCommonController {
      * @return
      * @throws IOException
      */
-    @RequestMapping(path = "/map",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
+    @SaIgnore
+    @GetMapping(path = "/map",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
     public SseEmitter map(TopologicalWdAceeptVo topologicalWdAceeptVo) throws IOException {
         String md5 = topologicalWdAceeptVo.getHash();
 
         //2.查看redis中是否存在有缓存
         List<PointBo> wdCount = (List<PointBo>) redisTemplate.boundHashOps(RedisContant.TOPOLOGICAL_COMMON_MAP).get(md5);
         if (wdCount != null) {
-            SseEmitter sseEmitter = getSseEmitter(LoginHelper.getUserId());
+            SseEmitter sseEmitter = getSseEmitter();
             sendFragment(sseEmitter,wdCount);
             return sseEmitter;
         }
@@ -96,31 +97,21 @@ public class TopologicalCommonController {
         List<PointBo> map = topologicalCommonService.map(topologicalWdAceeptVo);
         redisTemplate.boundHashOps(RedisContant.TOPOLOGICAL_COMMON_MAP).put(md5,map);
         redisTemplate.expire(RedisContant.TOPOLOGICAL_COMMON_MAP,RedisContant.TOPOLOGICAL_COMMON_MAP_TIME, TimeUnit.MINUTES); //30分钟
-        SseEmitter sseEmitter = getSseEmitter(LoginHelper.getUserId());
+        SseEmitter sseEmitter = getSseEmitter();
         sendFragment(sseEmitter,map);
         return sseEmitter;
     }
 
     /**
      * SSE连接:获取连接实体类SseEmitter
-     * @param uId 用户Id
      * @return
      */
-    public SseEmitter getSseEmitter(Long uId){
-        SseEmitter sseEmitter = sseCache.get(uId);
-        if (sseEmitter != null) {
-            return sseEmitter;
-        }
-        // 超时时间设置为3s,用于演示客户端自动重连
-        // 设置前端的重试时间为1s
-        SseEmitter sseEmitter1 = new SseEmitter(30000L);
-        sseCache.put(uId, sseEmitter1);
-        sseEmitter1.onTimeout(() -> {
-            log.info(uId + "超时");
-            sseCache.remove(uId);
-        });
-        //sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
-        return sseEmitter1;
+    public SseEmitter getSseEmitter(){
+        SseEmitter sseEmitter = new SseEmitter();
+        sseEmitter.onTimeout(() -> log.info("SSE推送超时"));
+        sseEmitter.onCompletion(() -> log.info("SSE推送完成"));
+        sseEmitter.onError((e) -> log.error("SSE推送异常",e));
+        return sseEmitter;
     }
 
     /**
@@ -129,20 +120,22 @@ public class TopologicalCommonController {
      * @param wdCount 点位信息
      */
     public void sendFragment(SseEmitter sseEmitter,List<PointBo> wdCount){
-        int size = 1000;
-        int pages = wdCount.size()%size == 0 ? wdCount.size()/size:wdCount.size()/size+1;
-        int p1 = 0,p2 = size;
-        for (int i=0;i<pages;i++){
-            if (p2 > wdCount.size())
-                p2 = wdCount.size();
+        int size = 500;
+        int pages = PageUtil.totalPage(wdCount.size(), size);
+        for (int i = 0; i < pages; i++) {
             try {
-                sseEmitter.send(SseEmitter.event().name("map").data(JsonUtils.toJsonString(wdCount.subList(p1,p2))));
-                p1 = p2;
-                p2+=size;
+                List<PointBo> boList = CollUtil.page(i + 1, size, wdCount);
+                if (!boList.isEmpty()) sseEmitter.send(SseEmitter.event().name("message").data(JSONUtil.toJsonStr(boList)));
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                sseEmitter.completeWithError(e);
             }
         }
+        try {
+            sseEmitter.send(SseEmitter.event().name("finish").data("数据推送完成"));
+        } catch (IOException e) {
+            sseEmitter.completeWithError(e);
+        }
+        sseEmitter.complete();
     }
 
 }