阿里云canal订阅mysql的binlog日志实现数据和表结构实时同步

1.开启mysql binlog日志 安装路径下的my.ini文件添加配置

log-bin=mysql-bin #开启日志

binlog-format=ROW #选择row模式

server_id=1 

开启日志需要重启mysql服务后生效

2.下载canal 地址:https://github.com/alibaba/canal/releases

修改配置conf/canal.properties 全局配置,设置自己的数据库

#################################################  
## mysql serverId  
canal.instance.mysql.slaveId = 1234  


# position info,需要改成自己的数据库信息  
canal.instance.master.address = 127.0.0.1:3306   
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =


#canal.instance.standby.address =   
#canal.instance.standby.journal.name =  
#canal.instance.standby.position =   
#canal.instance.standby.timestamp =   


# username/password,需要改成自己的数据库信息  
canal.instance.dbUsername = root
canal.instance.dbPassword = root
canal.instance.defaultDatabaseName = canneltest  
canal.instance.connectionCharset = UTF-8  


# table regex  
canal.instance.filter.regex = .*\\..* 

#################################################

配置slave,slave可以配置成订阅单表日志,也可以配置订阅多表,或所有的表的日志,也可以配置多个slave,只需修改canal.instance.filter.regex 参数,我这里订阅了所有的表日志

conf\example路径下的

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234


# position info
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 


#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 


# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canneltest
canal.instance.connectionCharset = UTF-8


# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =  


#################################################

然后用程序需要订阅解析日志,引入相应版本的jar,我这里是1.0.24

<dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.24</version>
</dependency>
public static void main(String[] args) throws InterruptedException {

    // 链接canal
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
            "example", "", "");
    connector.connect();

    // 开启订阅日志
    connector.subscribe();

    // 循环订阅
    while (true) {
        try {
            // 每次读取 1000 条
            Message message = connector.getWithoutAck(1000);
            System.out.println(message);
            long batchID = message.getId();

            int size = message.getEntries().size();

            if (batchID == -1 || size == 0) {
                Thread.sleep(1000); // 没有数据
            } else {
                System.out.println("数据进入===>"+message);
            }

            connector.ack(batchID);

        } catch (Exception e) {
            // TODO: handle exception

        } finally {
            Thread.sleep(1000);
        }
    }
}
可以打印解析message生成sql,集成kafka提供生产,供订阅同步数据


文章来源: 阿里云canal订阅mysql的binlog日志实现数据和表结构实时同步

人吐槽 人点赞

猜你喜欢

发表评论

用户名: 密码:
验证码: 匿名发表

你可以使用这些语言

查看评论:阿里云canal订阅mysql的binlog日志实现数据和表结构实时同步