1 需求
在项目开发中需要处理100万多的数据,这些数据需要从mysql数据库中读取出来,再通过调用其他平台的接口推送数据。由于时间紧迫,数据需要在短时间内完成推送,采用单线程推送很慢,所以采用多线程推送来提高效率。
2 配置多线程
2.1 application.yml
thread-pool:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 80
keep-alive-seconds: 120
2.2 创建ThreadPoolProperties
import lombok.Data;
import org.springframework.stereotype.Component;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@Component
@ConfigurationProperties(prefix = "thread-pool")
public class ThreadPoolProperties {
/**
* 线程池创建时候初始化的线程数
*/
private int corePoolSize;
/**
* 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
*/
private int maxPoolSize;
/**
* 用来缓冲执行任务的队列
*/
private int queueCapacity;
/**
* 允许线程的空闲时间:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
*/
private int keepAliveSeconds;
}
2.3 创建ThreadPoolConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@Configuration
public class ThreadPoolConfig {
private final ThreadPoolProperties threadPoolProperties;
@Autowired
public ThreadPoolConfig(ThreadPoolProperties threadPoolProperties) {
this.threadPoolProperties = threadPoolProperties;
}
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
executor.setThreadNamePrefix("thread-pool-");
return executor;
}
}
3 多线程批量数据处理文章来源:https://www.toymoban.com/news/detail-835171.html
public RequestResult multiThreadPush() {
List<HistoryStudent> historyStudentList = historyStudentMapper.getList(0, 65867);
// 分割集合
List<List<HistoryStudent>> partitionData = partitionData(historyStudentList, 4);
ThreadPoolTaskExecutor executor = SpringUtil.getBean("threadPoolTaskExecutor", ThreadPoolTaskExecutor.class);
// 计数器
CountDownLatch latch = new CountDownLatch(partitionData.size());
for (List<HistoryStudent> historyStudents : partitionData) {
executor.execute(() -> {
try {
for (HistoryStudent historyStudent : historyStudents) {
// 单个数据处理
//processSingleData(historyStudent);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return RequestResult.success();
}
private List<List<HistoryStudent>> partitionData(List<HistoryStudent> dataList, int partitionSize) {
List<List<HistoryStudent>> partitions = new ArrayList<>();
int size = dataList.size();
int batchSize = size / partitionSize;
for (int i = 0; i < partitionSize; i++) {
int fromIndex = i * batchSize;
int toIndex = (i == partitionSize - 1) ? size : fromIndex + batchSize;
partitions.add(dataList.subList(fromIndex, toIndex));
}
return partitions;
}
4 参考博客
Java多线程批量处理、线程池的使用
Java多线程处理大批量数据
java多线程批量处理数据文章来源地址https://www.toymoban.com/news/detail-835171.html
到了这里,关于Java 多线程批量处理数据的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!