canal-上手
canal是阿里巴巴旗下的一款开源项目,基于java开发。基于数据库增量日志解析,提供增量数据订阅&消费
原理
canal是基于MySQL的主从同步来实现的
主从同步原理:
- mysql master将数据变更写入日志(binary log),其中纪录的数据叫做binary log events
- mysql slave将master的binary log events拷贝到它的中继日志(relay log)
- mysql slave重放relay log种的事件,将数据变更反映它自己的数据
canal就是把自己伪装成mysql的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给canal的客户端,进而完成对其它数据库的同步
使用
-
开启binlog
server-id = 200 #必须唯一 log_bin = mysql-bin #开启及设置二进制日志文件名称 binlog_format = MIXED sync_binlog = 1 expire_logs_days =7 #二进制日志自动删除/过期的天数。默认值为0,表示不自动删除。 binlog-do-db = canal #要同步的数据库 binlog-ignore-db = mysql #不需要同步的数据库 binlog_ignore_db = information_schema binlog_ignore_db = performation_schema binlog_ignore_db = sys
-
设置用户权限
CREATE USER canal@'%' IDENTIFIED BY 'canal'; GRANT SELECT,replication SLAVE,replication client,super ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'; FLUSH PRIVILEGES;
-
查看状态:SHOW MASTER STATUS
-
安装并启动canal服务
docker run -p 11111:11111 --name canal \ -e canal.destinations=canals \ 集群名称 -e canal.instance.master.address=mysql:3306 \ MySQL地址 -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.instance.filter.regex=canal\\..* \ 库名称 --network wl \ 网络名称 -d canal/canal-server:v1.1.5
-
监听canal:canal提供了各种语言的客户端,当canal监听到binlog变化时,会通知canal的客户端
第三方客户端
-
导入依赖
<dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>
-
编写配置 文件
canal: destination: #实例名称 server: #canal地址
-
编写监听器
@CanalTable("t_user") //指定监听哪张表 @Component @Slf4j public class UserHandler implements EntryHandler<User> { @Override public void insert(User user) { log.info("新增数据:"+user); } @Override public void update(User before, User after) { log.info("修改前数据:"+before); log.info("修改后数据:"+after); } @Override public void delete(User user) { log.info("新增数据:"+user); } }
-
编写实体类和表的映射关系
@Data public class User { @Id private Integer id; @Column(name = "name") private String name; private String password; private Integer age; @Transient private Integer sex; //不需要的字段 }