多线程编程的核心目的是充分利用CPU资源,提升程序执行效率。虽然理论上所有多线程代码都可以用单线程实现,但在”多角色”场景中(如生产者-消费者模型),多线程是模拟实际场景的必然选择
一、导出问题 每天大概有40W的数据多个任务同时进行离线导出,等待时间超过一个小时都未导出成功,需要优化缩短导出时间。
分析:
1、插入导出任务
2、定时抓取任务
3、处理导出业务
4、回写导出任务
离线导出业务处理过程分为三个步骤:
查询待导出的明细数据、调用接口组装相关字段数据、写入Excel导出文件,经过耗时分析:调用接口组装相关字段数据这一步,每条记录调用2个接口大概耗时0.01秒。
1 导出24W数据,粗略计算: 240000*0.01/60=40min
二、解决方案 1、按1W条遍历分批处理待导出的明细数据
2、Runnable分组多线程调用接口组装数据
三、安全问题 在操作系统中,线程是不拥有资源的,进程是拥有资源的。而线程是由进程创建的,一个进程可以创建多个线程,这些线程共享着进程中的资源。所以,当线程一起并发运行时,同时对一个数据进行修改,就可能会造成数据的不一致性
解决线程不安全问题
(1)给共享的资源加把锁,保证每个资源变量每时每刻至多被一个线程占用。 (2)让线程也拥有资源,不用去共享进程中的资源。
1、线程安全的集合类接收回调结果数据,保证插入数据过程安全
1 2 3 4 5 6 7 8 9 10 11 List<FmAfgatherVo > newVerifydetaillist = Collections.synchronizedList(new ArrayList<FmAfgatherVo > ()); ArrayList与LinkedList 线程不安全 ArrayList更适合读取数据,linkedList更多的时候添加或删除数据 (1)ArrayList是实现了基于动态数组的数据结构,LinkedList基于链表的数据结构。 (2)对于随机访问get和set,ArrayList觉得优于LinkedList,因为LinkedList要移动指针。 (3)对于新增和删除操作add和remove,LinedList比较占优势,因为ArrayList要移动数据。
2、多线程高并发,导致全部阻塞,线程终止,需要等待定时睡眠,让空闲线程先执行
3、多任务高并发,造成大量线程同时进行,延长的导出时间,加锁lock处理
1 private static Lock serviceLock = new ReentrantLock ();
1 2 3 4 5 4、共享资源:接口方法、写入文件 接口方法:加锁Synchronized关键字 写入文件:因为流通道超时自动关闭,所以采用单线程写入,保证数据完整性
5、线程池ThreadPoolExecutor关闭,抛出拒绝线程进入的java.util.concurrent.RejectedExecutionException异常,选择调用者运行策略CallerRunsPolicy解决
1 2 //有界队列 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(threadCounts);
1 2 //调用者运行策略 ThreadPoolExecutor pool = new ThreadPoolExecutor(threadCounts, threadCounts, 0L, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());
具体分析查看:https://blog.csdn.net/nuowei_senlin/article/details/78631964
四、处理结果 1W数据导出大概0.7~1分钟,粗略统计24W数据导出需要18分钟
五、代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class VerifyDetailReportApi { @Autowired private VerifyDetailService verifyDetailService; public ResponseEntity<DealResultDto> exportOffLineVerifyDetail ( @ApiParam(value = "调用生成导出文件", required = true) @RequestBody FmAfgatherVo vo) throws Exception { DealResultDto dealResultDto = new DealResultDto (); try { serviceLock.lock(); verifyDetailService.exportOffLineVerifyDetail(vo); serviceLock.unlock(); } catch (Exception e) { dealResultDto.setReturnCode("E" ); dealResultDto.setReturnMessage(e.getMessage()); logger.info(e.getMessage()); } dealResultDto.setReturnCode("S" ); dealResultDto.setReturnMessage("导出成功!" ); return Responses.ok(dealResultDto); }
*/
1 2 @Override public void exportOffLineVerifyDetail (FmAfgatherVo dto) throws Exception{
1 logger.info("离线导出收款核销明细表信息(exportOffLineVerifyDetail) --- 传入参数:"+ JSONObject.toJSONString(dto));
1 exportOffLineStart =System.currentTimeMillis();
1 String returnMessage = "正在离线导出文件,请稍后查看!";
1 BigDataExportDto bigData = new BigDataExportDto();
1 2 3 4 5 bigData.setId(dto.getId()); bigData.setLoginUser(dto.getLoginUser()); bigData.setFileName(dto.getFileName()); bigData.setMassage(dto.getMassage()); bigData.setDeleteFileName(dto.getDeleteFileName());
1 2 3 //1.调用下级单位查询接口 if (PubUtils.isNotNull(dto.getCorpInnercode())) { restInterFaceService.insertSubCorpInfo(dto.getPkCorp());
}
1 2 //2.查询收款核销明细表总数 int count = fmAfgatherMapper.selectVerifyDetailCount(dto);
1 logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 查询总数为" + count);
1 2 3 4 5 6 7 if (count == Constant .MIN_EXPORT ) { returnMessage = "没有查询到任何数据,导出结束!" ; bigData.setMassage (returnMessage); createExportReportService.createSuccess (bigData); logger.error ("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出-" +returnMessage);
}
1 logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 导出文件开始");
1 logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 按分页条数遍历写入开始");
1 2 //按分页条数遍历查询 for(int startnumber = 0; startnumber < count; startnumber+=10000){
1 2 3 int endnumber = startnumber+10000; dto.setStartnumber(startnumber); dto.setEndnumber(endnumber);
1 2 3 4 //多线程分组处理业务 logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 多线程分组处理业务开始"); createExportFile(dto ,bigData, endnumber, count); logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 多线程分组处理业务结束");
}
1 2 3 4 5 if (sumListData.size() >= count) { //写入核销明细数据 createFileService.createFile(bigData, sumListData, title, column, WebConstant.FIELD_VALUE_MAP,true); logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 ---按分页条数遍历写入总数:"+sumListData.size()); logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 生成文件和任务回写结束");
}
1 2 3 //所有线程执行完毕 long exportOffLineEnd = System.currentTimeMillis(); logger.info("写入核销明细数据(exportOffLineVerifyDetail)-"+dto.getLoginUser()+"--"+dto.getFileName()+"所有任务执行完毕-离线导出总数: "+sumListData.size()+"--离线导出总耗时: " + (exportOffLineEnd - exportOffLineStart) + "毫秒");
1 2 3 4 }catch(Exception e){ logger.info("收款核销明细表信息(exportOffLineVerifyDetail)-离线导出任务异常: "+e.getMessage()); bigData.setMassage("收款核销明细表信息(exportOffLineVerifyDetail)-离线导出任务异常: "+e.getMessage()); createExportReportService.createError(bigData);
}
}
1 private void createExportFile (FmAfgatherVo dto, BigDataExportDto bigData, int endnumber,int count) throws Exception {
1 2 //查询核销明细数据 List<FmAfgatherVo> dataList = fmAfgatherMapper.selectVerifyDetailList(dto);
1 int threadCounts = interfaceConfig.getThreadCounts();
1 if (dataList != null && dataList.size() > 0) {
1 //ExecutorService pool = Executors.newFixedThreadPool(threadCounts);
1 2 3 4 5 6 7 int dataCount = dataList.size(); int len = dataCount/threadCounts;//分批数 if(len == 0){ //采用一个线程处理List中的一个元素 threadCounts = dataCount; //重新平均分割List len = 1;
}
1 2 3 4 5 6 for(int i=0; i<threadCounts; i++){ final List<FmAfgatherVo> subList; if(i == threadCounts-1){ subList = dataList.subList(i*len, dataCount); }else{ subList = dataList.subList(i*len, len*(i+1)>dataList.size()?dataList.size():len*(i+1));
}
1 2 3 Runnable run = new Runnable () { public void run () { try {
1 2 3 4 5 6 7 8 //调接口组装明细数据 logger.info("调接口组装明细数据----------开始"); List<FmAfgatherVo> fmAfgatherDtoList = assembleData(subList); //写入计数 sum = sum + fmAfgatherDtoList.size(); sumListData.addAll(fmAfgatherDtoList); long exportFileEnd = System.currentTimeMillis(); logger.info("多线程分组处理业务(createExportFile)-"+dto.getLoginUser()+"--"+dto.getFileName()+"接口获取核销明细数据---当前执行数:"+sum+"----耗时:"+(exportFileEnd-exportOffLineStart)+"毫秒---导出总数:"+count);
1 logger.info("多线程分组处理业务(createExportFile)-导出业务处理异常(createExportFile):", e.getMessage(),e);
}
};
}
1 2 3 4 while (true) { if (pool.isTerminated()) { logger.info("多线程分组处理业务(createExportFile)-------"+dto.getLoginUser()+"--"+dto.getFileName()+"-------多线程执行"+sum+"条数据结束"); break;
}
}
1 pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}
1、[谈谈我对多线程的理解](https://blog.csdn.net/dongmeng1994/article/details/54586466)
2、[线程池,这一篇或许就够了](https://www.jianshu.com/p/210eab345423)
3、[java多线程技能](https://www.cnblogs.com/-new/p/7156811.html)
4、[Java并发包:ExecutorService和ThreadPoolExecutor](https://blog.csdn.net/zxc123e/article/details/51891200)
本文标题: Java多线程处理大量数据
发布时间: 2023年02月05日 00:00
最后更新: 2025年12月30日 08:54
原始链接: https://haoxiang.eu.org/a0df3d6/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0 许可协议,转载请注明出处!