tonglin0325的个人主页

Mongo的oplog和change stream

1.oplog#

MongoDB副本集由一组服务器组成,这些服务器都具有相同数据的副本,复制可确保客户端对副本集主副本集上的文档所做的所有更改都正确应用于其他副本集的服务器,称为secondaries副本。

MongoDB 复制的工作原理是让主节点在其操作日志(或操作日志)中记录更改,然后每个从节点读取主节点的操作日志并将所有操作按顺序应用到它们自己的文档中。

将新服务器添加到副本集时,该服务器首先对主服务器上的所有数据库和集合执行snapshot,然后读取主服务器的
 oplog 以应用自启动快照以来可能所做的所有更改。这个新服务器在赶上主服务器oplog的尾部时成为secondary服务器(并且能够处理查询)。

2.change stream#

Debezium MongoDB 连接器使用与上述类似的复制机制,尽管它实际上并没有成为副本集的成员。主要区别在于连接器不直接读取 oplog,而是将捕获和解码 oplog 委托给 MongoDB 的 Change Streams 功能。

使用change stream,MongoDB服务器将集合的更改公开为事件流。Debezium 连接器监视流并将更改传递到下游。而且,当连接器第一次看到副本集时,它会查看 oplog 以获取上次记录的事务,然后执行主数据库和集合的快照。复制完所有数据后,连接器会从之前从操作日志中读取的位置创建更改流。

当MongoDB连接器进程发生变化时,它会定期记录oplog/stream中事件发起的位置。当连接器停止时,它会记录它处理的最后一个oplog/流位置,以便在重新启动时它只是从该位置开始流式传输。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,它将准确地从中断的位置继续,而不会丢失单个事件。当然,MongoDB的oplog通常被限制在最大大小,这意味着连接器不应该停止太久,否则oplog中的某些操作可能会在连接器有机会读取它们之前被清除。在这种情况下,重新启动时,连接器将检测缺少的oplog操作,执行快照,然后继续流式传输更改。

限制:change stream只能用于 replica set 集群和 sharded cluster 集群,单节点mongo因为没有oplog,所以不支持

参考:MongoDB Change Stream之一——上手及初体验

3.change stream相关操作#

1.获取某个database的某个collection的Change Stream

1
2
3
use xx_db
db.your_collection.watch([],{maxAwaitTimeMS:60000})

2.指定从resume token开始获取某个database的某个collection的Change Stream

1
2
db.document.watch([],{maxAwaitTimeMS:60000,resumeAfter:{"_data": "xxxxxxx"}})

其他参数可以参考官方文档:https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/#mongodb-method-db.collection.watch