flink-connector-mongodb-cdc实战案例
cookqq ›博客列表 ›flink

flink-connector-mongodb-cdc实战案例

2024-04-01 21:11:42.0|分类: flink|浏览量: 789

摘要: MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。 本文档描述了如何设置 MongoDB CDC 连接器以针对 MongoDB 运行 SQL 查询。

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

  • ✅ 端到端的数据集成框架

  • ✅ 为数据集成的用户提供了易于构建作业的 API

  • ✅ 支持在 Source 和 Sink 中处理多个表

  • ✅ 整库同步

  • ✅具备表结构变更自动同步的能力(Schema Evolution),


flink-connector-base模块主要是提供连接外部系统和数据源的基础功能,为其他具体的连接器模块提供了通用的接口和类。通过使用flink-connector-base,可以方便地实现自定义的连接器,并将Flink与各种外部系统集成起来,所以需要引用DataStream API,均需要加上此依赖。


flink-sql-connector-mongodb-cdc和flink-connector-mongodb-cdc的区别是什么?

flink-sql-connector-mongodb-cdc 和 flink-connector-mongodb-cdc 都是 Flink 的 MongoDB CDC(Change Data Capture)连接器,用于从 MongoDB 数据库中捕获变化数据并将其传递给 Flink 进行实时处理。 它们的区别在于使用方式和集成方式: 1. flink-sql-connector-mongodb-cdc :这是一个基于 Flink SQL 的连接器,允许您使用 SQL 查询语句来捕获 MongoDB 数据库中的变化数据。您可以使用 Flink SQL 的语法来定义源表、目标表以及变化数据的处理逻辑。这个连接器提供了更高层次的抽象,使得使用 SQL 进行 MongoDB CDC 更加方便。 2. flink-connector-mongodb-cdc :这是一个基于 Flink 的连接器,使用 Flink DataStream API 来捕获 MongoDB 数据库中的变化数据。您可以使用 DataStream API 来定义源和目标以及处理逻辑。这个连接器提供了更灵活的编程方式,适用于那些更喜欢使用编程接口而不是 SQL 的用户。 总结来说, flink-sql-connector-mongodb-cdc 提供了使用 SQL 进行 MongoDB CDC 的高级抽象,而 flink-connector-mongodb-cdc 则提供了更灵活的编程接口。您可以根据自己的喜好和需求选择适合的连接器


设置 MongoDB #

可用性 #

MongoDB 版本

MongoDB 版本 >= 3.6

我们使用 更改流 功能(3.6 版中新增),以捕获更改数据。

集群部署

副本集 或者 分片集群 是必需的。

存储引擎

WiredTiger 存储引擎是必需的。

副本集协议版本

副本集协议版本 1 (pv1) 是必需的。

从 4.0 版本开始,MongoDB 只支持pv1。 pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。

权限

changeStream and read 是 MongoDB Kafka Connector 必需权限。


你可以使用以下示例进行简单的授权。

有关更详细的授权, 请参照 MongoDB 数据库用户角色.


更改流 #

我们将 MongoDB’s official Kafka Connector 从 MongoDB 中读取快照或更改事件,并通过 Debezium 的 EmbeddedEngine 进行驱动。

Debezium 的 EmbeddedEngine 提供了一种在应用程序进程中运行单个 Kafka Connect SourceConnector 的机制,并且它可以正确地驱动任何标准的 Kafka Connect SourceConnector,即使它不是由 Debezium 提供的。

我们选择 MongoDB 的官方 Kafka连接器,而不是 Debezium 的MongoDB 连接器,因为它们使用了不同的更改数据捕获机制。

  • 对于 Debezium 的 MongoDB 连接器,它读取每个复制集主节点的 oplog.rs 集合。

  • 对于 MongoDB 的 Kafka 连接器,它订阅了 MongoDB 的 更改流。

MongoDB 的oplog.rs 集合没有在状态之前保持更改记录的更新, 因此,很难通过单个 oplog.rs 记录提取完整的文档状态,并将其转换为 Flink 接受的更改日志流(Insert Only,Upsert,All)。 此外,MongoDB 5(2021 7月发布)改变了 oplog 格式,因此当前的 Debezium 连接器不能与其一起使用。

Change Stream是 MongoDB 3.6 为副本集和分片集群提供的一项新功能,它允许应用程序访问实时数据更改,而不会带来跟踪操作日志的复杂性和风险。
应用程序可以使用更改流来订阅单个集合上的所有数据更改, 数据库或整个部署,并立即对其做出反应。

查找更新操作的完整文档是变更流提供的一项功能,它可以配置变更流以返回更新文档的最新多数提交版本。由于该功能,我们可以轻松收集最新的完整文档,并将更改日志转换为 Flink 的Upsert Changelog Stream。


代码:

package com.conca.flink.mongodb;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

//https://blog.csdn.net/penngo/article/details/124913985
public class MongoDBExample {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.通过FlinkCDC构建SourceFunction
        SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
                .hosts("127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0")
                .username("root")
                .password("123123")
//                .databaseList("Tutorial1") // 设置捕获的数据库,支持正则表达式
//                .collectionList("products", "orders") //设置捕获的集合,支持正则表达式
                .database("Tutorial1")
                .collection("products")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        DataStreamSource<String> dataStreamSource = env.addSource(mongoDBSourceFunction);

        SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource
        .process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Object>.Context ctx, 
            Collector<Object> out) {
                try {
                    System.out.println("processElement=====" + value);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        dataStreamSource.print("原始流--");
        env.execute("Mongo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.penngo.flinkcdc</groupId>
  <artifactId>FlickCDC</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>FlickCDC_TEST</name>
  <url>https://21doc.net/</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink-version>1.13.3</flink-version>
    <flink-cdc-version>2.1.1</flink-cdc-version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink-version}</version>
    </dependency>

    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>${flink-cdc-version}</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <version>${flink-cdc-version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>alimaven</id>
      <name>Maven Aliyun Mirror</name>
      <url>https://maven.aliyun.com/repository/central</url>
    </repository>
  </repositories>
</project>


测试mongodb更改流:

mongo --port 27317 -uroot -p 123123 --authenticationDatabase admin

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

插入orders表,orders表没有监听更改流,所以录入数据,不打印任何日志信息。


db.products.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

products被监听了,插入上面记录,会输出更改流,日志打印记录如下:

processElement====={"_id":"{\"_data\": \"82660AB127000000012B022C0100296E5A1004E8BE766AD3444CB18ABA65EC172888CC46645F69640064660AB127A52B0B59CFA2F9A90004\"}","operationType":"insert","fullDocument":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}, \"order_id\": 1.0, \"item\": \"coffee\"}","source":{"ts_ms":1711976743000,"snapshot":null},"ns":{"db":"Tutorial1","coll":"products"},"to":null,"documentKey":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}}","updateDescription":null,"clusterTime":"{\"$timestamp\": {\"t\": 1711976743, \"i\": 1}}","txnNumber":null,"lsid":null}

原始流--> {"_id":"{\"_data\": \"82660AB127000000012B022C0100296E5A1004E8BE766AD3444CB18ABA65EC172888CC46645F69640064660AB127A52B0B59CFA2F9A90004\"}","operationType":"insert","fullDocument":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}, \"order_id\": 1.0, \"item\": \"coffee\"}","source":{"ts_ms":1711976743000,"snapshot":null},"ns":{"db":"Tutorial1","coll":"products"},"to":null,"documentKey":"{\"_id\": {\"$oid\": \"660ab127a52b0b59cfa2f9a9\"}}","updateDescription":null,"clusterTime":"{\"$timestamp\": {\"t\": 1711976743, \"i\": 1}}","txnNumber":null,"lsid":null}


参照文档:

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/legacy-flink-cdc-sources/mongodb-cdc/

https://www.jianshu.com/p/badcd0106ca1

https://junyao.tech/posts/504c3b98.html


一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.