什么是mongodb Change Streams?
cookqq ›博客列表 ›MongoDB

什么是mongodb Change Streams?

2024-03-28 21:41:16.0|分类: MongoDB|浏览量: 545

摘要: Change Stream 指数据的变化事件流,MongoDB 从 3.6 版本开始提供订阅数据变更的功能。

Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:

Change Stream触发器
触发方式异步同步(事务保证)
触发位置应用回调事件数据库触发器
触发次数每个订阅事件的客户端1次(触发器)
故障恢复从上次断点重新触发事务回滚


Change Stream 的实现原理

Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。

被追踪的变更事件主要包括:

insert/update/delete:插入、更新、删除;

drop:集合被删除;

rename:集合被重命名;

dropDatabase:数据库被删除;

invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;


由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,其只能用于启用了副本集的独立集群或分片集群

监听目标

名称说明
单个集合除系统库(admin/local/config)之外的集合,3.6版本支持
单个数据库除系统库(admin/local/config)之外的数据库集合,4.0版本支持
整个集群整个集群内除去系统库( (admin/local/config)之外的集合 ,4.0版本支持



使用条件


1、WiredTiger 存储引擎

2、副本集 (测试环境下,也可以使用单节点的副本集) 或分片集群部署

3、副本集协议版本:pv1 (默认)

4、4.0 版本之前允许 Majority Read Concern: replication.enableMajorityReadConcern = true (默认允许)//

5、MongoDB 用户拥有 find 和 changeStream 权限


Change Events 

Change Events 是 Change Streams 返回的变更记录,其数据结构如下所示:

{

   _id : { <BSON Object> },

   "operationType" : "<operation>",

   "fullDocument" : { <document> },

   "ns" : {

      "db" : "<database>",

      "coll" : "<collection>"

   },

   "to" : {

      "db" : "<database>",

      "coll" : "<collection>"

   },

   "documentKey" : { "_id" : <value> },

   "updateDescription" : {

      "updatedFields" : { <document> },

      "removedFields" : [ "<field>", ... ],

      "truncatedArrays" : [

         { "field" : <field>, "newSize" : <integer> },

         ...

      ]

   },

   "clusterTime" : <Timestamp>,

   "txnNumber" : <NumberLong>,

   "lsid" : {

      "id" : <UUID>,

      "uid" : <BinData>

   }

}


变更事件字段说明:

名称说明

_id 变更事件的Token对象

operationType 变更类型(见下面介绍)

fullDocument 文档内容

ns 监听的目标

ns.db 变更的数据库

ns.coll 变更的集合

documentKey 变更文档的键值,含_id字段

updateDescription 变更描述

updateDescription.updatedFields 变更中更新字段

updateDescription.removedFields 变更中删除字段

clusterTime 对应oplog的时间戳

txnNumber 事务编号,仅在多文档事务中出现,4.0版本支持

lsid 事务关联的会话编号,仅在多文档事务中出现,4.0版本支持


Update Lookup

由于 oplog 的更新操作仅包含了有变更后的字段,变更后完整的文档无法从 oplog 直接获取,但是在转换为 UPSERT 模式的 changelog 时,UPDATE_AFTER RowData 必须拥有完整行记录。Change Streams 通过设置 fullDocument = updateLookup,可以在获取变更记录时返回该文档的最新状态。另外,Change Event 的每条记录都包含 documentKey (_id 以及 shard key),标识发生变更记录的主键信息,即满足幂等更新的条件。所以通过 Update Lookup 特性,可以将 MongoDB 的变更记录转换成 Flink 的 UPSERT changelog。


java实现代码:

MongoCollection<Document> mongoCollection = formMongo.getCollection(数据库名字, table);

// Create $match pipeline stage.
List<Bson> pipeline = Collections.singletonList(
		Aggregates.match(Filters.in("operationType", Arrays.asList("insert",
		 "update", "delete"))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
ChangeStreamIterable<Document> changeStreamIterable = mongoCollection.watch(pipeline);
// 不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
changeStreamIterable.fullDocument(FullDocument.UPDATE_LOOKUP);

MongoCursor<ChangeStreamDocument<Document>> cursor = changeStreamIterable.iterator();
while (cursor.hasNext()) {
	ChangeStreamDocument<Document> next = cursor.next();
	String operation = next.getOperationType().getValue();
	String tableName = next.getNamespace().getCollectionName();
	System.out.println(tableName + ">>" + operation + ":" + JSON.toJSONString(next));
}


启动程序之后,对记录进行增加和删除等操作,操作日志如下:

{
    "clusterTime": {
        "array": false,
        "binary": false,
        "boolean": false,
        "bsonType": "TIMESTAMP",
        "dBPointer": false,
        "dateTime": false,
        "decimal128": false,
        "document": false,
        "double": false,
        "inc": 2,
        "int32": false,
        "int64": false,
        "javaScript": false,
        "javaScriptWithScope": false,
        "null": false,
        "number": false,
        "objectId": false,
        "regularExpression": false,
        "string": false,
        "symbol": false,
        "time": 1711619810,
        "timestamp": true,
        "value": 7351351107135733762
    }
,
    "databaseName": "数据库名字",
    "documentKey": {
        "_id": {
            "array": false,
            "binary": false,
            "boolean": false,
            "bsonType": "STRING",
            "dBPointer": false,
            "dateTime": false,
            "decimal128": false,
            "document": false,
            "double": false,
            "int32": false,
            "int64": false,
            "javaScript": false,
            "javaScriptWithScope": false,
            "null": false,
            "number": false,
            "objectId": false,
            "regularExpression": false,
            "string": true,
            "symbol": false,
            "timestamp": false,
            "value": "104esmb"
        }

    }
,
    "fullDocument": {
        "_id": "104esmb",
        "status": true,
        "query_status": true,
        "source_id": "104esmq",
        "data_set": "[\"104esmb\"]",
        "104eslz": "河北省",
        "104esm1": "廊坊市",
        "104esm2": "固安",
        "104esm3": "conca",
        "104esm4": 36
    }
,
    "namespace": {
        "collectionName": "402881708e846308018e8463d93e0016",
        "databaseName": "数据库名字",
        "fullName": "数据库名字.402881708e846308018e8463d93e0016"
    }
,
    "namespaceDocument": {
        "db": {
            "array": false,
            "binary": false,
            "boolean": false,
            "bsonType": "STRING",
            "dBPointer": false,
            "dateTime": false,
            "decimal128": false,
            "document": false,
            "double": false,
            "int32": false,
            "int64": false,
            "javaScript": false,
            "javaScriptWithScope": false,
            "null": false,
            "number": false,
            "objectId": false,
            "regularExpression": false,
            "string": true,
            "symbol": false,
            "timestamp": false,
            "value": "数据库名字"
        }
,
        "coll": {
            "array": false,
            "binary": false,
            "boolean": false,
            "bsonType": "STRING",
            "dBPointer": false,
            "dateTime": false,
            "decimal128": false,
            "document": false,
            "double": false,
            "int32": false,
            "int64": false,
            "javaScript": false,
            "javaScriptWithScope": false,
            "null": false,
            "number": false,
            "objectId": false,
            "regularExpression": false,
            "string": true,
            "symbol": false,
            "timestamp": false,
            "value": "402881708e846308018e8463d93e0016"
        }

    }
,
    "operationType": "UPDATE",
    "resumeToken": {
        "_data": {
            "array": false,
            "binary": false,
            "boolean": false,
            "bsonType": "STRING",
            "dBPointer": false,
            "dateTime": false,
            "decimal128": false,
            "document": false,
            "double": false,
            "int32": false,
            "int64": false,
            "javaScript": false,
            "javaScriptWithScope": false,
            "null": false,
            "number": false,
            "objectId": false,
            "regularExpression": false,
            "string": true,
            "symbol": false,
            "timestamp": false,
            "value": "8266053EE2000000022B022C0100296E5A10044B888E952F824B12B38FA3A2314E01D3463C5F6964003C31303465736D62000004"
        }

    }
,
    "updateDescription": {
        "removedFields": [

        ]
,
        "truncatedArrays": [

        ]
,
        "updatedFields": {
            "104esm3": {
                "array": false,
                "binary": false,
                "boolean": false,
                "bsonType": "STRING",
                "dBPointer": false,
                "dateTime": false,
                "decimal128": false,
                "document": false,
                "double": false,
                "int32": false,
                "int64": false,
                "javaScript": false,
                "javaScriptWithScope": false,
                "null": false,
                "number": false,
                "objectId": false,
                "regularExpression": false,
                "string": true,
                "symbol": false,
                "timestamp": false,
                "value": "conca"
            }
,
            "source_id": {
                "array": false,
                "binary": false,
                "boolean": false,
                "bsonType": "STRING",
                "dBPointer": false,
                "dateTime": false,
                "decimal128": false,
                "document": false,
                "double": false,
                "int32": false,
                "int64": false,
                "javaScript": false,
                "javaScriptWithScope": false,
                "null": false,
                "number": false,
                "objectId": false,
                "regularExpression": false,
                "string": true,
                "symbol": false,
                "timestamp": false,
                "value": "104esmq"
            }
,
            "updated_at": {
                "array": false,
                "binary": false,
                "boolean": false,
                "bsonType": "DATE_TIME",
                "dBPointer": false,
                "dateTime": true,
                "decimal128": false,
                "document": false,
                "double": false,
                "int32": false,
                "int64": false,
                "javaScript": false,
                "javaScriptWithScope": false,
                "null": false,
                "number": false,
                "objectId": false,
                "regularExpression": false,
                "string": false,
                "symbol": false,
                "timestamp": false,
                "value": 1711619810720
            }

        }

    }

}


一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.