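Background: Elasticsearch (ES) has been added to the system, and the data in one of the database tables now has to be synchronized into ES. With millions of rows, reading and syncing everything in one go is not practical, so the table is read page by page and the pages are synced concurrently on multiple threads.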
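Splitting the table into pages is plain arithmetic: with N rows and page size P there are ⌈N / P⌉ pages, and page i starts at offset i · P (the `offset` in `LIMIT offset, size`). A minimal sketch, assuming a hypothetical `PageMath` helper class and an illustrative row count:

```java
import java.util.ArrayList;
import java.util.List;

public class PageMath {

    /** Number of pages needed to cover count rows, i.e. ceil(count / pageSize). */
    static int totalPages(int count, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        // Integer ceiling division: one extra page whenever there is a remainder
        return (count + pageSize - 1) / pageSize;
    }

    /** The LIMIT offset of every page, in order. */
    static List<Integer> offsets(int count, int pageSize) {
        List<Integer> result = new ArrayList<>();
        int pages = totalPages(count, pageSize);
        for (int page = 0; page < pages; page++) {
            result.add(page * pageSize);
        }
        return result;
    }

    public static void main(String[] args) {
        int count = 1_000_001; // illustrative row count
        int pageSize = 5000;
        List<Integer> offs = offsets(count, pageSize);
        System.out.println(totalPages(count, pageSize));  // 201
        System.out.println(offs.get(offs.size() - 1));     // 1000000 (last page holds 1 row)
    }
}
```

The `(count + pageSize - 1) / pageSize` form gives the same result as a `count % PAGE_SIZE == 0 ? ... : ... + 1` conditional.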
1. Thread pool configuration class
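import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ThreadPoolConfig {
    /**
     * Core pool size
     */
    private static final int CORE_POOL_SIZE = 17;
    /**
     * Maximum number of threads the pool may create
     */
    private static final int MAX_POOL_SIZE = 50;
    /**
     * Maximum queue length
     */
    private static final int QUEUE_CAPACITY = 1000;
    /**
     * How long threads beyond the core size may stay idle before they are reclaimed
     */
    private static final int KEEP_ALIVE_SECONDS = 500;

    @Bean("taskExecutor")
    public ExecutorService executorService() {
        // Atomic counter keeps thread names unique and sequential
        AtomicInteger c = new AtomicInteger(1);
        // Bounded blocking queue backed by a linked list
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        return new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS, // KEEP_ALIVE_SECONDS is expressed in seconds
                queue,
                r -> new Thread(r, "es-pool-" + c.getAndIncrement()),
                // CallerRunsPolicy rather than DiscardPolicy: a silently discarded task never
                // calls countDown(), which would leave CountDownLatch.await() blocked forever
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}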
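The rejection policy matters here because the main logic waits on a `CountDownLatch` that each page task decrements once. `DiscardPolicy` drops a rejected task without running it, so that task's `countDown()` never happens and an unbounded `await()` never returns. The stdlib-only sketch below compares `DiscardPolicy` with `CallerRunsPolicy` on a deliberately tiny pool (1 thread, queue of 1). The class name, pool sizes, and sleep time are illustrative assumptions chosen to force rejections:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionPolicyDemo {

    /** Submits slow jobs to a 1-thread pool with a 1-slot queue and returns how many actually ran. */
    static int runWith(RejectedExecutionHandler handler, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), handler);
        CountDownLatch latch = new CountDownLatch(tasks);
        AtomicInteger executed = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(200); // simulate a slow bulk request
                        executed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // A discarded task never counts down, so a plain await() would block forever;
            // the timeout exists only so this demo terminates
            boolean finished = latch.await(1, TimeUnit.SECONDS);
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            System.out.println(handler.getClass().getSimpleName()
                    + ": latch reached zero = " + finished + ", tasks run = " + executed.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        runWith(new ThreadPoolExecutor.DiscardPolicy(), 5);
        runWith(new ThreadPoolExecutor.CallerRunsPolicy(), 5);
    }
}
```

With `DiscardPolicy`, only the tasks that fit in the worker and the queue run and the latch never reaches zero. With `CallerRunsPolicy`, the submitting thread runs the overflow itself, so every task runs. As a side effect, this also slows down how fast new pages are read from the database.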
2. ES configuration class
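import lombok.Getter;
import lombok.Setter;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
    private String host;
    private int port;

    @Bean
    public RestHighLevelClient client() {
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(
                        host,
                        port,
                        "http"
                )
        ));
    }
}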
3. Main logic
@Service
@Transactional
@Slf4j
public class TestService {
    @Autowired
    private TestMapper testMapper;
    @Autowired
    private RestHighLevelClient client; // ES client
    @Autowired
    @Qualifier("taskExecutor")
    private ExecutorService executorService; // thread pool
    private static final String ARTICLE_ES_INDEX = "test_info"; // ES index name
    private static final int PAGE_SIZE = 5000; // records per page

    /**
     * Bulk import logic
     */
    public void importAll() throws InterruptedException {
        // Total number of rows in the table
        int count = testMapper.selectCount();
        // Total pages = ceil(count / PAGE_SIZE)
        int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;
        // Record the start time
        long startTime = System.currentTimeMillis();
        // One CountDownLatch count per page
        CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);
        for (int i = 0; i < totalPageSize; i++) {
            // Offset of the first row on this page
            int fromIndex = i * PAGE_SIZE;
            // Query the current page: SELECT * FROM table_name LIMIT fromIndex, PAGE_SIZE
            List<TestVo> testVoList = testMapper.selectCurrentData(fromIndex, PAGE_SIZE);
            // Wrap the page in a task that bulk-inserts it into ES
            TaskThread taskThread = new TaskThread(testVoList, countDownLatch);
            // Hand the task to the thread pool
            executorService.execute(taskThread);
        }
        // Block until every page has counted down
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        log.info("ES bulk import finished: {} rows in total, {} s elapsed", count, (endTime - startTime) / 1000);
    }

    // Task implemented as an inner class for convenience
    class TaskThread implements Runnable {
        List<TestVo> testVoList;
        CountDownLatch cdl;

        // The page of data and the countdown latch
        public TaskThread(List<TestVo> testVoList, CountDownLatch cdl) {
            this.testVoList = testVoList;
            this.cdl = cdl;
        }

        @Override
        public void run() {
            try {
                // Bulk request whose default index is the target ES index
                BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);
                for (TestVo testVo : testVoList) {
                    // One IndexRequest per row; the document body is this row, not the whole page
                    bulkRequest.add(new IndexRequest().id(testVo.getId().toString())
                            .source(JSON.toJSONString(testVo), XContentType.JSON));
                }
                // Send the request, adding the whole page to the ES index in one call
                BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (response.hasFailures()) {
                    log.error("ES bulk import partially failed: {}", response.buildFailureMessage());
                }
            } catch (IOException e) {
                log.error("ES bulk request failed", e);
            } finally {
                // Count down on success and failure alike, otherwise await() never returns
                cdl.countDown();
            }
        }
    }
}
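Without MyBatis and Elasticsearch, the control flow of `importAll()` is: split into pages, submit one task per page, count down in `finally`, then wait on the latch. The stdlib-only sketch below keeps that shape so it can be run and tested in isolation. `PagedImportSketch`, the in-memory row list, and the `Consumer` sink standing in for `client.bulk(...)` are illustrative assumptions, not part of the original code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class PagedImportSketch {

    /** Runs sink on every page of rows in the pool, waits for all pages, returns the number of failed pages. */
    static int importAll(List<String> rows, int pageSize, ExecutorService pool,
                         Consumer<List<String>> sink) {
        int totalPages = (rows.size() + pageSize - 1) / pageSize;
        CountDownLatch latch = new CountDownLatch(totalPages);
        AtomicInteger failures = new AtomicInteger();
        for (int i = 0; i < totalPages; i++) {
            int from = i * pageSize;
            // Stands in for "SELECT ... LIMIT from, pageSize": each task gets its own copy of one page
            List<String> page = new ArrayList<>(rows.subList(from, Math.min(from + pageSize, rows.size())));
            pool.execute(() -> {
                try {
                    sink.accept(page);          // stands in for client.bulk(...)
                } catch (RuntimeException e) {
                    failures.incrementAndGet(); // record the failure instead of losing it
                } finally {
                    latch.countDown();          // runs on success and failure alike
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for pages", e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            rows.add("row-" + i);
        }
        Queue<String> written = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // Pages are [0..2], [3..5], [6..8], [9]; the page containing row-4 fails
        int failed = importAll(rows, 3, pool, page -> {
            if (page.contains("row-4")) {
                throw new RuntimeException("simulated bulk failure");
            }
            written.addAll(page);
        });
        pool.shutdown();
        System.out.println("failed pages = " + failed + ", rows written = " + written.size());
        // prints: failed pages = 1, rows written = 7
    }
}
```

Because `countDown()` sits in `finally`, one failing page is counted and reported instead of leaving the caller blocked in `await()`.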