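Change Stream is MongoDB's solution for change tracking. It resembles triggers in relational databases, but the underlying mechanism is not the same:

| | Change Stream | Trigger |
|---|---|---|
| Trigger mode | Asynchronous | Synchronous (guaranteed by the transaction) |
| Where it fires | Application callback | Database trigger |
| How often it fires | Once for each subscribed client | Once (in the trigger) |
| Failure recovery | Resumes from the last checkpoint | Transaction rollback |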
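The "failure recovery" and "how often it fires" rows can be illustrated with a small in-memory model. This is only a sketch and does not use the MongoDB driver: `ResumableFeed` and `CheckpointingSubscriber` are hypothetical names, and a list index stands in for the resume token. Each subscriber keeps its own checkpoint, so every subscriber sees every event, and a subscriber that was offline picks up from where it stopped.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for an ordered change log; not MongoDB code. */
final class ResumableFeed {
    private final List<String> events = new ArrayList<>();

    /** Appends an event and returns its position, used here as a resume token. */
    int append(String event) {
        events.add(event);
        return events.size() - 1;
    }

    /** Returns every event recorded after the given token (-1 means "from the start"). */
    List<String> readAfter(int token) {
        return new ArrayList<>(events.subList(token + 1, events.size()));
    }
}

/** A subscriber that remembers the token of the last event it handled. */
final class CheckpointingSubscriber {
    private int lastToken = -1;
    final List<String> handled = new ArrayList<>();

    /** Handles everything published since the last checkpoint, then advances it. */
    void poll(ResumableFeed feed) {
        for (String event : feed.readAfter(lastToken)) {
            handled.add(event);
            lastToken++;
        }
    }
}
```

A subscriber that polls late, or again after a crash, still receives the events it missed, and two subscribers do not interfere with each other.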
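How Change Stream works

Change Stream is built on the oplog and pushes incremental changes in real time. It opens a tailable cursor on the oplog to track every change operation on the replica set, and ultimately invokes the callback function defined in the application.

The tracked change events mainly include:

- insert/update/delete: a document is inserted, updated, or deleted;
- drop: the collection is dropped;
- rename: the collection is renamed;
- dropDatabase: the database is dropped;
- invalidate: triggered as a result of drop/rename/dropDatabase; it also closes the change stream.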
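The flow described above (tail a log, hand each entry to a callback, close on invalidate) can be sketched with a toy model. This is an illustration under simplifying assumptions, not how the server is implemented: `ToyChangeStream` is a hypothetical class, the "oplog" is a plain list of operation type names, and `drain()` replaces the blocking behaviour of a real tailable cursor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a change stream: a cursor over an append-only log plus a callback. */
final class ToyChangeStream {
    private final List<String> oplog;        // shared append-only list of operation types
    private final Consumer<String> callback; // application-defined handler
    private int position = 0;                // index of the next entry to deliver
    private boolean closed = false;

    ToyChangeStream(List<String> oplog, Consumer<String> callback) {
        this.oplog = oplog;
        this.callback = callback;
    }

    /** Delivers entries appended since the last call; returns false once the stream is closed. */
    boolean drain() {
        while (!closed && position < oplog.size()) {
            String op = oplog.get(position++);
            callback.accept(op);
            // drop, rename and dropDatabase are followed by invalidate, which closes the stream.
            if (op.equals("drop") || op.equals("rename") || op.equals("dropDatabase")) {
                callback.accept("invalidate");
                closed = true;
            }
        }
        return !closed;
    }
}
```

Once the stream has been invalidated, later log entries are no longer delivered; a new stream would have to be opened to keep watching.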
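Because Change Stream relies on information stored in the oplog, it is not available on a standalone (single-process) MongoDB deployment. It can only be used on a replica set or a sharded cluster.

Watch targets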
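| Target | Description |
|---|---|
| Single collection | Any collection outside the system databases (admin/local/config); supported since version 3.6 |
| Single database | The collections of any database other than the system databases (admin/local/config); supported since version 4.0 |
| Entire cluster | All collections in the cluster except those in the system databases (admin/local/config); supported since version 4.0 |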
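Prerequisites

1. The WiredTiger storage engine
2. A replica set (a single-node replica set is fine in a test environment) or a sharded cluster deployment
3. Replica set protocol version pv1 (the default)
4. In version 4.0 and earlier, Majority Read Concern must be enabled: replication.enableMajorityReadConcern = true (enabled by default)
5. The MongoDB user has the find and changeStream privileges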
Change Events

Change Events are the change records returned by Change Streams. Their data structure is as follows:

    {
       _id : { <BSON Object> },
       "operationType" : "<operation>",
       "fullDocument" : { <document> },
       "ns" : { "db" : "<database>", "coll" : "<collection>" },
       "to" : { "db" : "<database>", "coll" : "<collection>" },
       "documentKey" : { "_id" : <value> },
       "updateDescription" : {
          "updatedFields" : { <document> },
          "removedFields" : [ "<field>", ... ],
          "truncatedArrays" : [ { "field" : <field>, "newSize" : <integer> }, ... ]
       },
       "clusterTime" : <Timestamp>,
       "txnNumber" : <NumberLong>,
       "lsid" : { "id" : <UUID>, "uid" : <BinData> }
    }
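Change event fields:

| Name | Description |
|---|---|
| _id | Token object of the change event |
| operationType | Type of change, such as insert, update, or delete |
| fullDocument | Content of the document |
| ns | Namespace of the watched target |
| ns.db | Database in which the change occurred |
| ns.coll | Collection in which the change occurred |
| documentKey | Key of the changed document, including the _id field |
| updateDescription | Description of the change |
| updateDescription.updatedFields | Fields updated by the change |
| updateDescription.removedFields | Fields removed by the change |
| clusterTime | Timestamp of the corresponding oplog entry |
| txnNumber | Transaction number; present only in multi-document transactions; supported since version 4.0 |
| lsid | Identifier of the session associated with the transaction; present only in multi-document transactions; supported since version 4.0 |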
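To make the updateDescription fields concrete, here is a minimal sketch of how a consumer might apply updatedFields and removedFields to a local copy of a document. It uses plain `java.util.Map` values rather than driver types, `UpdateDescriptionApplier` is a hypothetical helper, and it only handles top-level field names (dotted paths into nested documents and truncatedArrays are left out).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Applies the delta from an update event to a local copy of the document. */
final class UpdateDescriptionApplier {

    /** Returns a new map: doc with updatedFields written and removedFields deleted. */
    static Map<String, Object> apply(Map<String, Object> doc,
                                     Map<String, Object> updatedFields,
                                     List<String> removedFields) {
        Map<String, Object> result = new LinkedHashMap<>(doc);
        // Fields listed in updatedFields overwrite or add top-level entries.
        result.putAll(updatedFields);
        // Fields listed in removedFields are dropped from the document.
        for (String field : removedFields) {
            result.remove(field);
        }
        return result;
    }
}
```

The input map is left untouched, so the caller can keep the previous version of the document for comparison if needed.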
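Update Lookup

Because an update entry in the oplog contains only the fields that changed, the complete post-update document cannot be obtained from the oplog directly. Converting to an UPSERT-mode changelog, however, requires the UPDATE_AFTER RowData to carry the complete row. By setting fullDocument = updateLookup, Change Streams can return the latest state of the document together with the change record. In addition, every Change Event record contains a documentKey (the _id plus the shard key) that identifies the primary key of the changed record, which satisfies the condition for idempotent updates. The Update Lookup feature therefore makes it possible to convert MongoDB change records into a Flink UPSERT changelog.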
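The idempotency argument can be sketched without Flink or the MongoDB driver. In the hypothetical `UpsertSink` below, each event is reduced to an operation type, the documentKey as a string, and the full document as a map. Inserts and updates both become "put by key" and deletes become "remove by key", so replaying an event leaves the table in the same state.

```java
import java.util.HashMap;
import java.util.Map;

/** Materializes change events into a keyed table using upsert semantics. */
final class UpsertSink {
    final Map<String, Map<String, Object>> table = new HashMap<>();

    /**
     * Applies one event. operationType is "insert", "update" or "delete";
     * fullDocument is ignored for deletes and may be null in that case.
     */
    void apply(String operationType, String documentKey, Map<String, Object> fullDocument) {
        if (operationType.equals("delete")) {
            table.remove(documentKey);
        } else {
            // insert and update are treated the same way: overwrite the row for this key.
            table.put(documentKey, fullDocument);
        }
    }
}
```

This only works because each update event carries the full document; with the changed fields alone, an overwrite by key would lose the untouched fields.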
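Java implementation:

    import com.alibaba.fastjson.JSON; // assumption: JSON.toJSONString below is fastjson's
    import com.mongodb.client.ChangeStreamIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.MongoCursor;
    import com.mongodb.client.model.Aggregates;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import com.mongodb.client.model.changestream.FullDocument;
    import org.bson.Document;
    import org.bson.conversions.Bson;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // formMongo, databaseName and table come from the surrounding application code.
    MongoCollection<Document> mongoCollection = formMongo.getCollection(databaseName, table);
    // Create the $match pipeline stage: only insert, update and delete events pass through.
    List<Bson> pipeline = Collections.singletonList(
            Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update", "delete"))));
    // Create the change stream by passing the pipeline to collection.watch().
    // Without UPDATE_LOOKUP an update event carries only the changed fields;
    // with it, the event also returns the full document.
    ChangeStreamIterable<Document> changeStreamIterable =
            mongoCollection.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP);
    // try-with-resources closes the cursor when the loop exits.
    try (MongoCursor<ChangeStreamDocument<Document>> cursor = changeStreamIterable.iterator()) {
        while (cursor.hasNext()) {
            ChangeStreamDocument<Document> next = cursor.next();
            String operation = next.getOperationType().getValue();
            String tableName = next.getNamespace().getCollectionName();
            System.out.println(tableName + ">>" + operation + ":" + JSON.toJSONString(next));
        }
    }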
After starting the program, perform operations such as inserts and deletes on the records. The resulting log looks like this:

    {
      "clusterTime": { "array": false, "binary": false, "boolean": false, "bsonType": "TIMESTAMP", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "inc": 2, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": false, "symbol": false, "time": 1711619810, "timestamp": true, "value": 7351351107135733762 },
      "databaseName": "<database name>",
      "documentKey": {
        "_id": { "array": false, "binary": false, "boolean": false, "bsonType": "STRING", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": true, "symbol": false, "timestamp": false, "value": "104esmb" }
      },
      "fullDocument": { "_id": "104esmb", "status": true, "query_status": true, "source_id": "104esmq", "data_set": "[\"104esmb\"]", "104eslz": "河北省", "104esm1": "廊坊市", "104esm2": "固安", "104esm3": "conca", "104esm4": 36 },
      "namespace": { "collectionName": "402881708e846308018e8463d93e0016", "databaseName": "<database name>", "fullName": "<database name>.402881708e846308018e8463d93e0016" },
      "namespaceDocument": {
        "db": { "array": false, "binary": false, "boolean": false, "bsonType": "STRING", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": true, "symbol": false, "timestamp": false, "value": "<database name>" },
        "coll": { "array": false, "binary": false, "boolean": false, "bsonType": "STRING", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": true, "symbol": false, "timestamp": false, "value": "402881708e846308018e8463d93e0016" }
      },
      "operationType": "UPDATE",
      "resumeToken": {
        "_data": { "array": false, "binary": false, "boolean": false, "bsonType": "STRING", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": true, "symbol": false, "timestamp": false, "value": "8266053EE2000000022B022C0100296E5A10044B888E952F824B12B38FA3A2314E01D3463C5F6964003C31303465736D62000004" }
      },
      "updateDescription": {
        "removedFields": [],
        "truncatedArrays": [],
        "updatedFields": {
          "104esm3": { "array": false, "binary": false, "boolean": false, "bsonType": "STRING", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": true, "symbol": false, "timestamp": false, "value": "conca" },
          "source_id": { "array": false, "binary": false, "boolean": false, "bsonType": "STRING", "dBPointer": false, "dateTime": false, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": true, "symbol": false, "timestamp": false, "value": "104esmq" },
          "updated_at": { "array": false, "binary": false, "boolean": false, "bsonType": "DATE_TIME", "dBPointer": false, "dateTime": true, "decimal128": false, "document": false, "double": false, "int32": false, "int64": false, "javaScript": false, "javaScriptWithScope": false, "null": false, "number": false, "objectId": false, "regularExpression": false, "string": false, "symbol": false, "timestamp": false, "value": 1711619810720 }
        }
      }
    }