• 售前

  • 售后

热门帖子
入门百科

剖析SQL Server CDC配合Kafka Connect监听数据变革的题目

[复制链接]
静美人2017 显示全部楼层 发表于 2022-1-8 09:58:44 |阅读模式 打印 上一主题 下一主题
写在前面

  好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从本年年初开始筹办、组建、招兵买马,到现在稳定开搞中,期间踏过无数的火坑,大概除了这篇还很写上三四篇。
  进入主题,通常企业为了实现数据统计、数据分析、数据挖掘、解决信息孤岛等全局数据的体系化运作管理 ,为BI、经营分析、决议支持体系等深度开发应用奠定基础,挖掘数据价值 ,企业会开始动手创建数据堆栈,数据中台。而这些数据泉源则来自于企业的各个业务体系的数据或爬取外部的数据,从业务体系数据到数据堆栈的过程就是一个ETL(Extract-Transform-Load)行为,包罗了收罗、洗濯、数据转换等重要过程,通常异构数据抽取转换利用Sqoop、DataX等,日志收罗Flume、Logstash、Filebeat等。
  数据抽取分为全量抽取和增量抽取,全量抽取雷同于数据迁移或数据复制,全量抽取很好理解;增量抽取在全量的基础上做增量,只监听、捕捉动态变革的数据。如何捕捉数据的变革是增量抽取的关键,一是正确性,必须保证正确的捕捉到数据的动态变革,二是性能,不能对业务体系造成太大的压力。
增量抽取方式

  通常增量抽取有几种方式,各有优缺点。
1. 触发器

  在源数据库上的目的表创建触发器,监听增、删、改操纵,捕捉到数据的变更写入临时表。
长处:操纵简单、规则清楚,对源表不影响;
缺点:对源数据库有侵入,对业务体系有一定的影响;
2. 全表比对

  在ETL过程中,抽取方创建临时表待全量抽取存储,然后在举行比对数据。
长处:对源数据库、源表都无需改动,完全交付ETL过程处理,同一管理;
缺点:ETL服从低、设计复杂,数据量越大,速度越慢,时效性不确定;
3. 全表删除后再插入

  在抽取数据之前,先将表中数据清空,然后全量抽取。
长处:ETL 操纵简单,速度快。
缺点:全量抽取一样平常采取T+1的情势,抽取数据量大的表轻易对数据库造成压力;
4. 时间戳

  时间戳的方式即在源表上增长时间戳列,对发生变更的表举行更新,然后根据时间戳举行提取。
长处:操纵简单,ELT逻辑清楚,性能比力好;
缺点:对业务体系有侵入,数据库表也需要额外增长字段。对于老的业务体系大概不轻易做变更。
5. CDC方式

  变更数据捕获Change Data Capture(简称CDC),SQLServer为实时更新数据同步提供了CDC机制,雷同于Mysql的binlog,将数据更新操纵维护到一张CDC表中。开启CDC的源表在插入INSERT、更新UPDATE和删除DELETE运动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中,通过cdc提供的查询函数,可以捕获这部门数据。详情可以查看官方介绍:关于变更数据捕获 (SQL Server)

长处:提供易于利用的API 来设置CDC 环境,紧缩ETL 的时间,无需修改业务体系表结构。
缺点:受数据库版本的限定,实现过程相对复杂。
CDC增量抽取

先决条件

1. 已搭建好Kafka集群,Zookeeper集群;
2. 源数据库支持CDC,版本采用开发版或企业版。
案例环境:
Ubuntu 20.04
Kafka2.13-2.7.0
Zookeeper 3.6.2
SQL Server 2012
步调

  除了数据库开启CDC支持以外,重要照旧要将变更的数据通过Kafka Connect传输数据,Debezium是目前官方推荐的连接器,它支持绝大多数主流数据库:MySQL、PostgreSQL、SQL Server、Oracle等等,详情查看Connectors。
1. 数据库步调

开启数据库CDC支持
  在源数据库实行以下命令:
  1. EXEC sys.sp_cdc_enable_db GO
复制代码
  附上关闭语句:
  1. exec sys.sp_cdc_disable_db
复制代码
查询是否启用
  1. select * from sys.databases where is_cdc_enabled = 1
复制代码
创建测试数据表:(已有表则跳过此步调)
  1. create  table T_LioCDC
  2. (
  3.     ID int identity(1,1) primary key ,
  4.     Name nvarchar(16),
  5.     Sex bit,
  6.     CreateTime datetime,
  7.     UpdateTime datetime
  8. );
复制代码
对源表开启CDC支持:
  1. exec sp_cdc_enable_table
  2. @source_schema='dbo',
  3. @source_name='T_LioCDC',
  4. @role_name=null,
  5. @supports_net_changes = 1;
复制代码
确认是否有权限访问CDC Table:
  1. EXEC sys.sp_cdc_help_change_data_capture
复制代码

确认SQL Server Agent已开启:
  1. <p style="text-align: left;">EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
复制代码

  以上则完成对数据库的CDC操纵。
2. Kafka步调

  Kafka Connect的工作模式分为两种,分别是standalone模式和distributed模式。standalone用于单机测试,本文用distributed模式,用于生产环境。(Kafka必须先运行启动,再举行以下步调举行设置。)
下载Sql Server Connector
  下载连接器后,创建一个文件夹来存放,解压到该目录下即可,例子路径:/usr/soft/kafka/kafka_2.13_2.7.0/plugins(记取这个路径,设置中要用到)

下载地址:debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz


编辑connect-distributed.properties设置
  修改Kafka connect设置文件,$KAFKA_HOME/config/connect-distributed.properties,变更内容如下:
  1. //kafka集群ip+portbootstrap.servers=172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092
  2. key.converter.schemas.enable=false
  3. value.converter.schemas.enable=false
  4. offset.storage.topic=connect-offsets
  5. offset.storage.replication.factor=1
  6. offset.storage.partitions=3
  7. offset.storage.cleanup.policy=compact
  8. config.storage.topic=connect-configs
  9. config.storage.replication.factor=1
  10. status.storage.topic=connect-status
  11. status.storage.replication.factor=1
  12. status.storage.partitions=3
  13. //刚刚下载连接器解压的路径
  14. plugin.path=/usr/soft/kafka/kafka_2.13_2.7.0/plugins
复制代码
看到设置中有三个Topic,分别是
  1. config.storage.topic:用以保存connector和task的配置信息,需要注意的是这个主题的分区数只能是1,而且是有多副本的。
  2. offset.storage.topic:用以保存offset信息。
  3. <span style="background-color: initial;">status.storage.topic:用以保存connetor的状态信息。</span>
复制代码
这些Topic可以不用创建,启动后会默认创建。
启动Kafka集群
  保存设置之后,将connect-distributed.properties分发到集群中,然后启动:
  1. bin/connect-distributed.sh config/connect-distributed.properties
复制代码
查抄是否启动
  connector支持REST API的方式举行管理,以是用Post man大概Fiddler可以调用相关接口举行管理。查抄是否启动:

不用奇怪,上面设置集群的IP是172段,这里的192.168.1.177还是我的集群中的一个服务器,由于服务器都利用了双网卡。由于还没有连接器相关设置,以是接口返回是一个空数组,接下来将新增一个连接器。
编写sqlserver-cdc-source.json
  1. {
  2.     "name": "sqlserver-cdc-source",
  3.     "config": {
  4.         "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
  5.         "database.server.name" : "JnServer",
  6.         "database.hostname" : "172.192.20.2", --目标数据库的ip
  7.         "database.port" : "1433",  --目标数据库的端口
  8.         "database.user" : "sa",   --目标数据库的账号
  9.         "database.password" : "123456",  --密码
  10.         "database.dbname" : "Dis",  --目标数据库的数据库名称
  11.         "table.whitelist": "dbo.T_LioCDC", --监听表名
  12.          "schemas.enable" : "false",  
  13.          "mode":"incrementing",  --增量模式
  14.          "incrementing.column.name": "ID", --增量列名
  15.         "database.history.kafka.bootstrap.servers" : "172.192.10.210:9092,172.192.10.211:9092,172.192.10.212", --kafka集群
  16.         "database.history.kafka.topic": "TopicTLioCDC",  --kafka topic内部使用,不是由消费者使用
  17.         "value.converter.schemas.enable":"false",
  18.         "value.converter":"org.apache.kafka.connect.json.JsonConverter"
  19.     }
  20. }
  21. //源文地址: https://www.cnblogs.com/EminemJK/p/14688907.html
复制代码
另有其他额外的设置,可以参考官方文档。然后实行

继续实行查抄,就发现连接器已经乐成设置了:

其他API
  1. GET /connectors – 返回所有正在运行的connector名。
  2. POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
  3. GET /connectors/{name} – 获取指定connetor的信息。
  4. GET /connectors/{name}/config – 获取指定connector的配置信息。
  5. PUT /connectors/{name}/config – 更新指定connector的配置信息。
  6. GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
  7. GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
  8. GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
  9. PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
  10. PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
  11. POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
  12. POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
  13. DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。//源文地址: https://www.cnblogs.com/EminemJK/p/14688907.html
复制代码
查看Topic
  1. /usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000
复制代码

TopicJnServer.dbo.T_LioCDC则是供我们消耗的主题,启动一个消耗者举行监听测试:
  1. bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092  --consumer-property group.id=group1 --consumer-property client.id=consumer-1  --topic JnServer.dbo.T_LioCDC
复制代码
然后再源表举行一些列增删改操纵,
  1. --测试代码
  2. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('A',1,getdate(),getdate())
  3. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('B',0,getdate(),getdate())
  4. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('C',1,getdate(),getdate())
  5. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('D',0,getdate(),getdate())
  6. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('E',1,getdate(),getdate())
  7. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('F',1,getdate(),getdate())
  8. insert into T_LioCDC(name, sex, createtime,UpdateTime)  values ('G',0,getdate(),getdate())
  9. update T_LioCDC
  10. set Name='Lio.Huang',UpdateTime=getdate()
  11. where ID=7
复制代码

已经乐成捕捉到数据的变更,对比几个操纵Json,依次是insert、update、delete:

到此这篇关于SQL Server CDC配合Kafka Connect监听数据变革的文章就介绍到这了,更多相关SQL Server CDC监听数据变革内容请搜刮草根技能分享从前的文章或继续浏览下面的相关文章希望各人以后多多支持草根技能分享!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作