canal

canal

起男 826 2021-10-24

canal-上手

canal是阿里巴巴旗下的一款开源项目,基于java开发。基于数据库增量日志解析,提供增量数据订阅&消费

原理

canal是基于MySQL的主从同步来实现的

主从同步原理:

  • mysql master将数据变更写入日志(binary log),其中纪录的数据叫做binary log events
  • mysql slave将master的binary log events拷贝到它的中继日志(relay log)
  • mysql slave重放relay log种的事件,将数据变更反映它自己的数据

canal就是把自己伪装成mysql的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给canal的客户端,进而完成对其它数据库的同步

使用

  1. 开启binlog

    server-id = 200				#必须唯一
    log_bin = mysql-bin 		#开启及设置二进制日志文件名称
    binlog_format = MIXED
    sync_binlog = 1
    expire_logs_days =7			#二进制日志自动删除/过期的天数。默认值为0,表示不自动删除。
    
    binlog-do-db = canal 		#要同步的数据库 
    
    binlog-ignore-db = mysql 	#不需要同步的数据库 
    binlog_ignore_db = information_schema
    binlog_ignore_db = performation_schema
    binlog_ignore_db = sys
    
  2. 设置用户权限

    CREATE USER canal@'%' IDENTIFIED BY 'canal';
    
    GRANT SELECT,replication SLAVE,replication client,super ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
    
    FLUSH PRIVILEGES;
    
  3. 查看状态:SHOW MASTER STATUS

  4. 安装并启动canal服务

    docker run -p 11111:11111 --name canal \
    -e canal.destinations=canals \       集群名称
    -e canal.instance.master.address=mysql:3306  \     MySQL地址
    -e canal.instance.dbUsername=canal  \  
    -e canal.instance.dbPassword=canal  \
    -e canal.instance.connectionCharset=UTF-8 \
    -e canal.instance.tsdb.enable=true \
    -e canal.instance.gtidon=false  \
    -e canal.instance.filter.regex=canal\\..* \   库名称
    --network wl \   网络名称
    -d canal/canal-server:v1.1.5
    
  5. 监听canal:canal提供了各种语言的客户端,当canal监听到binlog变化时,会通知canal的客户端

第三方客户端

  1. 导入依赖

            <dependency>
                <groupId>top.javatool</groupId>
                <artifactId>canal-spring-boot-starter</artifactId>
                <version>1.2.1-RELEASE</version>
            </dependency>
    
  2. 编写配置 文件

    canal:
      destination:   #实例名称
      server:  #canal地址
    
  3. 编写监听器

    @CanalTable("t_user") //指定监听哪张表
    @Component
    @Slf4j
    public class UserHandler implements EntryHandler<User> {
    
        @Override
        public void insert(User user) {
            log.info("新增数据:"+user);
        }
    
        @Override
        public void update(User before, User after) {
            log.info("修改前数据:"+before);
            log.info("修改后数据:"+after);
        }
    
        @Override
        public void delete(User user) {
            log.info("新增数据:"+user);
        }
    }
    
  4. 编写实体类和表的映射关系

    @Data
    public class User {
        @Id
        private Integer id;
        @Column(name = "name")
        private String name;
        private String password;
        private Integer age;
    
        @Transient
        private Integer sex; //不需要的字段
    }