Java 实现对ES的Scroll读以及分片Scroll读
本文实现的是使用Java对ES索引全量数据的读取操作。ES版本是7.14。采用两种方式,一种是不分片读,一种是分片读。
对ES实现全量读取需要用到ES官方提供的Java客户端API,因此需要先在pom.xml中添加以下两个依赖:
<!-- High-level REST client used for search / scroll / clear-scroll requests.
     Elasticsearch artifacts are published with three-part versions (e.g. 7.14.0);
     keep this aligned with the 7.14.x cluster being read. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.14.0</version>
</dependency>
<!-- Elasticsearch core classes (query builders, SliceBuilder, ...).
     Must use the same version as the client above. -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.14.0</version>
    <scope>compile</scope>
</dependency>
方式一:不分片Scroll读取
/**
 * Creates a {@link RestHighLevelClient} for a single node using basic authentication.
 *
 * @param ip       host name or IP address of the node
 * @param port     HTTP port as a decimal string; parsed with {@link Integer#parseInt(String)}
 * @param scheme   URI scheme passed to {@code HttpHost} (e.g. "http")
 * @param username user name for basic authentication
 * @param password password for basic authentication
 * @return a new client; the caller is responsible for closing it
 */
public RestHighLevelClient initClient(String ip, String port, String scheme, String username, String password) {
    HttpHost node = new HttpHost(ip, Integer.parseInt(port), scheme);
    RestClientBuilder restBuilder = RestClient.builder(node);

    // Register the credentials for every auth scope and hand them to the HTTP client.
    CredentialsProvider basicAuth = new BasicCredentialsProvider();
    basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    restBuilder.setHttpClientConfigCallback(
            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(basicAuth));

    return new RestHighLevelClient(restBuilder);
}
/**
 * Reads every document of the {@code alltype_test} index through a single (non-sliced)
 * scroll and prints the elapsed time in milliseconds.
 *
 * <p>The timer starts after the first page has been fetched, so the printed value covers
 * the follow-up scroll pages and the clear-scroll call.
 *
 * @throws IOException if the initial search or any scroll request fails; the read is
 *                     aborted instead of retrying the same scroll id forever
 */
public void testReadSpeed() throws IOException {
    // try-with-resources: the client is closed even when a request fails.
    try (RestHighLevelClient esClient = initClient("192.168.11.137", "9200", "http", "elastic", "123456")) {
        int scrollSize = 1000; // number of docs fetched per scroll page
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // read the full data set
        searchSourceBuilder.size(scrollSize);
        Scroll scroll1 = new Scroll(TimeValue.timeValueMinutes(3)); // scroll keep-alive, re-sent with every page request
        String index1 = "alltype_test"; // name of the index to read
        SearchRequest searchRequest1 = new SearchRequest(index1);
        // NOTE(review): mapping types are deprecated in ES 7.x; consider dropping this call.
        searchRequest1.types("_doc");
        searchRequest1.source(searchSourceBuilder);
        searchRequest1.scroll(scroll1);

        SearchResponse searchResponse1 = esClient.search(searchRequest1, RequestOptions.DEFAULT);
        String scrollId1 = searchResponse1.getScrollId();
        SearchHit[] hits1 = searchResponse1.getHits().getHits();
        long st = System.currentTimeMillis();
        try {
            // An empty page marks the end of the scroll. IOExceptions propagate to the caller:
            // swallowing them here would leave hits1 unchanged and loop forever.
            while (hits1.length > 0) {
                SearchScrollRequest searchScrollRequest1 = new SearchScrollRequest(scrollId1);
                searchScrollRequest1.scroll(scroll1);
                SearchResponse searchScrollResponse1 =
                        esClient.searchScroll(searchScrollRequest1, RequestOptions.DEFAULT);
                scrollId1 = searchScrollResponse1.getScrollId();
                hits1 = searchScrollResponse1.getHits().getHits();
            }
        } finally {
            // Always release the server-side scroll context, even when the read failed.
            if (scrollId1 != null) {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId1);
                try {
                    esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    // Best-effort cleanup: report, but do not mask an exception from the read loop.
                    e.printStackTrace();
                }
            }
        }
        System.out.println("读取用时:" + (System.currentTimeMillis() - st));
    }
}
方式二:分片读取
分片(slice)读取就是将一个索引的全量数据划分成若干块,对每一块分别创建一个Scroll,并由多个线程并行读取。分片的数量最好根据索引的shards数来确定。(文章来源:https://www.toymoban.com/news/detail-504460.html)
/**
 * Reader task: scrolls through one slice of an index until an empty page is returned,
 * then clears the scroll context.
 *
 * <p>Request failures are reported via {@code printStackTrace()} and abort this slice;
 * they are not rethrown from {@link #run()}.
 */
class ReadTread extends Thread {
    private final RestHighLevelClient esClient;
    private final SliceBuilder slice;
    private final int scrollSize;
    private final String index;

    /**
     * @param esClient   client used for all requests; not closed by this task
     * @param slice      slice definition applied to the search source
     * @param scrollSize number of docs requested per scroll page
     * @param index      name of the index to read
     */
    public ReadTread(RestHighLevelClient esClient, SliceBuilder slice, int scrollSize, String index) {
        this.esClient = esClient;
        this.slice = slice;
        this.scrollSize = scrollSize;
        this.index = index;
    }

    /** Reads all pages of this task's slice and releases the scroll context afterwards. */
    @Override
    public void run() {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchSourceBuilder.size(scrollSize);
        Scroll scroll1 = new Scroll(TimeValue.timeValueMinutes(3));
        SearchRequest searchRequest1 = new SearchRequest(index);
        // NOTE(review): mapping types are deprecated in ES 7.x; consider dropping this call.
        searchRequest1.types("_doc");
        searchRequest1.source(searchSourceBuilder.slice(slice).sort("_doc"));
        searchRequest1.scroll(scroll1);

        String scrollId1 = null;
        try {
            SearchResponse searchResponse1 = esClient.search(searchRequest1, RequestOptions.DEFAULT);
            scrollId1 = searchResponse1.getScrollId();
            SearchHit[] hits1 = searchResponse1.getHits().getHits();
            // An empty page marks the end of the slice.
            while (hits1.length > 0) {
                SearchScrollRequest searchScrollRequest1 = new SearchScrollRequest(scrollId1);
                searchScrollRequest1.scroll(scroll1);
                SearchResponse searchScrollResponse1 =
                        esClient.searchScroll(searchScrollRequest1, RequestOptions.DEFAULT);
                scrollId1 = searchScrollResponse1.getScrollId();
                hits1 = searchScrollResponse1.getHits().getHits();
            }
        } catch (IOException e) {
            // Abort this slice. Continuing would either dereference a null response
            // (failed initial search) or retry the same scroll id forever (failed page).
            e.printStackTrace();
        } finally {
            // Release the server-side scroll context if one was opened.
            if (scrollId1 != null) {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId1);
                try {
                    esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    // Best-effort cleanup only.
                    e.printStackTrace();
                }
            }
        }
    }
}
/**
 * Builds a {@link RestHighLevelClient} that talks to one node and authenticates
 * with a user name and password.
 *
 * @param ip       host name or IP address of the node
 * @param port     HTTP port as a decimal string; parsed with {@link Integer#parseInt(String)}
 * @param scheme   URI scheme passed to {@code HttpHost} (e.g. "http")
 * @param username user name for basic authentication
 * @param password password for basic authentication
 * @return a new client; closing it is left to the caller
 */
public RestHighLevelClient initClient(String ip, String port, String scheme, String username, String password) {
    int portNumber = Integer.parseInt(port);
    RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(ip, portNumber, scheme));

    // Credentials are registered for any auth scope and installed as the client default.
    UsernamePasswordCredentials login = new UsernamePasswordCredentials(username, password);
    CredentialsProvider provider = new BasicCredentialsProvider();
    provider.setCredentials(AuthScope.ANY, login);
    clientBuilder.setHttpClientConfigCallback(http -> http.setDefaultCredentialsProvider(provider));

    return new RestHighLevelClient(clientBuilder);
}
/**
 * Reads the {@code alltype} index with one sliced scroll per primary shard, running the
 * slices on a pool of 4 threads, and prints the elapsed time in milliseconds.
 *
 * <p>If the calling thread is interrupted while waiting, the remaining work is cancelled,
 * the interrupt flag is restored and no timing is printed.
 *
 * @throws IOException if the index settings cannot be fetched or the client fails to close
 */
public void testSearchScrollSlice() throws IOException {
    // try-with-resources: the client is closed once all readers are done (or on failure).
    try (RestHighLevelClient esClient = initClient("192.168.11.137", "9200", "http", "elastic", "123456")) {
        int scrollSize = 1000;
        String index1 = "alltype";

        // Look up the number of shards of the index; it is used as the number of slices.
        GetSettingsRequest settingsRequest = new GetSettingsRequest().indices(index1);
        settingsRequest.names("index.number_of_shards");
        GetSettingsResponse getSettingsResponse =
                esClient.indices().getSettings(settingsRequest, RequestOptions.DEFAULT);
        String num = getSettingsResponse.getSetting(index1, "index.number_of_shards");
        int shardnum = Integer.parseInt(num);

        // Created only after the settings lookup so a failed lookup does not leak the pool.
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
        long st = System.currentTimeMillis();
        try {
            for (int i = 0; i < shardnum; i++) {
                // SliceBuilder requires max > 1; a single-shard index is read without slicing.
                SliceBuilder slice = shardnum > 1 ? new SliceBuilder(i, shardnum) : null;
                ReadTread readTread = new ReadTread(esClient, slice, scrollSize, index1);
                executor.execute(readTread);
            }
            executor.shutdown();
            // Block until all readers finish instead of spinning on isTerminated().
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                // still running; keep waiting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } finally {
            // No-op after a normal shutdown; cancels outstanding readers otherwise.
            executor.shutdownNow();
        }
        System.out.println("读取用时:" + (System.currentTimeMillis() - st));
    }
}
创作不易,点赞关注予我动力,有问题欢迎留言和私信!文章来源地址:https://www.toymoban.com/news/detail-504460.html
到了这里,关于Java 实现对ES的Scroll读以及分片Scroll读--全量数据读取的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!