mongo-kafka-connect实战案例
cookqq ›博客列表 ›MongoDB

mongo-kafka-connect实战案例

2024-04-16 21:57:02.0|分类: MongoDB|浏览量: 869

摘要: ​Kafka Connect可以从数据库或应用程序服务 器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出作业可以将数据从Kafka topic传输到二次存储和 查询系统,或者传递到批处理系统以进行离线分析。

Apache Kafka

Apache Kafka 是一个开源发布/订阅消息系统。Apache Kafka 提供了一个灵活、容错且可水平扩展的系统,

用于在数据存储和应用程序中移动数据。如果满足以下条件,则系统是容错的:即使系统的某些组件停止工作,

系统也可以继续运行。如果可以通过添加更多计算机(而不是改进计算机硬件)来扩展系统以处理更大的工作负载,

则系统是可水平扩展的。


Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。

它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。


Kafka Connector

Kafka Connect 是 Apache Kafka 的一个组件,解决了 Apache Kafka 与 MongoDB 等数据存储的连接问题。Kafka Connect 通过提供以下资源解决了这个问题:

用于与数据存储之间传输数据的容错运行时环境。

Apache Kafka 社区的框架,用于分享将 Apache Kafka 连接到不同数据存储的解决方案。


Kafka Connect 框架定义一个 API,以使开发人员编写可重用的连接器。

连接器使 Kafka Connect 部署能够与作为数据源或数据接收器的特定数据存储进行交互。

MongoDB Kafka Connector 就是这些连接器之一。


在连接到数据存储时使用 Kafka Connect 而不是生产者/使用者客户端

虽然您可以编写自己的应用程序,以使用生产客户端和消费客户端将 Apache Kafka 连接到特定的数据存储,但 Kafka Connect 可能更适合您。以下是使用 Kafka Connect 的部分原因:

1、Kafka Connect 具有容错的分布式架构,以确保可靠的管道。

2、有大量由社区维护的连接器,可使用 Kafka Connect 框架将 Apache Kafka 连接到 MongoDB、PostgreSQL 和 MySQL 等流行的数据存储。这样就减少了需要编写和维护的模板代码数量,以管理数据库连接、错误处理、死信队列集成以及 Apache Kafka 与数据存储连接时涉及的其他问题。

3、您可以选择使用 Confluence 提供的托管 Kafka Connect 集群。


Kafaka connect的核心组件:

Source:负责将外部数据写入到kafka的topic中。

Sink:负责从kafka中读取数据到自己需要的地方去,比如读取到HDFS,hbase等。



官方文档:

https://www.mongodb.com/zh-cn/docs/kafka-connector/current/introduction/kafka-connect/


第一步:先安装kafka,

参照前面文档http://www.cookqq.com/blog/8a10a5f38d5a88bf018e8e951ddd6830


安装 MongoDB Kafka Connector

第二步:将 JAR 和任何依赖项复制到 Kafka 插件目录中

编辑配置文件C:\kafka3.7.0\config\connect-distributed.properties,末尾增加插件路径

plugin.path=C:\\kafka3.7.0\\plugins


第三步:创建C:\kafka3.7.0\plugins文件夹,plugins默认是不存在的

第四步:下载mongo-kafka-connect-1.11.2-all.jar,并且复制到C:\\kafka3.7.0\\plugins

下载jar路径:https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.2/

源代码路径 :https://github.com/mongodb/mongo-kafka


第五步:下载mongodb驱动jar,并且复制到C:\\kafka3.7.0\\plugins

mongodb-driver-core-4.7.2.jar

mongodb-driver-sync-4.7.2.jar

bson-record-codec-4.7.2.jar

bson-4.7.2.jar


注意:

驱动程序版本:1.11 版的 MongoDB Kafka Connector 使用 4.7 版的 MongoDB Java 驱动程序。

https://www.mongodb.com/docs/drivers/

https://www.mongodb.com/docs/drivers/java-drivers/


第六步:启动kafka

cd C:\kafka3.7.0

.\bin\windows\kafka-server-start.bat .\config\server.properties


第六步:启动mongo-kafka-connect

cd C:\kafka3.7.0

.\bin\windows\connect-distributed.bat .\config\connect-distributed.properties


第七步:验证mongo-kafka-connect是否正常启动

访问http://127.0.0.1:8083/connector-plugins,启动成功之后会展示所有插件信息;

[
    {
        "class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "type": "sink",
        "version": "1.11.2"
    }
,
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "3.7.0"
    }
,
    {
        "class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "type": "source",
        "version": "1.11.2"
    }
,
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "3.7.0"
    }
,
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "3.7.0"
    }
,
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "3.7.0"
    }
,
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "3.7.0"
    }

]


其他接口REST API

GET /connectors

POST /connectors

GET /connectors/{name}

GET /connectors/{name}/config

PUT /connectors/{name}/config

GET /connectors/{name}/status

GET /connectors/{name}/tasks

GET /connectors/{name}/tasks/{taskid}/status

PUT /connectors/{name}/pause

PUT /connectors/{namel/resume

POST /connectors/{name}/restart

POST /connectors/{namel/tasks/{taskld}/restart

DELETE /connectors/{name}


上面对应的说明

返回所有正在运行的connector

新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。

获取指定connetor的信息

获取指定connector的配置信息

更新指定connector的配置信息

获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。

获取指定connector正在运行的task。

获取指定connector的task的状态信息

暂停connector和它的task,停止数据处理知道它被恢复。

恢复一个被暂停的connector

重启-一个connector,尤其是在一个connector运行失败的情况下比较常用

重启一个task,一般是因为它运行失败才这样做

删除一个connector,停止它的所有task并删除配置


第八步:查看所有的connector

http://127.0.0.1:8083/connectors,此时返回的是一个空数组,说明没有任何的connector:


第九步:配置 MongoDB Kafka 源连接器以读取变更流中的数据并将其发布到 Apache Kafka 主题

http://127.0.0.1:8083/connectors post请求

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

{

  "name": "mongo-simple-source",

  "config": {

    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",

    "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0",

"publish.full.document.only": true,

    "database": "Tutorial1",

    "collection": "orders"

  }

}


第十步:查看链接集合http://127.0.0.1:8083/connectors

["mongo-simple-source"]


第十一步:输入测试mongo数据

mongo --port 27317 -uroot -p 123123 --authenticationDatabase admin

use Tutorial1

db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )


第十二步:查看kafka主题:

cd C:\kafka3.7.0\bin\windows

kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --list

输入结果是:

Tutorial1.orders

__consumer_offsets

connect-configs

connect-offsets

connect-status


第十三步:何配置 MongoDB Kafka 接收器连接器,以从 Apache Kafka 主题读取数据并将其写入 MongoDB 集合


http://127.0.0.1:8083/connectors post请求 


curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"


{

  "name": "mongo-tutorial-sink",

  "config": {

    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",

    "topics": "Tutorial1.orders",

    "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",

    "value.converter.schemas.enable": false,

    "database": "Tutorial1",

    "collection": "back"

  }

}


第十四步:查看链接集合http://127.0.0.1:8083/connectors

["mongo-tutorial-sink","mongo-simple-source"]


第十五步:输入测试mongo数据

mongo --port 27317 -uroot -p 123123 --authenticationDatabase admin

use Tutorial1

db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )


第十六步:查看back表数据

db.getCollection('back').find({})


输出结果

{

    "_id" : ObjectId("660abd1755910d57485a7061"),

    "schema" : {

        "optional" : false,

        "type" : "string"

    },

    "payload" : "{\"_id\": {\"$oid\": \"660abd17a52b0b59cfa2f9ab\"}, \"order_id\": 1.0, \"item\": \"coffee\"}"

}


上面的插入数据字段是_id,schema,payload。schema,payload是kafka信息字段,MongoDB表的字段都被包含在payload字段中了,现在希望去掉上面这层包裹。表自动变成实际用户字段order_id、item。


从schema中发现字段类型是String,因为前面咱们mongo-simple-source没有配置转换器,应该默认用是String转换器。


现在需要将mongo-simple-source和mongo-tutorial-sink配置成相同规则的转换器,现在使用官网提供的JSON转换器。配置如下:

来源连接器
{
  "name": "mongo-simple-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0",
"publish.full.document.only": true,
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "database": "Tutorial1",
    "collection": "orders"
  }

}

接受连接器
{

  "name": "mongo-tutorial-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Tutorial1.orders",
    "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial1",
    "collection": "back10"
  }

}



本指南介绍了如何配置 MongoDB Kafka Connector,以在 MongoDB 和 Apache Kafka 之间发送数据。

完成本指南后,您应该了解如何使用 Kafka Connect REST API 来配置 MongoDB Kafka Connector,

以便从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafca 主题读取数据并将其写入 MongoDB。


一键分享文章

分类列表

  • • struts源码分析
  • • flink
  • • struts
  • • redis
  • • kafka
  • • ubuntu
  • • zookeeper
  • • hadoop
  • • activiti
  • • linux
  • • 成长
  • • NIO
  • • 关键词提取
  • • mysql
  • • android studio
  • • zabbix
  • • 云计算
  • • mahout
  • • jmeter
  • • hive
  • • ActiveMQ
  • • lucene
  • • MongoDB
  • • netty
  • • flume
  • • 我遇到的问题
  • • GRUB
  • • nginx
  • • 大家好的文章
  • • android
  • • tomcat
  • • Python
  • • luke
  • • android源码编译
  • • 安全
  • • MPAndroidChart
  • • swing
  • • POI
  • • powerdesigner
  • • jquery
  • • html
  • • java
  • • eclipse
  • • shell
  • • jvm
  • • highcharts
  • • 设计模式
  • • 列式数据库
  • • spring cloud
  • • docker+node.js+zookeeper构建微服务
版权所有 cookqq 感谢访问 支持开源 京ICP备15030920号
CopyRight 2015-2018 cookqq.com All Right Reserved.