用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现。说这个话其实只有一半对,因为反应“多角色”的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的“生产者,消费者模型”。

MultiThread

一、导出问题

每天大概有40W的数据多个任务同时进行离线导出,等待时间超过一个小时都未导出成功,需要优化缩短导出时间。

分析:

1、插入导出任务

2、定时抓取任务

3、处理导出业务

4、回写导出任务

离线导出业务处理过程分为三个步骤:

查询待导出的明细数据、调用接口组装相关字段数据、写入Excel导出文件,经过耗时分析:调用接口组装相关字段数据这一步,每条记录调用2个接口大概耗时0.01秒。

导出24W数据,粗略计算: 240000*0.01/60=40min

二、解决方案

1、按1W条遍历分批处理待导出的明细数据

2、Runnable分组多线程调用接口组装数据

三、安全问题

在操作系统中,线程是不拥有资源的,进程是拥有资源的。而线程是由进程创建的,一个进程可以创建多个线程,这些线程共享着进程中的资源。所以,当线程一起并发运行时,同时对一个数据进行修改,就可能会造成数据的不一致性

解决线程不安全问题

(1)给共享的资源加把锁,保证每个资源变量每时每刻至多被一个线程占用。
(2)让线程也拥有资源,不用去共享进程中的资源。

1、线程安全的集合类接收回调结果数据,保证插入数据过程安全

1
2
3
4
5
6
7
8
9
10
11

List<FmAfgatherVo> newVerifydetaillist = Collections.synchronizedList(new ArrayList<FmAfgatherVo>());

ArrayList与LinkedList 线程不安全

ArrayList更适合读取数据,linkedList更多的时候添加或删除数据

(1)ArrayList是实现了基于动态数组的数据结构,LinkedList基于链表的数据结构。
(2)对于随机访问get和set,ArrayList觉得优于LinkedList,因为LinkedList要移动指针。
(3)对于新增和删除操作add和remove,LinedList比较占优势,因为ArrayList要移动数据。

2、多线程高并发,导致全部阻塞,线程终止,需要等待定时睡眠,让空闲线程先执行

3、多任务高并发,造成大量线程同时进行,延长的导出时间,加锁lock处理

1
private static Lock serviceLock = new ReentrantLock();

4、共享资源:接口方法、写入文件

1
2
接口方法:加锁Synchronized关键字
写入文件:因为流通道超时自动关闭,所以采用单线程写入,保证数据完整性

5、线程池ThreadPoolExecutor关闭,抛出拒绝线程进入的java.util.concurrent.RejectedExecutionException异常,选择调用者运行策略CallerRunsPolicy解决

//有界队列
BlockingQueuequeue = new ArrayBlockingQueue(threadCounts);

//调用者运行策略
ThreadPoolExecutor pool = new ThreadPoolExecutor(threadCounts, threadCounts, 0L, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());

具体分析查看:
https://blog.csdn.net/nuowei_senlin/article/details/78631964

四、处理结果

1W数据导出大概0.7~1分钟,粗略统计24W数据导出需要18分钟

MultiThread

五、代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

/**
*
* 接口
*
* @author Howe Hsiang
*/

public class VerifyDetailReportApi {

@Autowired
private VerifyDetailService verifyDetailService;

//锁
private static Lock serviceLock = new ReentrantLock();


public ResponseEntity<DealResultDto> exportOffLineVerifyDetail(
@ApiParam(value = "调用生成导出文件", required = true)
@RequestBody FmAfgatherVo vo) throws Exception {

DealResultDto dealResultDto = new DealResultDto();

try {

serviceLock.lock();
verifyDetailService.exportOffLineVerifyDetail(vo);
serviceLock.unlock();

} catch (Exception e) {
dealResultDto.setReturnCode("E");
dealResultDto.setReturnMessage(e.getMessage());
logger.info(e.getMessage());
}

dealResultDto.setReturnCode("S");
dealResultDto.setReturnMessage("导出成功!");

return Responses.ok(dealResultDto);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

/**
* 离线导出收款核销明细表信息
*/
@Override
public void exportOffLineVerifyDetail(FmAfgatherVo dto)throws Exception{

logger.info("离线导出收款核销明细表信息(exportOffLineVerifyDetail) --- 传入参数:"+ JSONObject.toJSONString(dto));

sum = 0;

exportOffLineStart =System.currentTimeMillis();

String returnMessage = "正在离线导出文件,请稍后查看!";

BigDataExportDto bigData = new BigDataExportDto();

bigData.setId(dto.getId());
bigData.setLoginUser(dto.getLoginUser());
bigData.setFileName(dto.getFileName());
bigData.setMassage(dto.getMassage());
bigData.setDeleteFileName(dto.getDeleteFileName());

//1.调用下级单位查询接口
if (PubUtils.isNotNull(dto.getCorpInnercode())) {
restInterFaceService.insertSubCorpInfo(dto.getPkCorp());
}

//2.查询收款核销明细表总数
int count = fmAfgatherMapper.selectVerifyDetailCount(dto);

logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 查询总数为" + count);

//3.查询总数为0时
if (count == Constant.MIN_EXPORT) {
returnMessage = "没有查询到任何数据,导出结束!";
bigData.setMassage(returnMessage);
//调用文件导出生成处理,进行任务回写
createExportReportService.createSuccess(bigData);
logger.error("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出-"+returnMessage);
}

logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 导出文件开始");

try{

logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 按分页条数遍历写入开始");

//按分页条数遍历查询
for(int startnumber = 0; startnumber < count; startnumber+=10000){

int endnumber = startnumber+10000;
dto.setStartnumber(startnumber);
dto.setEndnumber(endnumber);

//多线程分组处理业务
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 多线程分组处理业务开始");
createExportFile(dto ,bigData, endnumber, count);
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 多线程分组处理业务结束");
}

if (sumListData.size() >= count) {
//写入核销明细数据
createFileService.createFile(bigData, sumListData, title, column, WebConstant.FIELD_VALUE_MAP,true);
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 ---按分页条数遍历写入总数:"+sumListData.size());
//调用文件导出生成处理,进行任务回写
createExportReportService.createSuccess(bigData);
logger.info("收款核销明细表信息(exportOffLineVerifyDetail) -离线导出 --- 生成文件和任务回写结束");

}

//所有线程执行完毕
long exportOffLineEnd = System.currentTimeMillis();
logger.info("写入核销明细数据(exportOffLineVerifyDetail)-"+dto.getLoginUser()+"--"+dto.getFileName()+"所有任务执行完毕-离线导出总数: "+sumListData.size()+"--离线导出总耗时: " + (exportOffLineEnd - exportOffLineStart) + "毫秒");

}catch(Exception e){
logger.info("收款核销明细表信息(exportOffLineVerifyDetail)-离线导出任务异常: "+e.getMessage());
bigData.setMassage("收款核销明细表信息(exportOffLineVerifyDetail)-离线导出任务异常: "+e.getMessage());
//调用文件导出生成处理,进行任务回写
createExportReportService.createError(bigData);
}

}

//多线程分组处理业务
private void createExportFile(FmAfgatherVo dto, BigDataExportDto bigData, int endnumber,int count) throws Exception {

//查询核销明细数据
List<FmAfgatherVo> dataList = fmAfgatherMapper.selectVerifyDetailList(dto);

int threadCounts = interfaceConfig.getThreadCounts();

if (dataList != null && dataList.size() > 0) {

//ExecutorService pool = Executors.newFixedThreadPool(threadCounts);

//有界队列
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(threadCounts);

//调用者运行策略
ThreadPoolExecutor pool = new ThreadPoolExecutor(threadCounts, threadCounts, 0L, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.CallerRunsPolicy());

int dataCount = dataList.size();
int len = dataCount/threadCounts;//分批数
if(len == 0){
//采用一个线程处理List中的一个元素
threadCounts = dataCount;
//重新平均分割List
len = 1;
}
for(int i=0; i<threadCounts; i++){
final List<FmAfgatherVo> subList;
if(i == threadCounts-1){
subList = dataList.subList(i*len, dataCount);
}else{
subList = dataList.subList(i*len, len*(i+1)>dataList.size()?dataList.size():len*(i+1));
}
Runnable run = new Runnable() {
@Override
public void run() {
try {

//调接口组装明细数据
logger.info("调接口组装明细数据----------开始");
List<FmAfgatherVo> fmAfgatherDtoList = assembleData(subList);
//写入计数
sum = sum + fmAfgatherDtoList.size();
sumListData.addAll(fmAfgatherDtoList);
long exportFileEnd = System.currentTimeMillis();
logger.info("多线程分组处理业务(createExportFile)-"+dto.getLoginUser()+"--"+dto.getFileName()+"接口获取核销明细数据---当前执行数:"+sum+"----耗时:"+(exportFileEnd-exportOffLineStart)+"毫秒---导出总数:"+count);

} catch (Exception e) {
logger.info("多线程分组处理业务(createExportFile)-导出业务处理异常(createExportFile):", e.getMessage(),e);
}
}
};
pool.execute(run);
}
pool.shutdown();

while (true) {
if (pool.isTerminated()) {
logger.info("多线程分组处理业务(createExportFile)-------"+dto.getLoginUser()+"--"+dto.getFileName()+"-------多线程执行"+sum+"条数据结束");
break;
}
Thread.sleep(100);
}

pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}
}

六、参考链接

1、谈谈我对多线程的理解

2、线程池,这一篇或许就够了

3、java多线程技能

4、Java并发包:ExecutorService和ThreadPoolExecutor

本文标题: Java多线程处理大量数据离线导出-实战篇

本文作者: 狂欢马克思

发布时间: 2024年05月05日 00:00

最后更新: 2025年04月03日 11:07

原始链接: https://haoxiang.eu.org/ea524d8/

版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0许可协议,转载请注明出处!

× 喜欢就赞赏一下呗!
打赏二维码