2024-04-16 21:57:02.0|分类: MongoDB|浏览量: 484
Apache Kafka Apache Kafka 是一个开源发布/订阅消息系统。Apache Kafka 提供了一个灵活、容错且可水平扩展的系统, 用于在数据存储和应用程序中移动数据。如果满足以下条件,则系统是容错的:即使系统的某些组件停止工作, 系统也可以继续运行。如果可以通过添加更多计算机(而不是改进计算机硬件)来扩展系统以处理更大的工作负载, 则系统是可水平扩展的。 Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。 它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。 Kafka Connector Kafka Connect 是 Apache Kafka 的一个组件,解决了 Apache Kafka 与 MongoDB 等数据存储的连接问题。Kafka Connect 通过提供以下资源解决了这个问题: 用于与数据存储之间传输数据的容错运行时环境。 Apache Kafka 社区的框架,用于分享将 Apache Kafka 连接到不同数据存储的解决方案。 Kafka Connect 框架定义一个 API,以使开发人员编写可重用的连接器。 连接器使 Kafka Connect 部署能够与作为数据源或数据接收器的特定数据存储进行交互。 MongoDB Kafka Connector 就是这些连接器之一。 在连接到数据存储时使用 Kafka Connect 而不是生产者/使用者客户端 虽然您可以编写自己的应用程序,以使用生产客户端和消费客户端将 Apache Kafka 连接到特定的数据存储,但 Kafka Connect 可能更适合您。以下是使用 Kafka Connect 的部分原因: 1、Kafka Connect 具有容错的分布式架构,以确保可靠的管道。 2、有大量由社区维护的连接器,可使用 Kafka Connect 框架将 Apache Kafka 连接到 MongoDB、PostgreSQL 和 MySQL 等流行的数据存储。这样就减少了需要编写和维护的模板代码数量,以管理数据库连接、错误处理、死信队列集成以及 Apache Kafka 与数据存储连接时涉及的其他问题。 3、您可以选择使用 Confluence 提供的托管 Kafka Connect 集群。 Kafaka connect的核心组件: Source:负责将外部数据写入到kafka的topic中。 Sink:负责从kafka中读取数据到自己需要的地方去,比如读取到HDFS,hbase等。 官方文档: https://www.mongodb.com/zh-cn/docs/kafka-connector/current/introduction/kafka-connect/ 第一步:先安装kafka, 参照前面文档http://www.cookqq.com/blog/8a10a5f38d5a88bf018e8e951ddd6830 安装 MongoDB Kafka Connector 第二步:将 JAR 和任何依赖项复制到 Kafka 插件目录中 编辑配置文件C:\kafka3.7.0\config\connect-distributed.properties,末尾增加插件路径 plugin.path=C:\\kafka3.7.0\\plugins 第三步:创建C:\kafka3.7.0\plugins文件夹,plugins默认是不存在的 第四步:下载mongo-kafka-connect-1.11.2-all.jar,并且复制到C:\\kafka3.7.0\\plugins 下载jar路径:https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.11.2/ 源代码路径 :https://github.com/mongodb/mongo-kafka 第五步:下载mongodb驱动jar,并且复制到C:\\kafka3.7.0\\plugins mongodb-driver-core-4.7.2.jar mongodb-driver-sync-4.7.2.jar bson-record-codec-4.7.2.jar bson-4.7.2.jar 注意: 驱动程序版本:1.11 版的 MongoDB Kafka Connector 使用 4.7 版的 MongoDB Java 驱动程序。 https://www.mongodb.com/docs/drivers/ https://www.mongodb.com/docs/drivers/java-drivers/ 第六步:启动kafka cd C:\kafka3.7.0 .\bin\windows\kafka-server-start.bat .\config\server.properties 第六步:启动mongo-kafka-connect cd C:\kafka3.7.0 .\bin\windows\connect-distributed.bat .\config\connect-distributed.properties 第七步:验证mongo-kafka-connect是否正常启动 访问http://127.0.0.1:8083/connector-plugins,启动成功之后会展示所有插件信息; [ 其他接口REST API GET /connectors POST /connectors GET /connectors/{name} GET /connectors/{name}/config PUT /connectors/{name}/config GET /connectors/{name}/status GET /connectors/{name}/tasks GET /connectors/{name}/tasks/{taskid}/status PUT /connectors/{name}/pause PUT /connectors/{namel/resume POST /connectors/{name}/restart POST /connectors/{namel/tasks/{taskld}/restart DELETE /connectors/{name} 上面对应的说明 返回所有正在运行的connector 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。 获取指定connetor的信息 获取指定connector的配置信息 更新指定connector的配置信息 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。 获取指定connector正在运行的task。 获取指定connector的task的状态信息 暂停connector和它的task,停止数据处理知道它被恢复。 恢复一个被暂停的connector 重启-一个connector,尤其是在一个connector运行失败的情况下比较常用 重启一个task,一般是因为它运行失败才这样做 删除一个connector,停止它的所有task并删除配置 第八步:查看所有的connector http://127.0.0.1:8083/connectors,此时返回的是一个空数组,说明没有任何的connector: 第九步:配置 MongoDB Kafka 源连接器以读取变更流中的数据并将其发布到 Apache Kafka 主题 http://127.0.0.1:8083/connectors post请求 curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n" { "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } } 第十步:查看链接集合http://127.0.0.1:8083/connectors ["mongo-simple-source"] 第十一步:输入测试mongo数据 mongo --port 27317 -uroot -p 123123 --authenticationDatabase admin use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } ) 第十二步:查看kafka主题: cd C:\kafka3.7.0\bin\windows kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --list 输入结果是: Tutorial1.orders __consumer_offsets connect-configs connect-offsets connect-status 第十三步:何配置 MongoDB Kafka 接收器连接器,以从 Apache Kafka 主题读取数据并将其写入 MongoDB 集合 http://127.0.0.1:8083/connectors post请求 curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n" { "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial1.orders", "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "Tutorial1", "collection": "back" } } 第十四步:查看链接集合http://127.0.0.1:8083/connectors ["mongo-tutorial-sink","mongo-simple-source"] 第十五步:输入测试mongo数据 mongo --port 27317 -uroot -p 123123 --authenticationDatabase admin use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } ) 第十六步:查看back表数据 db.getCollection('back').find({}) 输出结果 { "_id" : ObjectId("660abd1755910d57485a7061"), "schema" : { "optional" : false, "type" : "string" }, "payload" : "{\"_id\": {\"$oid\": \"660abd17a52b0b59cfa2f9ab\"}, \"order_id\": 1.0, \"item\": \"coffee\"}" } 上面的插入数据字段是_id,schema,payload。schema,payload是kafka信息字段,MongoDB表的字段都被包含在payload字段中了,现在希望去掉上面这层包裹。表自动变成实际用户字段order_id、item。 从schema中发现字段类型是String,因为前面咱们mongo-simple-source没有配置转换器,应该默认用是String转换器。 现在需要将mongo-simple-source和mongo-tutorial-sink配置成相同规则的转换器,现在使用官网提供的JSON转换器。配置如下: 来源连接器 { "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0", "publish.full.document.only": true, "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "key.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "database": "Tutorial1", "collection": "orders" } } 接受连接器 { "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial1.orders", "connection.uri": "mongodb://root:123123@127.0.0.1:27217,127.0.0.1:27317,127.0.0.1:27417/?replicaSet=r0", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":false, "value.converter.schemas.enable": false, "database": "Tutorial1", "collection": "back10" } } 本指南介绍了如何配置 MongoDB Kafka Connector,以在 MongoDB 和 Apache Kafka 之间发送数据。 完成本指南后,您应该了解如何使用 Kafka Connect REST API 来配置 MongoDB Kafka Connector, 以便从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafca 主题读取数据并将其写入 MongoDB。 |