Procházet zdrojové kódy

优化定时任务分析的逻辑,改普通查询为流式查询,避免OOM

dzk před 1 rokem
rodič
revize
e92aa25f9a

+ 2 - 2
pom.xml

@@ -74,7 +74,7 @@
             </properties>
             <activation>
                 <!-- 默认环境 -->
-                <activeByDefault>true</activeByDefault>
+                <activeByDefault>false</activeByDefault>
             </activation>
         </profile>
         <profile>
@@ -96,7 +96,7 @@
             </properties>
             <activation>
                 <!-- 默认环境 -->
-                <activeByDefault>false</activeByDefault>
+                <activeByDefault>true</activeByDefault>
             </activation>
         </profile>
     </profiles>

+ 2 - 0
ruoyi-demo/src/main/java/com/ruoyi/demo/constant/RedisContant.java

@@ -136,6 +136,8 @@ public class RedisContant {
      * 数据定时
      */
     //数据统计-分析网点周边标签后存储前缀
+    public static String WD_TAG_MAP = "wd_tag_map";
+    //数据统计-分析网点周边标签后存储前缀
     public static String WD_TAG = "wd_tag";
 
     /**

+ 3 - 1
ruoyi-demo/src/main/java/com/ruoyi/demo/mapper/WdInfoDao.java

@@ -10,7 +10,7 @@ import com.ruoyi.demo.entity.WdInfo;
 import com.ruoyi.demo.entity.bo.PointBo;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
-import org.springframework.data.geo.Point;
+import org.apache.ibatis.cursor.Cursor;
 
 import java.util.List;
 
@@ -21,4 +21,6 @@ public interface WdInfoDao extends BaseMapper<WdInfo> {
     Page<TopologicalHouseWd> selectTopologicalHouseList(Page<WdInfo> page,@Param(Constants.WRAPPER) QueryWrapper<WdInfo> queryWrapper);
 
     List<PointBo> map(@Param(Constants.WRAPPER) QueryWrapper<WdInfo> queryWrapper);
+
+    Cursor<WdInfo> streamQuery(@Param(Constants.WRAPPER)QueryWrapper<WdInfo> queryWrapper);
 }

+ 1 - 3
ruoyi-demo/src/main/java/com/ruoyi/demo/service/impl/ChannelAnalyseServiceImpl.java

@@ -136,9 +136,7 @@ public class ChannelAnalyseServiceImpl implements ChannelAnalyseService {
         List<WdInfo> queryWd = wdInfoDao.selectList(queryWrapper);
 
         //2.得到网点Id
-        List<String> collect = queryWd.stream().map(item -> {
-            return RedisContant.WD_TAG + "_" + item.getWdId();
-        }).collect(Collectors.toList());
+        List<String> collect = queryWd.stream().map(item -> RedisContant.WD_TAG + "_" + item.getWdId()).collect(Collectors.toList());
         queryWd = null;
 
         //3.切割网点分批次获取网点标签

+ 139 - 75
ruoyi-demo/src/main/java/com/ruoyi/demo/utils/WdRedisStoreage.java

@@ -1,23 +1,26 @@
 package com.ruoyi.demo.utils;
 
+import cn.hutool.core.lang.Console;
+import cn.hutool.core.util.StrUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.ruoyi.common.utils.redis.RedisUtils;
 import com.ruoyi.demo.constant.RedisContant;
 import com.ruoyi.demo.entity.WdInfo;
 import com.ruoyi.demo.mapper.WdInfoDao;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.redisson.api.GeoUnit;
+import org.redisson.api.RFuture;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.geo.Distance;
-import org.springframework.data.geo.GeoResult;
-import org.springframework.data.geo.GeoResults;
-import org.springframework.data.geo.Point;
-import org.springframework.data.redis.connection.RedisGeoCommands;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
@@ -29,6 +32,9 @@ public class WdRedisStoreage {
     @Autowired
     public WdInfoDao wdInfoDao;
 
+    @Autowired
+    private SqlSessionFactory sqlSessionFactory;
+
     @Autowired
     @Qualifier("executor")
     private ExecutorService executorService;
@@ -46,55 +52,56 @@ public class WdRedisStoreage {
      * @return
      */
     public  List<String> getWdTag(WdInfo wdInfo){
-        List<String> tag = (List<String>) redisTemplate.boundValueOps(RedisContant.WD_TAG+"_"+wdInfo.getWdId()).get();
+        List<String> tag = RedisUtils.getCacheMapValue(RedisContant.WD_TAG_MAP, RedisContant.WD_TAG + "_" + wdInfo.getWdId());
+        //List<String> tag = (List<String>) redisTemplate.boundValueOps(RedisContant.WD_TAG+"_"+wdInfo.getWdId()).get();
         return tag;
     }
 
     public  List<List<String>> getWdTagList(List<String> key){
-        List<List<String>>  list = redisTemplate.opsForValue().multiGet(key);
-        return list;
+        Map<String, List<String>> tags = RedisUtils.getMultiCacheMapValue(RedisContant.WD_TAG_MAP, new HashSet<>(key));
+        return new ArrayList<>(tags.values());
     }
 
     /***
      * 分析网点周边标签,并把分析结果存入redis(默认范围:1km)
      * @param wdInfo 中心网点信息
-     * @return
      */
-    public  List<String> wdTagAnalyse(WdInfo wdInfo){
+    public void wdTagAnalyse(WdInfo wdInfo) throws ExecutionException, InterruptedException {
         //1. 找到距离中心网点半径1km的所有网点Id
-        Distance distance = new Distance(1, RedisGeoCommands.DistanceUnit.KILOMETERS);
-        GeoResults<RedisGeoCommands.GeoLocation<Object>> radius = redisTemplate.boundGeoOps(wdInfo.getAddrCode().substring(0, 4)).radius(wdInfo.getWdId(), distance);
-        List<GeoResult<RedisGeoCommands.GeoLocation<Object>>> content = radius.getContent();
-        List<String> aroundWdId = content.stream().map(item -> {
-            String name = (String) item.getContent().getName();
-            return name;
-        }).collect(Collectors.toList());
-
-        //2. 查询周边网点标签的Set集
-        QueryWrapper<WdInfo> queryWrapper = new QueryWrapper<>();
-        queryWrapper.select("type_name_by")
-                .in("wd_id",aroundWdId)
+        RFuture<Map<Object, Double>> radius = RedisUtils.getClient().getGeo(wdInfo.getAddrCode().substring(0, 4)).radiusWithDistanceAsync(wdInfo.getWdId(), 1, GeoUnit.KILOMETERS);
+        CompletableFuture<List<String>> completableFuture = CompletableFuture.supplyAsync(() -> {
+            try {
+                Map<Object, Double> distanceMap = radius.get();
+                List<String> aroundWdId = distanceMap.keySet().stream().map(item -> (String) item).collect(Collectors.toList());
+                //2. 查询周边网点标签的Set集
+                QueryWrapper<WdInfo> queryWrapper = new QueryWrapper<>();
+                queryWrapper.select("type_name_by")
+                    .in("wd_id", aroundWdId)
                     .groupBy("type_name_by");
-        List<WdInfo> wdInfos = wdInfoDao.selectList(queryWrapper);
-        //释放内存
-        aroundWdId = null;
-        aroundWdId = null;
-
-        //3. 打标签
-        List<String> tag = new ArrayList<>();
-        wdInfos.stream().forEach(item -> {
-            if(item != null){
-                String typeNameBy = item.getTypeNameBy();
-                if(typeNameBy != null && !typeNameBy.equals("")){
-                    String[] split = typeNameBy.split(":");
-                    if (split.length > 0)
-                        tag.add(split[split.length-1]);
-                }
+                List<WdInfo> wdInfos = wdInfoDao.selectList(queryWrapper);
+                //3. 打标签
+                List<String> tag = new ArrayList<>();
+                wdInfos.forEach(item -> {
+                    if (item != null) {
+                        String typeNameBy = item.getTypeNameBy();
+                        if (StrUtil.isNotEmpty(typeNameBy)) {
+                            String[] split = typeNameBy.split(":");
+                            if (split.length > 0)
+                                tag.add(split[split.length - 1]);
+                        }
+                    }
+                });
+
+                RedisUtils.setCacheMapValue(RedisContant.WD_TAG_MAP,RedisContant.WD_TAG + "_" + wdInfo.getWdId(), tag);
+                return tag;
+            } catch (Exception e) {
+                e.printStackTrace();
+                return null;
             }
-        });
-        redisTemplate.boundValueOps(RedisContant.WD_TAG+"_"+wdInfo.getWdId()).set(tag);
+        }, executorService);
+
+        completableFuture.get();
 
-        return tag;
     }
 
     /**
@@ -201,53 +208,110 @@ public class WdRedisStoreage {
         long start = System.currentTimeMillis();
         System.out.println("开始将网点坐标存入redis");
         QueryWrapper<WdInfo> queryWrapper = new QueryWrapper<>();
-        queryWrapper.select("wd_id","addr_code","lng","lat");
         queryWrapper.ge("collect_time",time);
+        Long total = wdInfoDao.selectCount(queryWrapper);
+
+        queryWrapper.select("wd_id","addr_code","lng","lat");
+
+        //使用sqlSessionFactory打开一个sqlSession,在没有读取完数据之前不要提交事务或关闭sqlSession
+        System.out.println("----开启sqlSession");
+        SqlSession sqlSession = sqlSessionFactory.openSession();
+        Cursor<WdInfo> cursor = null;
+        try {
+            //获取到指定mapper
+            WdInfoDao mapper = sqlSession.getMapper(WdInfoDao.class);
+            //调用指定mapper的方法,返回一个cursor
+            cursor = mapper.streamQuery(queryWrapper);
+
+            if (cursor != null) {
+                for (WdInfo wdInfo : cursor) {
+                    //打印进度条
+                    Console.printProgress('=', 50, (double) (cursor.getCurrentIndex() + 1) / total);
+
+                    RedisUtils.getClient().getGeo(wdInfo.getAddrCode().substring(0, 4)).add(wdInfo.getLng().doubleValue(), wdInfo.getLat().doubleValue(), wdInfo.getWdId());
+                    wdTagAnalyse(wdInfo);
+                }
+                if (cursor.isConsumed()) {
+                    System.out.println("----查询sql匹配中的数据已经消费完毕!");
+                }
+            }
+            sqlSession.commit();
+            System.out.println("----提交事务");
+        } catch (Exception e) {
+            e.printStackTrace();
+            sqlSession.rollback();
+        } finally {
+            if (cursor != null && cursor.isOpen()) {
+                //关闭cursor
+                cursor.close();
+                System.out.println("----关闭cursor");
+            }
+            if (sqlSession != null) {
+                //全部数据读取并且做好其他业务操作之后,提交事务并关闭连接;
+                sqlSession.close();
+                System.out.println("----关闭sqlSession");
+            }
 
-        Hashtable<String,Integer> hashtable = new Hashtable<>();
-        hashtable.put("total",0);
-        List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
-        List<WdInfo> wdInfos = wdInfoDao.selectList(queryWrapper);
-        for (WdInfo wdInfo : wdInfos) {
-            //开启线程
-            completableFutureList.add(CompletableFuture.runAsync(()->{
-                redisTemplate.boundGeoOps(wdInfo.getAddrCode().substring(0,4)).add(new Point(wdInfo.getLng().doubleValue(),wdInfo.getLat().doubleValue()),wdInfo.getWdId());
-                wdTagAnalyse(wdInfo);
-                System.out.println("进度:"+wdInfos.size()+"/"+hashtable.get("total"));
-                hashtable.put("total",hashtable.get("total")+1);
-            },executorService));
         }
-        CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
+
         long end = System.currentTimeMillis();
-        System.out.println("完成存入 "+(end-start)/(1000*60));
+        System.out.println("完成存入 耗时:"+(end-start)/(1000*60) + "分钟");
     }
 
-    public void writeWdGeoRedis2(String[] wdTypeCodes){
+    public void writeWdGeoRedis2(String[] wdTypeCodes) throws IOException {
         //对所有网点坐标进行分析打标签
         long start = System.currentTimeMillis();
         System.out.println("开始打标签");
 
         //开始
         QueryWrapper<WdInfo> queryWrapper = new QueryWrapper<>();
-        queryWrapper.select("wd_id","addr_code");
         queryWrapper.in("wd_type_code", wdTypeCodes);
-        List<WdInfo> wdInfos = wdInfoDao.selectList(queryWrapper);
-
-        ArrayList<CompletableFuture<Void>> list = new ArrayList<>();
-        Hashtable<String,Integer> hashtable = new Hashtable<>();
-        hashtable.put("total",1);
-        for (WdInfo wdInfo : wdInfos) {
-            //开启线程
-            CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
-                wdTagAnalyse(wdInfo);
-                System.out.println("进度:"+wdInfos.size()+"/"+hashtable.get("total"));
-                hashtable.put("total",hashtable.get("total")+1);
-            },executorService);
-            list.add(future);
+        Long total = wdInfoDao.selectCount(queryWrapper);
+
+        queryWrapper.select("wd_id","addr_code");
+
+
+        //使用sqlSessionFactory打开一个sqlSession,在没有读取完数据之前不要提交事务或关闭sqlSession
+        System.out.println("----开启sqlSession");
+        SqlSession sqlSession = sqlSessionFactory.openSession();
+        Cursor<WdInfo> cursor = null;
+        try {
+            //获取到指定mapper
+            WdInfoDao mapper = sqlSession.getMapper(WdInfoDao.class);
+            //调用指定mapper的方法,返回一个cursor
+            cursor = mapper.streamQuery(queryWrapper);
+
+            if (cursor != null) {
+                for (WdInfo wdInfo : cursor) {
+                    //打印进度条
+                    Console.printProgress('=', 50, (double) (cursor.getCurrentIndex() + 1) / total);
+                    wdTagAnalyse(wdInfo);
+                }
+                if (cursor.isConsumed()) {
+                    System.out.println("----查询sql匹配中的数据已经消费完毕!");
+                }
+            }
+            sqlSession.commit();
+            System.out.println("----提交事务");
+        } catch (Exception e) {
+            e.printStackTrace();
+            sqlSession.rollback();
+        } finally {
+            if (cursor != null && cursor.isOpen()) {
+                //关闭cursor
+                cursor.close();
+                System.out.println("----关闭cursor");
+            }
+            if (sqlSession != null) {
+                //全部数据读取并且做好其他业务操作之后,提交事务并关闭连接;
+                sqlSession.close();
+                System.out.println("----关闭sqlSession");
+            }
+
         }
-        CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()])).join();
+
         long end = System.currentTimeMillis();
-        System.out.println("完成打标签 "+(end-start)/(1000*60));
+        System.out.println("完成打标签 耗时:"+(end-start)/(1000*60) + "分钟");
 
     }
 

+ 3 - 0
ruoyi-demo/src/main/resources/mapper/demo/WdInfoDaoMapper.xml

@@ -13,5 +13,8 @@
     <select id="map" resultType="com.ruoyi.demo.entity.bo.PointBo">
         select ${ew.sqlSelect} from ddt_wd_info ${ew.customSqlSegment}
     </select>
+    <select id="streamQuery" resultType="com.ruoyi.demo.entity.WdInfo">
+        select ${ew.sqlSelect} from ddt_wd_info ${ew.customSqlSegment}
+    </select>
 </mapper>
 

+ 3 - 2
ruoyi-job/src/main/java/com/ruoyi/job/service/BenyunService.java

@@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 
@@ -37,7 +38,7 @@ public class BenyunService {
      * 将网点数据加载到redis中
      * */
     @XxlJob("WriteRedisJob")
-    public void WriteRedisJob(){
+    public void WriteRedisJob() throws IOException {
         Boolean aBoolean = redisTemplate.hasKey("isWriteRedis");
         if (aBoolean){
             //将当天新的网点信息加载redis
@@ -69,7 +70,7 @@ public class BenyunService {
      * 网点标签分析定时任务
      */
     @XxlJob("AnalyseJob")
-    public void AnalyseJob(){
+    public void AnalyseJob() throws IOException {
         System.out.println("大统计.....");
         wdRedisStoreage.writeWdGeoRedis2(new String[]{"1"});
         wdRedisStoreage.writeWdGeoRedis2(new String[]{"2","3","5"});