|
@@ -1,6 +1,10 @@
|
|
|
package com.ruoyi.demo.controller;
|
|
|
|
|
|
+import cn.dev33.satoken.annotation.SaIgnore;
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import cn.hutool.core.util.PageUtil;
|
|
|
import cn.hutool.json.JSONString;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
import com.ruoyi.common.annotation.Log;
|
|
|
import com.ruoyi.common.core.domain.R;
|
|
|
import com.ruoyi.common.helper.LoginHelper;
|
|
@@ -17,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.data.geo.Point;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.http.MediaType;
|
|
|
+import org.springframework.web.bind.annotation.GetMapping;
|
|
|
import org.springframework.web.bind.annotation.RequestMapping;
|
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
@@ -45,11 +50,6 @@ public class TopologicalCommonController {
|
|
|
@Autowired
|
|
|
RedisTemplate redisTemplate;
|
|
|
|
|
|
- /**
|
|
|
- * SSE连接传输
|
|
|
- */
|
|
|
- private static Map<Long, SseEmitter> sseCache = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
/**
|
|
|
* 搜索栏——前缀查询
|
|
|
* @param searchText 搜索关键字
|
|
@@ -81,14 +81,15 @@ public class TopologicalCommonController {
|
|
|
* @return
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- @RequestMapping(path = "/map",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
|
|
|
+ @SaIgnore
|
|
|
+ @GetMapping(path = "/map",produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
|
|
|
public SseEmitter map(TopologicalWdAceeptVo topologicalWdAceeptVo) throws IOException {
|
|
|
String md5 = topologicalWdAceeptVo.getHash();
|
|
|
|
|
|
//2.查看redis中是否存在有缓存
|
|
|
List<PointBo> wdCount = (List<PointBo>) redisTemplate.boundHashOps(RedisContant.TOPOLOGICAL_COMMON_MAP).get(md5);
|
|
|
if (wdCount != null) {
|
|
|
- SseEmitter sseEmitter = getSseEmitter(LoginHelper.getUserId());
|
|
|
+ SseEmitter sseEmitter = getSseEmitter();
|
|
|
sendFragment(sseEmitter,wdCount);
|
|
|
return sseEmitter;
|
|
|
}
|
|
@@ -96,31 +97,21 @@ public class TopologicalCommonController {
|
|
|
List<PointBo> map = topologicalCommonService.map(topologicalWdAceeptVo);
|
|
|
redisTemplate.boundHashOps(RedisContant.TOPOLOGICAL_COMMON_MAP).put(md5,map);
|
|
|
redisTemplate.expire(RedisContant.TOPOLOGICAL_COMMON_MAP,RedisContant.TOPOLOGICAL_COMMON_MAP_TIME, TimeUnit.MINUTES); //30分钟
|
|
|
- SseEmitter sseEmitter = getSseEmitter(LoginHelper.getUserId());
|
|
|
+ SseEmitter sseEmitter = getSseEmitter();
|
|
|
sendFragment(sseEmitter,map);
|
|
|
return sseEmitter;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* SSE连接:获取连接实体类SseEmitter
|
|
|
- * @param uId 用户Id
|
|
|
* @return
|
|
|
*/
|
|
|
- public SseEmitter getSseEmitter(Long uId){
|
|
|
- SseEmitter sseEmitter = sseCache.get(uId);
|
|
|
- if (sseEmitter != null) {
|
|
|
- return sseEmitter;
|
|
|
- }
|
|
|
- // 超时时间设置为3s,用于演示客户端自动重连
|
|
|
- // 设置前端的重试时间为1s
|
|
|
- SseEmitter sseEmitter1 = new SseEmitter(30000L);
|
|
|
- sseCache.put(uId, sseEmitter1);
|
|
|
- sseEmitter1.onTimeout(() -> {
|
|
|
- log.info(uId + "超时");
|
|
|
- sseCache.remove(uId);
|
|
|
- });
|
|
|
- //sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
|
|
|
- return sseEmitter1;
|
|
|
+ public SseEmitter getSseEmitter(){
|
|
|
+ SseEmitter sseEmitter = new SseEmitter();
|
|
|
+ sseEmitter.onTimeout(() -> log.info("SSE推送超时"));
|
|
|
+ sseEmitter.onCompletion(() -> log.info("SSE推送完成"));
|
|
|
+ sseEmitter.onError((e) -> log.error("SSE推送异常",e));
|
|
|
+ return sseEmitter;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -129,20 +120,22 @@ public class TopologicalCommonController {
|
|
|
* @param wdCount 点位信息
|
|
|
*/
|
|
|
public void sendFragment(SseEmitter sseEmitter,List<PointBo> wdCount){
|
|
|
- int size = 1000;
|
|
|
- int pages = wdCount.size()%size == 0 ? wdCount.size()/size:wdCount.size()/size+1;
|
|
|
- int p1 = 0,p2 = size;
|
|
|
- for (int i=0;i<pages;i++){
|
|
|
- if (p2 > wdCount.size())
|
|
|
- p2 = wdCount.size();
|
|
|
+ int size = 500;
|
|
|
+ int pages = PageUtil.totalPage(wdCount.size(), size);
|
|
|
+ for (int i = 0; i < pages; i++) {
|
|
|
try {
|
|
|
- sseEmitter.send(SseEmitter.event().name("map").data(JsonUtils.toJsonString(wdCount.subList(p1,p2))));
|
|
|
- p1 = p2;
|
|
|
- p2+=size;
|
|
|
+ List<PointBo> boList = CollUtil.page(i + 1, size, wdCount);
|
|
|
+ if (!boList.isEmpty()) sseEmitter.send(SseEmitter.event().name("message").data(JSONUtil.toJsonStr(boList)));
|
|
|
} catch (IOException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
+ sseEmitter.completeWithError(e);
|
|
|
}
|
|
|
}
|
|
|
+ try {
|
|
|
+ sseEmitter.send(SseEmitter.event().name("finish").data("数据推送完成"));
|
|
|
+ } catch (IOException e) {
|
|
|
+ sseEmitter.completeWithError(e);
|
|
|
+ }
|
|
|
+ sseEmitter.complete();
|
|
|
}
|
|
|
|
|
|
}
|