2024-04-01 21:11:42.0|分类: flink|浏览量: 405
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。 Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:
flink-connector-base模块主要是提供连接外部系统和数据源的基础功能,为其他具体的连接器模块提供了通用的接口和类。通过使用flink-connector-base,可以方便地实现自定义的连接器,并将Flink与各种外部系统集成起来,所以需要引用DataStream API,均需要加上此依赖。 flink-sql-connector-mongodb-cdc和flink-connector-mongodb-cdc的区别是什么? flink-sql-connector-mongodb-cdc 和 flink-connector-mongodb-cdc 都是 Flink 的 MongoDB CDC(Change Data Capture)连接器,用于从 MongoDB 数据库中捕获变化数据并将其传递给 Flink 进行实时处理。 它们的区别在于使用方式和集成方式: 1. flink-sql-connector-mongodb-cdc :这是一个基于 Flink SQL 的连接器,允许您使用 SQL 查询语句来捕获 MongoDB 数据库中的变化数据。您可以使用 Flink SQL 的语法来定义源表、目标表以及变化数据的处理逻辑。这个连接器提供了更高层次的抽象,使得使用 SQL 进行 MongoDB CDC 更加方便。 2. flink-connector-mongodb-cdc :这是一个基于 Flink 的连接器,使用 Flink DataStream API 来捕获 MongoDB 数据库中的变化数据。您可以使用 DataStream API 来定义源和目标以及处理逻辑。这个连接器提供了更灵活的编程方式,适用于那些更喜欢使用编程接口而不是 SQL 的用户。 总结来说, flink-sql-connector-mongodb-cdc 提供了使用 SQL 进行 MongoDB CDC 的高级抽象,而 flink-connector-mongodb-cdc 则提供了更灵活的编程接口。您可以根据自己的喜好和需求选择适合的连接器 设置 MongoDB #可用性 # MongoDB 版本 MongoDB 版本 >= 3.6 我们使用 更改流 功能(3.6 版中新增),以捕获更改数据。 集群部署 副本集 或者 分片集群 是必需的。 存储引擎 WiredTiger 存储引擎是必需的。 副本集协议版本 副本集协议版本 1 (pv1) 是必需的。 从 4.0 版本开始,MongoDB 只支持pv1。 pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。 权限 changeStream and read 是 MongoDB Kafka Connector 必需权限。 你可以使用以下示例进行简单的授权。 有关更详细的授权, 请参照 MongoDB 数据库用户角色. 更改流 #我们将 MongoDB’s official Kafka Connector 从 MongoDB 中读取快照或更改事件,并通过 Debezium 的 Debezium 的 我们选择 MongoDB 的官方 Kafka连接器,而不是 Debezium 的MongoDB 连接器,因为它们使用了不同的更改数据捕获机制。
MongoDB 的 Change Stream是 MongoDB 3.6 为副本集和分片集群提供的一项新功能,它允许应用程序访问实时数据更改,而不会带来跟踪操作日志的复杂性和风险。 查找更新操作的完整文档是变更流提供的一项功能,它可以配置变更流以返回更新文档的最新多数提交版本。由于该功能,我们可以轻松收集最新的完整文档,并将更改日志转换为 Flink 的Upsert Changelog Stream。 代码: package com.conca.flink.mongodb; import com.ververica.cdc.connectors.mongodb.MongoDBSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; //https://blog.csdn.net/penngo/article/details/124913985 public class MongoDBExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.通过FlinkCDC构建SourceFunction SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder() .hosts("127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0") .username("root") .password("123123") // .databaseList("Tutorial1") // 设置捕获的数据库,支持正则表达式 // .collectionList("products", "orders") //设置捕获的集合,支持正则表达式 .database("Tutorial1") .collection("products") .deserializer(new JsonDebeziumDeserializationSchema()) .build(); DataStreamSource<String> dataStreamSource = env.addSource(mongoDBSourceFunction); SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource .process(new ProcessFunction<String, Object>() { @Override public void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) { try { System.out.println("processElement=====" + value); }catch (Exception e) { e.printStackTrace(); } } }); dataStreamSource.print("原始流--"); env.execute("Mongo"); } } pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.penngo.flinkcdc</groupId> <artifactId>FlickCDC</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>FlickCDC_TEST</name> <url>https://21doc.net/</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <flink-version>1.13.3</flink-version> <flink-cdc-version>2.1.1</flink-cdc-version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>${flink-cdc-version}</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mongodb-cdc</artifactId> <version>${flink-cdc-version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>${maven.compiler.source}</source> <target>${maven.compiler.target}</target> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> </plugins> </build> <repositories> <repository> <id>alimaven</id> <name>Maven Aliyun Mirror</name> <url>https://maven.aliyun.com/repository/central</url> </repository> </repositories> </project> 测试mongodb更改流: mongo --port 27317 -uroot -p 123123 --authenticationDatabase admin use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } ) 插入orders表,orders表没有监听更改流,所以录入数据,不打印任何日志信息。 db.products.insertOne( { 'order_id' : 1, 'item' : 'coffee' } ) products被监听了,插入上面记录,会输出更改流,日志打印记录如下: processElement====={"_id":"{\"_data\": \"82660AB127000000012B022C0100296E5A1004E8BE766AD3444CB18ABA65EC172888CC46645F69640064660AB127A52B0B59CFA2F9A90004\"}","operationType":"insert","fullDocument":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}, \"order_id\": 1.0, \"item\": \"coffee\"}","source":{"ts_ms":1711976743000,"snapshot":null},"ns":{"db":"Tutorial1","coll":"products"},"to":null,"documentKey":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}}","updateDescription":null,"clusterTime":"{\"$timestamp\": {\"t\": 1711976743, \"i\": 1}}","txnNumber":null,"lsid":null} 原始流--> {"_id":"{\"_data\": \"82660AB127000000012B022C0100296E5A1004E8BE766AD3444CB18ABA65EC172888CC46645F69640064660AB127A52B0B59CFA2F9A90004\"}","operationType":"insert","fullDocument":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}, \"order_id\": 1.0, \"item\": \"coffee\"}","source":{"ts_ms":1711976743000,"snapshot":null},"ns":{"db":"Tutorial1","coll":"products"},"to":null,"documentKey":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}}","updateDescription":null,"clusterTime":"{\"$timestamp\": {\"t\": 1711976743, \"i\": 1}}","txnNumber":null,"lsid":null} 参照文档: https://www.jianshu.com/p/badcd0106ca1 https://junyao.tech/posts/504c3b98.html |