背景:使用JdbcTemplate查询500万数据,然后插入到数据库。
这么多的数据按照普通的方式直接查询然后插入,服务器肯定会挂掉,我尝试过使用分页查询的方式去进行分批查询插入,虽然也能达到保证服务器不挂掉的效果,但是有一个严重的问题,每次查询的数据很难保证顺序性,第一次一查询的数据可能又出现在第N次的查询结果中,虽然可以通过在查询sql中加上排序,可以保证多次查询的顺序不变,但是这种分页查询方式还是不够严谨,因为在多次查询过程中,可能数据有新增或删除,即使保证了排序唯一性,也会导致数据少取或取重复问题。
这个过程中需要解决的问题:
一、内存溢出
使用jdbcTemplate.queryForList查询一次读取500万条数据,会占用大量内存,一般的服务器都会内存溢出报错,jdbcTemplate默认使用RowMapperResultSetExtractor来处理ResultSet结果集,会将数据全部读取到内存:
因此我们需要自己写一个实现类继承ResultSetExtractor,去实现读取ResultSet的逻辑。
一、批量插入速度慢
我们使用jdbcTemplate的batchUpdate方法批量保存数据时,要想真正进行批量保存需要几个条件
1.首先要数据库本身要支持批量更新,一般主流数据库都会支持。
2.插入的sql语句不要使用子查询
插入语句只使用insert into table() values()这种,不要在values中使用select语句
3.数据源连接设置rewriteBatchedStatements=true这个参数
在oracle驱动中rewriteBatchedStatements参数默认是开启的,mysql没有开启,需要在数据源url连接中手动设置:
自定义ResultSetExtractor如下:
package com.zhou.db.model;
import com.zhou.db.util.SqlUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.support.JdbcUtils;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* 查询数据自定义处理ResultSet
* @author lang.zhou
* @since 2023/1/9 17:42
*/
@Slf4j
public abstract class DataMapCallBackExtractor implements ResultSetExtractor<List<Map<String,Object>>> {
/**
* 每次读取10000条时开始插入
*/
@Getter
private int batchQuerySize = 1000;
@Getter
private List<DbColumn> columnList = new ArrayList<>(0);
/**
* 数据条数
*/
@Getter
private int dataCount = 0;
public DataMapCallBackExtractor() {
}
public DataMapCallBackExtractor(int batchQuerySize) {
if(batchQuerySize > 1000){
this.batchQuerySize = batchQuerySize;
}
}
@Override
public List<Map<String,Object>> extractData(ResultSet rs) throws SQLException, DataAccessException {
ResultSetMetaData resultSetMetaData = rs.getMetaData();
//结果集列数
int count = resultSetMetaData.getColumnCount();
//已经执行回调的次数
int times = 0;
//读取列信息
for (int i = 1; i < count + 1; i++) {
columnList.add(SqlUtil.readResultColumn(resultSetMetaData,i));
}
//读取列信息后回调
this.columnInfoCallback(columnList);
List<Map<String, Object>> list = new ArrayList<>();
while(rs.next()){
//总条数增加
dataCount ++;
Map<String, Object> e = new LinkedHashMap<>(count);
//读取这一行的数据
for (int i = 1; i < count + 1; i++) {
e.putIfAbsent(JdbcUtils.lookupColumnName(resultSetMetaData, i), JdbcUtils.getResultSetValue(rs, i));
}
list.add(e);
//读取满10000条时开始插入数据
if(list.size() >= batchQuerySize){
times ++;
this.dataCallback(list,times,dataCount);
//处理完成清空已读取的数据,释放内存
list.clear();
}
}
//可能最后一次读取不满10000条,插入剩余的数据
if(list.size() > 0){
times ++;
this.dataCallback(list,times,dataCount);
list.clear();
}
return new ArrayList<>(0);
}
/**
* 读取batchQuerySize条数据后自定义处理回调
*/
public abstract void dataCallback(List<Map<String, Object>> list, int times, int n);
/**
* 读取列信息后回调
*/
public void columnInfoCallback(List<DbColumn> columnList){
}
}
我们拿到ResultSet后,每次只读取10000条数据存到List中,然后将这些数据插入数据库,在插入结束之后清空这个List,jvm会回收这些数据释放内存,一直重复这个过程直到结果集读取完毕,这样能保证内存中只流程10000条数据,就避免了内存泄漏的情况产生。
分批插入代码,提升插入速度:
/**
* 数据分批插入
*/
public void batchSizeUpdate(List<Map<String,Object>> list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate, int batchSize){
int size = list.size();
int n = size / batchSize;
int l = size % batchSize;
if(l > 0){
n++;
}
log.info("总共分"+n+"次插入");
for (int i = 0; i < n; i++) {
int start = i*batchSize;
int end = (i+1)*batchSize;
if(end > size){
end = size;
}
batchUpdate(list.subList(start,end),sql, namedParameterJdbcTemplate);
log.info("第"+(i+1)+"次插入完毕");
}
}
private void batchUpdate(List<Map<String,Object>> list, String sql,NamedParameterJdbcTemplate namedParameterJdbcTemplate){
Map<String,?> [] param = new Map[list.size()];
for(int c= 0;c<list.size();c++){
param[c] = list.get(c);
}
namedParameterJdbcTemplate.batchUpdate(sql,param);
}
最终调用方式:
//一次读取10000条后进行回调
DataMapCallBackExtractor extractor = new DataMapCallBackExtractor(10000){
@Override
public void dataCallback(List<Map<String, Object>> list, int times, int n) {
log.info("第{}次读取{}条,共{}条",times,list.size(),n);
//分批插入,一次1000条
batchSizeUpdate(list,insertSql,insertJdbcTemplate);
}
@Override
public void columnInfoCallback(List<DbColumn> columnList) {
//读取结果集之前回调,拿到列信息进行一些处理
//比如拼接插入sql
}
};
jdbcTemplate.query(sql, new HashMap<>(0),extractor);
SqlUtil中读取列信息的代码:文章来源:https://www.toymoban.com/news/detail-768619.html
/**
* 从ResultSet中读取sql列信息
* @param rs 结果集
* @param i 列位置
*/
@SneakyThrows
public static DbColumn readResultColumn(ResultSetMetaData rs,int i){
DbColumn c = new DbColumn();
c.setName(rs.getColumnName(i));
c.setComments(rs.getColumnLabel(i));
int type = rs.getColumnType(i);
c.setDataType(rs.getColumnTypeName(i));
c.setNullable(rs.isNullable(i) == ResultSetMetaData.columnNoNulls ? "N" : "Y");
if(type == Types.VARCHAR || type == Types.CHAR || type == Types.LONGVARCHAR || type == Types.CLOB){
c.setDataLength(rs.getColumnDisplaySize(i));
}else if(type == Types.NUMERIC || type == Types.INTEGER || type == Types.BIGINT || type == Types.DECIMAL
|| type == Types.DOUBLE || type == Types.FLOAT || type == Types.REAL || type == Types.SMALLINT || type == Types.TINYINT){
c.setDataLength(rs.getPrecision(i));
}else if(type == Types.DATE || type == Types.TIMESTAMP){
c.setDataLength(rs.getPrecision(i));
}
c.setDataScale(rs.getScale(i));
return c;
}
字段列信息实体:文章来源地址https://www.toymoban.com/news/detail-768619.html
import lombok.Data;
import java.util.Objects;
/**
* 数据库表字段基本信息
* @author lang.zhou
* @since 2022/10/17 14:31
*/
@Data
public class DbColumn {
private String tableName;
/**
* 字段名
*/
private String name;
/**
* 字段描述
*/
private String comments;
/**
* 可为空
*/
private String nullable = "Y";
private String dataType = null;
private Integer isPk = 0;
private Integer dataLength = 0;
private Integer dataScale = 0;
public boolean isPk(){
return name != null && isPk > 0;
}
public boolean isDate(){
return name != null && ("DATE".equalsIgnoreCase(dataType) || "TIMESTAMP".equalsIgnoreCase(dataType) || "DATETIME".equalsIgnoreCase(dataType));
}
public boolean isNumber(){
return name != null && ("NUMBER".equalsIgnoreCase(dataType) || "DECIMAL".equalsIgnoreCase(dataType) || "INTEGER".equalsIgnoreCase(dataType)
|| "INT".equalsIgnoreCase(dataType)|| "BIGINT".equalsIgnoreCase(dataType)|| "DOUBLE".equalsIgnoreCase(dataType)|| "LONG".equalsIgnoreCase(dataType)));
}
public boolean isChar(){
return name != null && "CHAR".equalsIgnoreCase(dataType);
}
public boolean allowNull(){
return !isPk() && Objects.equals(nullable,"Y");
}
}
到了这里,关于java使用jdbcTemplate查询并插入百万级数据解决方案的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!