MongoDB之Change Stream实战

这篇具有很好参考价值的文章主要介绍了MongoDB之Change Stream实战。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

什么是 Chang Streams

Change Stream 指数据的变化事件流,MongoDB 从 3.6 版本开始提供订阅数据变更的功能。
Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:


Change Stream 触发器
触发方式 异步 同步(事务保证)
触发位置 应用回调事件 数据库触发器
触发次数 每个订阅事件的客户端 1次(触发器)
故障恢复 从上次断点重新触发 事务回滚

Change Stream 的实现原理

Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。
被追踪的变更事件主要包括:

  • insert/update/delete:插入、更新、删除;
  • drop:集合被删除;
  • rename:集合被重命名;
  • dropDatabase:数据库被删除;
  • invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;

MongoDB之Change Stream实战,# MongoDB,mongodb,数据库,change stream
如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:

var cs = db.user.watch([{
	$match:{operationType:{$in:["insert","delete"]}}
}])

Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。
因此:

  • 未开启 majority readConcern 的集群无法使用 Change Stream;
  • 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕)。

MongoShell 测试
窗口 1:

db.user.watch([],{maxAwaitTimeMS:1000000}).pretty()

窗口 2:

db.user.insert({name:"xxxx"})

变更事件字段说明:

名称 说明
_id 变更事件的 Token 对象
operationType 变更类型
fullDocument 文档完整内容
ns 监听的目标
ns.db 变更的数据库
ns.coll 变更的集合
to 对于 rename 操作变更后的目标
ns.db rename 操作后的数据库
documentKey 变更文档的键值,含 _id 字段
updateDescription 变更描述
updateDescription.updatedFields 变更中更新的字段
updateDescription.removedFields 变更中删除的字段
clusterTime 对应oplog关联的时间戳
txnNumber 事务编号,仅在多文档事务中出现,MongoDB4.0 版本支持
lsid 事务关联的会话号,仅在多文档事务中出现,MongoDB4.0 版本支持

Change Stream 故障恢复

假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?
MongoDB之Change Stream实战,# MongoDB,mongodb,数据库,change stream
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:

var cs = db.collection.watch([], {resumeAfter: <_id>})

即可从上一条通知中断处继续获取后续的变更通知。

使用场景

  • 监控

用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。

  • 分析平台

例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如类似 Flink、Spark 等计算平台等等。

  • 数据同步

基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。

  • 消息推送

假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用户,用户能够实时收到公交车变更的数据,非常便捷实用。

注意事项

  • Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
  • 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
  • 删除数据时通知的仅是删除数据的 _id。

Spring Boot 整合Chang Stream

(1)引入依赖

<!--spring data mongodb-->
  <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

(2)配置 yml

spring:
	data:
		mongodb:
			uri: mongodb://firechou:firechou@192.168.65.174:28017,192.168.65.174:28018,192.168.65.174:28019/test?authSource=admin&replicaSet=rs0

(3)配置 mongo 监听器的容器 MessageListenerContainer,spring 启动时会自动启动监听的任务用于接收 changestream

@Configuration
public class MongodbConfig {

    @Bean
    MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {

        Executor executor = Executors.newFixedThreadPool(5);

        MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {
            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };

        ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(documentMessageListener)
        .collection("user") // 需要监听的集合名
        // 过滤需要监听的操作类型,可以根据需求指定过滤条件
        .filter(Aggregation.newAggregation(Aggregation.match(
            Criteria.where("operationType").in("insert", "update",
                                               "delete"))))
        // 不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
        .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
        .build();
        messageListenerContainer.register(request, Document.class);

        return messageListenerContainer;
    }
}

(4)配置 mongo 监听器,用于接收数据库的变更信息文章来源地址https://www.toymoban.com/news/detail-816720.html

@Component
public class DocumentMessageListener<S, T> implements MessageListener<S, T> {

    @Override
    public void onMessage(Message<S, T> message) {

        System.out.println(String.format("Received Message in collection                                         %s.\n\trawsource: %s\n\tconverted: %s",
             message.getProperties().getCollectionName(), 
             message.getRaw(), 
             message.getBody()));
    }
}

到了这里,关于MongoDB之Change Stream实战的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MongoDB数据库从入门到精通系列文章之:MongoDB数据库百篇技术文章汇总

    MongoDB数据库系列文章持续更新中: 更多数据库内容请阅读博主数据库专栏,数据库专栏涵盖了Mysql、SQLServer、PostgreSQL、MongoDB、Oracle、Cassandra等数据库 数据库专栏 文章名称 文章链接 数据库安装部署系列之:部署Mongodb5.0.6高可用集群详细步骤 数据库安装部署系列之:部署M

    2024年02月11日
    浏览(54)
  • mongodb 数据库管理(数据库、集合、文档)

    目录 一、数据库操作 1、创建数据库 2、删除数据库 二、集合操作 1、创建集合 2、删除集合 三、文档操作 1、创建文档 2、 插入文档 3、查看文档 4、更新文档 1)update() 方法 2)replace() 方法 创建数据库的语法格式如下: 如果数据库不存在,则创建数据库,否则切换到该数据

    2024年02月12日
    浏览(49)
  • Mongodb连接数据库

    npm init   npm i mongoose  const mongoose=require(\\\"mongoose\\\") mongoose.connect(\\\"mongodb://127.0.0.1:27017/user\\\") 说明:mongodb是协议,user是数据库,如果没有会自动创建user数据库 。 node 文件名     mongoose.disconnect()

    2024年02月15日
    浏览(59)
  • mongodb数据库操作

    1、启动mongodb 在mongodb启动命令中 --dbpath 指定mongodb的数据存储路径 --logpath 指定mongodb的日志存储路径 2、停止mongodb 第一步先进入mongo命令行模式 第二步,使用use admin 命令进入admin数据库 第三步,执行 db.shutdownServer()命令 停止服务。代码及显示如下:  2 、导出Mongodb数据 mon

    2024年02月09日
    浏览(50)
  • MongoDb数据库

    1.显示所有数据库: show dbs 2.切换到指定数据库,如果没有则自动创建数据库 use databaseName 3.显示当前所在数据库 db 4.删除当前数据库 use 库名 db.dropDatabase() 1.创建集合 db.createCollection(\\\'集合名称\\\') 2.显示当前数据库中所有集合 show colletions  3.删除某个集合 db.xxx.drop(); 4.重命名集

    2024年02月04日
    浏览(54)
  • MongoDB数据库安装

    MongoDB数据的特点: 面相文档存储的分布式数据库 具有很强的扩展性 支持丰富的查询表达式,很接近于关系性数据库 使用类似于json的结构保存数据,可以轻易的查询到文档中内嵌的对象及数组 首先去官网下载安装包 Download MongoDB Community Server | MongoDB 启动MongoDB数据的服务 可

    2024年02月11日
    浏览(57)
  • 【数据库MongoDB】MongoDB与大数据关系以及MongoDB中重要的进程:mongod进程与mongo进程关系

    云计算的定义有多种说法,对于到底什么是云计算,我们至少可以找到100种解释。目前广为接受的是美国国家标准与技术研究院定义: 云计算是一种按使用量付费的模式,这种模式提供可用的、便捷的、按需的网络访问,进入可配置的计算资源共享池(资源包括网络、服务器

    2024年02月02日
    浏览(68)
  • MongoDB 数据库详细介绍

    MongoDB(来自“Humongous”,意为巨大的)是一个开源、高性能、无模式(NoSQL)、文档导向的分布式数据库。它以其灵活性、可扩展性和强大的查询功能而闻名于世。MongoDB 使用 JSON 格式的文档来存储数据,适用于多种应用场景,包括 Web 应用、移动应用、日志存储、大数据等。

    2024年02月12日
    浏览(65)
  • MongoDB:数据库初步应用

    1.MongoDBCompass连接数据库 连接路径:mongodb://用户名:密码@localhost:27017/ 2.创建数据库(集合) MongoDB中数据库被称为集合.  MongoDBCompass连接后,点击红色框加号创建集合,点击蓝色框加号创建文档(数据表) 文档中的数据结构(相当于表中的列)设计不用管,添加数据的时候,自动创建列和数

    2024年02月12日
    浏览(47)
  • python数据库——Mongodb

    MongoDB 是一个开源的 NoSQL数据库系统,它是一个面向文档的数据库,使用 JSON 格式来存储和查询数据。MongoDB 是一个非关系型数据库,它的设计目标是以高性能、高可用性和可扩展性为特点,适用于处理大量的非结构化数据。 特点: MongoDB 是一个面向文档存储的数据库,操作

    2024年02月07日
    浏览(55)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包