前言 最近有个需求解析一个订单文件,并且说明文件可达到千万条数据,每条数据大概在20个字段左右,每个字段使用逗号分隔,需要尽量在半小时内入库。 思路 1。估算文件大小 因为告诉文件有千万。..
一、如何快速安全插入千万条数据 1.1 前言 最近遇到个需求,要解析一个订单文件,文件可能有千万条数据,每条数据大概20个字段,用逗号分隔,需要在半小时内入库。
1.2 思路 1.估算文件大小 文件有千万条,每条记录大概20个字段,先估算下整个文件的大小。方法很简单,用FileWriter往文件里插入一千万条数据,看看文件大小,测试下来大概1.5G左右。
2.如何批量插入 文件比较大,一次性读进内存肯定不行。方法是每次从订单文件里截取一部分数据,然后批量插入。批量插入可以用insert(…)values(…),(…)的方式,测试下来效率还是挺高的。
3.数据的完整性 截取数据的时候要注意,必须保证数据完整性。每条记录最后都是换行符,要根据这个标识保证每次截取都是整条数据,不能出现半条数据的情况。
4.数据库是否支持批次数据 需要做批次插入,数据库得支持大量数据写入。比如用的MySQL,可以通过设置max_allowed_packet来保证批次提交的数据量。
5.中途出错的情况 大文件解析,如果中途出错,比如数据插到900w的时候数据库连接失败,不可能重新来一遍。所以需要记录每次插入数据的位置,并且和批次插入的数据在同一个事务中,这样恢复之后可以从记录的位置继续插入。
1.3 实现 1.准备数据表 这里需要准备两张表分别是:订单状态位置信息表,订单表;
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE `file_analysis` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `file_type` varchar (255 ) NOT NULL COMMENT '文件类型 01:类型1,02:类型2' , `file_name` varchar (255 ) NOT NULL COMMENT '文件名称' , `file_path` varchar (255 ) NOT NULL COMMENT '文件路径' , `status` varchar (255 ) NOT NULL COMMENT '文件状态 0初始化;1成功;2失败:3处理中' , `position` bigint (20 ) NOT NULL COMMENT '上一次处理完成的位置' , `crt_time` datetime NOT NULL COMMENT '创建时间' , `upd_time` datetime NOT NULL COMMENT '更新时间' , PRIMARY KEY (`id`) ) ENGINE= InnoDB AUTO_INCREMENT= 2 DEFAULT CHARSET= utf8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE TABLE `file_order` ( `file_id` bigint (20 ) DEFAULT NULL , `field1` varchar (255 ) DEFAULT NULL , `field2` varchar (255 ) DEFAULT NULL , `field3` varchar (255 ) DEFAULT NULL , `field4` varchar (255 ) DEFAULT NULL , `field5` varchar (255 ) DEFAULT NULL , `field6` varchar (255 ) DEFAULT NULL , `field7` varchar (255 ) DEFAULT NULL , `field8` varchar (255 ) DEFAULT NULL , `field9` varchar (255 ) DEFAULT NULL , `field10` varchar (255 ) DEFAULT NULL , `field11` varchar (255 ) DEFAULT NULL , `field12` varchar (255 ) DEFAULT NULL , `field13` varchar (255 ) DEFAULT NULL , `field14` varchar (255 ) DEFAULT NULL , `field15` varchar (255 ) DEFAULT NULL , `field16` varchar (255 ) DEFAULT NULL , `field17` varchar (255 ) DEFAULT NULL , `field18` varchar (255 ) DEFAULT NULL , ) ENGINE= InnoDB AUTO_INCREMENT= 10000024 DEFAULT CHARSET= utf8
2.配置数据库包大小 1 2 3 4 5 6 7 8 9 MySQL> show VARIABLES like '%max_allowed_packet%' ; + | Variable_name | Value | | max_allowed_packet | 1048576 | | slave_max_allowed_packet | 1073741824 | 2 rows in set MySQL> set global max_allowed_packet = 1024 * 1024 * 10 ; Query OK, 0 rows affected
通过设置max_allowed_packet,保证数据库能够接收批次插入的数据包大小;不然会出现如下错误:
1 2 3 4 5 Caused by: com.mysql.jdbc.PacketTooBigException: Packet for query is too large (4980577 > 1048576 ) . You can change this value on the server by setting the max_allowed_packet' variable. at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3915) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2598) at com.MySQL.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2778) at com.MySQL.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2834)
3.准备测试数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public static void main (String[] args) throws IOException { FileWriter out = new FileWriter (new File ("D://xxxxxxx//orders.txt" )); for (int i = 0 ; i < 10000000 ; i++) { out.write( "vaule1,vaule2,vaule3,vaule4,vaule5,vaule6,vaule7,vaule8,vaule9,vaule10,vaule11,vaule12,vaule13,vaule14,vaule15,vaule16,vaule17,vaule18" ); out.write(System.getProperty("line.separator" )); } out.close(); } ``` 使用FileWriter遍历往一个文件里插入1000w条数据即可,这个速度还是很快的,不要忘了在每条数据的后面添加换行符(\n\r); ###### 4. 截取数据的完整性 除了需要设置每次读取文件的大小,同时还需要设置一个参数,用来每次获取一小部分数据,从这小部分数据中获取换行符(\n\r),如果获取不到一直累加直接获取为止,这个值设置大小大致同每条数据的大小差不多合适,部分实现如下: ByteBuffer byteBuffer = ByteBuffer.allocate(buffSize); long endPosition = batchFileSize + startPosition - buffSize;long startTime, endTime;for (int i = 0 ; i < count; i++) { startTime = System.currentTimeMillis(); if (i + 1 != count) { int read = inputChannel.read(byteBuffer, endPosition); readW: while (read != -1 ) { byteBuffer.flip(); byte [] array = byteBuffer.array(); for (int j = 0 ; j < array.length; j++) { byte b = array[j]; if (b == 10 || b == 13 ) { endPosition += j; break readW; } endPosition += buffSize; byteBuffer.clear(); read = inputChannel.read(byteBuffer, endPosition); } } else { endPosition = fileSize; } ...省略,更多可以查看Github完整代码... }
如上代码所示开辟了一个缓冲区,根据每行数据大小来定大概在200字节左右,然后通过遍历查找换行符(\n\r),找到以后将当前的位置加到之前的结束位置上,保证了数据的完整性。
5.批次插入数据 通过insert(…)values(…),(…)的方式批次插入数据,部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 SqlSession session = sqlSessionFactory.openSession();try { long startTime = System.currentTimeMillis(); FielAnalysisMapper fielAnalysisMapper = session.getMapper(FielAnalysisMapper.class); FileOrderMapper fileOrderMapper = session.getMapper(FileOrderMapper.class); fileOrderMapper.batchInsert(orderList); fileAnalysis.setPosition(endPosition + 1 ); fileAnalysis.setStatus("3" ); fileAnalysis.setUpdTime(new Date ()); fielAnalysisMapper.updateFileAnalysis(fileAnalysis); session.commit(); long endTime = System.currentTimeMillis(); System.out.println("===插入数据花费:" + (endTime - startTime) + "ms===" ); } catch (Exception e) { session.rollback(); } finally { session.close();
}
如上代码在一个事务中同时保存批次订单数据和文件解析位置信息,batchInsert通过使用mybatis的<foreach>标签来遍历订单列表,生成values数据;
#### 1.4 总结
以上展示了部分代码,完整的代码可以查看Github地址中的batchInsert模块,本地设置每次截取的文件大小为2M,经测试1000w条数据(大小1.5G左右)插入MySQL数据库中,大概花费时间在20分钟左右,当然可以通过设置截取的文件大小,花费的时间也会相应的改变。
#### 1.5 完整代码
Github
本文标题: 如何快速安全的插入千万条数据
发布时间: 2021年04月18日 00:00
最后更新: 2025年12月30日 08:54
原始链接: https://haoxiang.eu.org/bbe55a08/
版权声明: 本文著作权归作者所有,均采用CC BY-NC-SA 4.0 许可协议,转载请注明出处!