解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!

这篇具有很好参考价值的文章主要介绍了解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

先说一下什么是数据库数据库中并发一致性问题!

1、在并发环境下,事务的隔离性很难保证,因此会出现很多并发一致性问题。

  • 数据丢失
    T1 和 T2 两个事务都对一个数据进行修改,T1 先修改,T2 随后修改,T2 的修改覆盖了 T1 的修改。
    解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!
  • 读脏数据
    T1 修改一个数据,T2 随后读取这个数据。如果 T1 撤销了这次修改,那么 T2 读取的数据是脏数据。
    解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!
  • 不可重复读
    T2 读取一个数据,T1 对该数据做了修改。如果 T2 再次读取这个数据,此时读取的结果和第一次读取的结果不同。
    解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!
  • 幻影读
    T1 读取某个范围的数据,T2 在这个范围内插入新的数据,T1 再次读取这个范围的数据,此时读取的结果和和第一次读取的结果不同。

当然上面只是提一下什么是一致性的问题。

现在就用一个在开发中的实际例子来说一下ES中多线程的情况下如何避免不一致性的问题。
这里我使用了批量更新数据的一个例子;
业务: 前端发送任务后,将ES中所有符合条件的数据的某一个字段全部更新一遍。由于数据量太大,所以准备使用多线程的方式去执行。例如前端发送了两个任务,这就代表我需要去更新两次。但是如果使用多线程就会导致最后一次更新完的数据可能将第一次更新的部分数据覆盖。

在多线程情况下,更新 Elasticsearch 数据库可能会导致并发一致性问题。这意味着,如果多个线程同时访问数据库并进行更新操作,可能会导致冲突,从而使数据库的状态混乱不堪。

为了解决这个问题,可以使用 Elasticsearch 的冲突解决机制。这包括使用版本号来跟踪文档的更新,并在更新操作时使用比较-替换模式(compare-and-swap)。
例如,假设你有一个文档,其中包含一个版本号字段:

	{
	  "title": "My Document",
	  "description": "This is my document",
	  "version": 1
	}

当你想要更新这个文档时,可以使用 version 字段来进行并发控制。例如,假设你想要更新文档的 description 字段:


POST my_index/my_type/123/_update
{
  "doc": {
    "description": "This is an updated description"
  },
  "version": 1
}

这将会检查文档的当前版本是否为 1,并只有在版本匹配的情况下才会执行更新操作。如果文档的版本已被更新,则会返回冲突错误。

在 Java 中使用 Elasticsearch 的 HighLevelRestClient 实现版本号控制时,可以使用 UpdateRequest 类来创建更新请求。该请求可以使用 setIfSeqNo(long) 和 setIfPrimaryTerm(long) 方法来设置版本号。
例如,假设你想要更新文档的 description 字段,并且想要使用版本号进行并发控制:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));

UpdateRequest request = new UpdateRequest("my_index", "my_type", "123")
  .doc("description", "This is an updated description")
  .setIfSeqNo(1)
  .setIfPrimaryTerm(1);

client.update(request);

这将会检查文档的当前版本是否为 1,并只有在版本匹配的情况下才会执行更新操作。如果文档的版本已被更新,则会返回冲突错误。

如果多次请求更新我怎么知道版本号具体设置多少?

在获取文档时获取版本号。可以使用 GetRequest.fetchSourceContext(FetchSourceContext) 方法来获取文档的版本号。例如:

GetRequest request = new GetRequest("my_index", "my_type", "123")
  .fetchSourceContext(new FetchSourceContext(true, new String[]{"_seq_no", "_primary_term"}, null));

GetResponse response = client.get(request);
long seqNo = response.getSeqNo();
long primaryTerm = response.getPrimaryTerm();

在执行更新操作时使用 Get 操作获取版本号。在执行更新操作之前,可以使用 Get 操作获取文档的版本号,然后将版本号设置到更新请求中。例如:文章来源地址https://www.toymoban.com/news/detail-439810.html

GetRequest getRequest = new GetRequest("my_index", "my_type", "123")
  .fetchSourceContext(new FetchSourceContext(true, new String[]{"_seq_no", "_primary_term"}, null));
//在执行前获取版本号
GetResponse getResponse = client.get(getRequest);
long seqNo = getResponse.getSeqNo();
long primaryTerm = getResponse.getPrimaryTerm();
//设置版本号
UpdateRequest updateRequest = new UpdateRequest("my_index", "my_type", "123")
  .doc("description", "This is an updated description")
  .setIfSeqNo(seqNo)
  .setIfPrimaryTerm(primaryTerm);

client.update(updateRequest);

到了这里,关于解决在使用 Elasticsearch(ES)多线程批量操作时导致并发一致性的问题!!的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用python对ES进行批量操作

    在kibana中进行批量操作:  使用python代码进行对es进行批量操作 示例代码: 运行结果: 案例一: python读取mysql数据写入ES: 参考博文:https://www.jianshu.com/p/c0e42121f054 python 批量导入mysql数据到Elastic Search_追逐时光的博客-CSDN博客  参考博文: Python简单实现与ElasticSearch交互插入

    2024年02月13日
    浏览(37)
  • Java使用线程池批量处理数据操作

    疑问思路: 1.如何保证数据按顺序批量处理 2.如何保证数据全部处理完统一返回 3.如何保证是多任务异步操作 4.如何提高运行效率,减少运行时间 1.使用ArrayList 插入数据有序且可重复 2.CountDownLatch / Future / CompletableFuture 3.多线程 4.线程池创建多线程 具体流程: 获取需要进行批

    2024年02月09日
    浏览(57)
  • C++并发线程 - 如何线程间共享数据【详解:如何使用锁操作】

    点击进入系列文章目录 C++技能系列 Linux通信架构系列 C++高性能优化编程系列 深入理解软件架构设计系列 高级C++并发线程编程 期待你的关注哦!!! 快乐在于态度,成功在于细节,命运在于习惯。 Happiness lies in the attitude, success lies in details, fate is a habit. 具体哪个线程按何种

    2024年02月08日
    浏览(32)
  • C# 使用屏障来使多线程并发操作保持同步

    以下是微软官方对屏障类的介绍,System.Threading.Barrier 可用来作为实现并发同步操作的基本单元,让多个线程(参与者)分阶段并行处理目标算法。在达到代码中的屏障点之前,每个参与者将继续执行,屏障表示工作阶段的末尾;单个参与者到达屏障后将被阻止,直至所有参与者

    2024年01月24日
    浏览(38)
  • 使用Elasticsearch进行数据批量操作

    Elasticsearch是一个开源的搜索和分析引擎,基于Lucene库开发。它可以用来实现文本搜索、数据分析、实时数据处理等功能。在大数据时代,Elasticsearch成为了处理和分析大量数据的首选工具之一。 数据批量操作是Elasticsearch中的一种常见操作,它可以用来对大量数据进行创建、更

    2024年02月22日
    浏览(36)
  • 多线程批量同步数据到ES

    需求背景:新增了ES,现在要讲数据库某张表的数据同步到ES中,百万级的数据量一次性读取同步肯定不行,所以可以用多线程同步执行同步数据。 1.线程池配置类 2.ES配置类 3.主要代码逻辑

    2024年01月24日
    浏览(29)
  • 项目中使用es(一):使用springboot操作elasticsearch

    写在前面 对于elasticsearch的搭建,前面写了一篇文章有简单描述如何搭建es,本次主要介绍如何在项目里使用,主要使用ElasticsearchRepository和ElasticsearchRestTemplate操作es。 搭建项目环境和选择合适版本 首先选择合适的项目组件版本,因为es版本和springboot版本有对应,如果不合适会

    2024年02月08日
    浏览(32)
  • 项目中使用es(二):使用RestHighLevelClient操作elasticsearch

    写在前面 之前写了有关elasticsearch的搭建和使用springboot操作elasticsearch,这次主要简单说下使用RestHighLevelClient工具包操作es。 搭建环境和选择合适的版本 环境还是以springboot2.7.12为基础搭建的,不过这不重要,因为这次想说的是RestHighLevelClient操作elasticsearch,RestHighLevelClient版本

    2024年02月14日
    浏览(38)
  • Elasticsearch学习3-使用RestClient操作es

    JavaREST客户端有两种模式: Java Low Level REST Client:ES官方的低级客户端。低级别的客户端通过http与Elasticearch集群通信。 Java High Level REST Client:ES官方的高级客户端。基于上面的低级客户端,也是通过HTTP与ES集群进行通信。它提供了更多的接口。 此外Spring也对RestClient进行了封装

    2024年02月11日
    浏览(40)
  • ES中同时使用should和must导致只有must生效解决方案

    背景:es嵌套查询,条件a等于某一个值且条件b等于某两个值甚至更多。 第一步:单独条件b采用 should 匹配多个值,可以查到预期的结果,代码如下: 第二步:单独条件a采用must匹配单个值,也可以查到预期的结果,代码如下: 第三步:合并起来查询,a和b条件同时启用,代

    2024年02月15日
    浏览(25)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包