多线程编程的核心目的是充分利用CPU资源,提升程序执行效率。虽然理论上所有多线程代码都可以用单线程实现,但在”多角色”场景中(如生产者-消费者模型),多线程是模拟实际场景的必然选择

MultiThread

一、导出问题

每天大概有40W的数据多个任务同时进行离线导出,等待时间超过一个小时都未导出成功,需要优化缩短导出时间。

分析:

1、插入导出任务

2、定时抓取任务

3、处理导出业务

4、回写导出任务

离线导出业务处理过程分为三个步骤:

查询待导出的明细数据、调用接口组装相关字段数据、写入Excel导出文件,经过耗时分析:调用接口组装相关字段数据这一步,每条记录调用2个接口大概耗时0.01秒。

1
导出24W数据,粗略计算: 240000*0.01/60=40min

二、解决方案

1、按1W条遍历分批处理待导出的明细数据

2、Runnable分组多线程调用接口组装数据

三、安全问题

在操作系统中,线程是不拥有资源的,进程是拥有资源的。而线程是由进程创建的,一个进程可以创建多个线程,这些线程共享着进程中的资源。所以,当线程一起并发运行时,同时对一个数据进行修改,就可能会造成数据的不一致性

解决线程不安全问题

(1)给共享的资源加把锁,保证每个资源变量每时每刻至多被一个线程占用。
(2)让线程也拥有资源,不用去共享进程中的资源。

1、线程安全的集合类接收回调结果数据,保证插入数据过程安全

1
2
3
4
5
6
7
8
9
10
11
List<FmAfgatherVo> newVerifydetaillist = Collections.synchronizedList(new ArrayList<FmAfgatherVo>());

ArrayList与LinkedList 线程不安全

ArrayList更适合读取数据,linkedList更多的时候添加或删除数据

(1)ArrayList是实现了基于动态数组的数据结构,LinkedList基于链表的数据结构。
(2)对于随机访问get和set,ArrayList觉得优于LinkedList,因为LinkedList要移动指针。
(3)对于新增和删除操作add和remove,LinedList比较占优势,因为ArrayList要移动数据。


2、多线程高并发,导致全部阻塞,线程终止,需要等待定时睡眠,让空闲线程先执行

3、多任务高并发,造成大量线程同时进行,延长的导出时间,加锁lock处理

1
private static Lock serviceLock = new ReentrantLock();
1
2
3
4
5
4、共享资源:接口方法、写入文件

接口方法:加锁Synchronized关键字
写入文件:因为流通道超时自动关闭,所以采用单线程写入,保证数据完整性

5、线程池ThreadPoolExecutor关闭,抛出拒绝线程进入的java.util.concurrent.RejectedExecutionException异常,选择调用者运行策略CallerRunsPolicy解决

1
2
//有界队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(threadCounts);
1
2
//调用者运行策略
ThreadPoolExecutor pool = new ThreadPoolExecutor(threadCounts, threadCounts, 0L, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());

具体分析查看:
https://blog.csdn.net/nuowei_senlin/article/details/78631964

四、处理结果

1W数据导出大概0.7~1分钟,粗略统计24W数据导出需要18分钟

五、代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 接口
* @author Howe Hsiang
*/

public class VerifyDetailReportApi {

@Autowired
private VerifyDetailService verifyDetailService;

//锁


public ResponseEntity<DealResultDto> exportOffLineVerifyDetail(
@ApiParam(value = "调用生成导出文件", required = true)
@RequestBody FmAfgatherVo vo) throws Exception {

DealResultDto dealResultDto = new DealResultDto();

try {

serviceLock.lock();
verifyDetailService.exportOffLineVerifyDetail(vo);
serviceLock.unlock();

} catch (Exception e) {
dealResultDto.setReturnCode("E");
dealResultDto.setReturnMessage(e.getMessage());
logger.info(e.getMessage());
}

dealResultDto.setReturnCode("S");
dealResultDto.setReturnMessage("导出成功!");

return Responses.ok(dealResultDto);
}


1
2
/**
* 离线导出收款核销明细表信息

*/

1
2
@Override
public void exportOffLineVerifyDetail(FmAfgatherVo dto)throws Exception{
1
logger.info("离线导出收款核销明细表信息(exportOffLineVerifyDetail) --- 传入参数:"+ JSONObject.toJSONString(dto));
1
sum = 0;
1
exportOffLineStart =System.currentTimeMillis();
1
String returnMessage = "正在离线导出文件,请稍后查看!";
1
BigDataExportDto bigData = new BigDataExportDto();
1
2
3
4
5
bigData.setId(dto.getId());
bigData.setLoginUser(dto.getLoginUser());
bigData.setFileName(dto.getFileName());
bigData.setMassage(dto.getMassage());
bigData.setDeleteFileName(dto.getDeleteFileName());
1
2
3
//1.调用下级单位查询接口
if (PubUtils.isNotNull(dto.getCorpInnercode())) {
restInterFaceService.insertSubCorpInfo(dto.getPkCorp());
}
 
1
2
//2.查询收款核销明细表总数
int count = fmAfgatherMapper.selectVerifyDetailCount(dto);
1
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 查询总数为" + count);
1
2
3
4
5
6
7
//3.查询总数为0时
if (count == Constant.MIN_EXPORT) {
returnMessage = "没有查询到任何数据,导出结束!";
bigData.setMassage(returnMessage);
//调用文件导出生成处理,进行任务回写
createExportReportService.createSuccess(bigData);
logger.error("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出-"+returnMessage);
}
1
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出  --- 导出文件开始");
1
try{
1
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出   --- 按分页条数遍历写入开始");
1
2
//按分页条数遍历查询
for(int startnumber = 0; startnumber < count; startnumber+=10000){
1
2
3
int endnumber = startnumber+10000;
dto.setStartnumber(startnumber);
dto.setEndnumber(endnumber);
1
2
3
4
//多线程分组处理业务
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 多线程分组处理业务开始");
createExportFile(dto ,bigData, endnumber, count);
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 多线程分组处理业务结束");
    }
1
2
3
4
5
if (sumListData.size() >= count) {
//写入核销明细数据
createFileService.createFile(bigData, sumListData, title, column, WebConstant.FIELD_VALUE_MAP,true);
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 ---按分页条数遍历写入总数:"+sumListData.size());
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 生成文件和任务回写结束");
    }
1
2
3
      //所有线程执行完毕
long exportOffLineEnd = System.currentTimeMillis();
logger.info("写入核销明细数据(exportOffLineVerifyDetail)-"+dto.getLoginUser()+"--"+dto.getFileName()+"所有任务执行完毕-离线导出总数: "+sumListData.size()+"--离线导出总耗时: " + (exportOffLineEnd - exportOffLineStart) + "毫秒");
1
2
3
4
}catch(Exception e){
logger.info("收款核销明细表信息(exportOffLineVerifyDetail)-离线导出任务异常: "+e.getMessage());
bigData.setMassage("收款核销明细表信息(exportOffLineVerifyDetail)-离线导出任务异常: "+e.getMessage());
createExportReportService.createError(bigData);
}

}

1
private void createExportFile(FmAfgatherVo  dto, BigDataExportDto bigData, int endnumber,int count) throws Exception {
1
2
   //查询核销明细数据
List<FmAfgatherVo> dataList = fmAfgatherMapper.selectVerifyDetailList(dto);
1
int threadCounts = interfaceConfig.getThreadCounts();
1
if (dataList != null && dataList.size() > 0) {
1
//ExecutorService pool = Executors.newFixedThreadPool(threadCounts);
1
2
3
4
5
6
7
int dataCount = dataList.size();
int len = dataCount/threadCounts;//分批数
if(len == 0){
//采用一个线程处理List中的一个元素
threadCounts = dataCount;
//重新平均分割List
len = 1;
    }
1
2
3
4
5
6
for(int i=0; i<threadCounts; i++){
final List<FmAfgatherVo> subList;
if(i == threadCounts-1){
subList = dataList.subList(i*len, dataCount);
}else{
subList = dataList.subList(i*len, len*(i+1)>dataList.size()?dataList.size():len*(i+1));
        }
1
2
3
Runnable run = new Runnable() {
public void run() {
try {
1
2
3
4
5
6
7
8
//调接口组装明细数据
logger.info("调接口组装明细数据----------开始");
List<FmAfgatherVo> fmAfgatherDtoList = assembleData(subList);
//写入计数
sum = sum + fmAfgatherDtoList.size();
sumListData.addAll(fmAfgatherDtoList);
long exportFileEnd = System.currentTimeMillis();
logger.info("多线程分组处理业务(createExportFile)-"+dto.getLoginUser()+"--"+dto.getFileName()+"接口获取核销明细数据---当前执行数:"+sum+"----耗时:"+(exportFileEnd-exportOffLineStart)+"毫秒---导出总数:"+count);
1
logger.info("多线程分组处理业务(createExportFile)-导出业务处理异常(createExportFile):", e.getMessage(),e);
                }	
        };
1
pool.execute(run);
    }
1
pool.shutdown();
1
2
3
4
while (true) {
if (pool.isTerminated()) {
logger.info("多线程分组处理业务(createExportFile)-------"+dto.getLoginUser()+"--"+dto.getFileName()+"-------多线程执行"+sum+"条数据结束");
break;
        }
1
Thread.sleep(100);
    }
    
1
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);  
}


1、[谈谈我对多线程的理解](https://blog.csdn.net/dongmeng1994/article/details/54586466)

2、[线程池,这一篇或许就够了](https://www.jianshu.com/p/210eab345423)

3、[java多线程技能](https://www.cnblogs.com/-new/p/7156811.html)

4、[Java并发包:ExecutorService和ThreadPoolExecutor](https://blog.csdn.net/zxc123e/article/details/51891200)

本文标题: Java多线程处理大量数据

发布时间: 2023年02月05日 00:00

最后更新: 2025年12月30日 08:54

原始链接: https://haoxiang.eu.org/a0df3d6/

版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!

× 喜欢就赞赏一下呗!
打赏二维码