分布式数据库同步系统otter之canal分析

李成凯

使用背景

随着点我达业务的迅速发展,异地多活的系统架构也非常的迫切。需要直面多机房之间的数据同步问题,而otter恰恰是这个问题的成熟的解决方案。otter是基于数据库增量日志解析的分布式数据库同步系统,能够提供准实时的数据同步方案。通过otter的manager的web模块,可以轻松管理多个otter集群,配置数据库表过滤以及字段映射等功能。接下来将对otter的canal模块进行详细地分析。

otter介绍

阿里巴巴公司因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了杭州和美国异地机房的需求,同时为了提升用户体验,整个机房的架构为双A,两边均可写,由此诞生了otter这样一个产品。
定位: 基于数据库增量日志解析,准实时同步到本机房或异地机房的mysql/oracle数据库. 一个分布式数据库同步系统。
系统架构图:
在架构图中可以看到,首先通过canal从各个db中采集数据,然后通过s·e·t·l四个处理过程进行数据库同步。

canal介绍

canal是基于数据库增量日志解析,提供增量数据订阅&消费的系统,主要支持mysql数据库。cannal支持独立部署和内嵌使用两种模式。otter使用canal的内嵌方法获取数据库增量日志。

工作原理

mysql主从复制实现:
从上层来看,复制分成三步:
1.master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
2.slave将master的binary log events拷贝到它的中继日志(relay log);
3.slave重做中继日志中的事件,将改变反映它自己的数据。
canal工作原理:

1.canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
2.mysql master收到dump请求,开始推送binary log给slave(也就是canal)
3.canal解析binary log对象(原始为byte流)

mysql和canal配置

上文介绍canal获取mysql的binlog方式为模拟slave发送dump协议。因此要求mysql开启主从复制功能,且主从复制的模式为row模式,同时因为伪装成slave,必须要求canal的slaveId不能与其他从库的slaveId相同。在my.cnf配置文件中开启og-bin = mysql-binbinlog_format = ROW。还有要求必须提供一个具有复制权限的账号。
*以下是canal使用独立模式的配置方式,使用otter需要在manager的web页面进行配置 *
在canal的配置文件中配置:
//canal的slaveId,不能与其他从库相同
canal.instance.mysql.slaveId = xxxx
//配置连接主库的地址
Canal.instance.master.address=xxxx
//配置canal连接主库的账号
canal.instance.dbUsername=canal
//密码
canal.instance.dbPassword=canal
//需要获取binlog的数据库 canal.instance.defaultDatabaseName=test

binlog解析

通过以上配置就能启动一个canal,能对binlog进行获取。那么canal又是如何对binlog进行解析的呢?先来看mysql是如何记录binlog日志的。

mysql的binlog记录

1.首先在test库创建一张用户表

CREATE TABLE `user_info` (  
  `id` int(10) NOT NULL  COMMENT 'id',
  `username` varchar(255) DEFAULT NULL COMMENT '名称',
  `sex` varchar(10) DEFAULT NULL COMMENT '性别',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  COMMENT='user';

2.在表中插入一条记录,然后进行更新

insert into `user_info`(`id`,`username`,`sex`) values (1,'lick','male');

update `user_info` set username = 'lick_test' where id=1;  

3.show master status;获取当前binlog记录文件以及位置。
可以看到当前的binlog文件为mysql-bin.000013,位置为18379.
4.在命令行中输入show binlog events in 'mysql-bin.000013';拉到最下获取最新的binlog events。
从上图可以看到binlog events记录了之前的创建表、插入和更新事件。接下来我们以更新数据event进行binlog分析。

binlog文件可以看成是各种event的集合,event就是事件,当然一次更新可能会产生多个event,event有很多种,常见的有QUERY_EVENT(记录一条query语句,在基于语句的复制和基于行的复制都会有,譬如begin事件)、ROTATE_EVENT(二进制日志更换一个新文件,可能是因为文件大小达到限制,或者是mysql服务器重启了)、XID_EVENT(commit事件)、WRITE_ROWS_EVENT,UPDATE_ROWS_EVENT,DELETE_ROWS_EVENT(统称为row event,只有在基于行的复制方式下才会产生)、TABLE_MAP_EVENT(row event之前产生,为的是对row event解析提供依据)。  

上图我们可以看到一个更新操作包含了四个events,Query、Tablemap、Updaterows、Xid。这里的Query是指begin事件,代表事物的开始,Tablemap用来描述表的结构为Updaterows解析提供数据结构信息,Xid代表事物的结束。接下来我们重点分析下Tablemap、Updaterows事件。
5.在mysql的data文件夹下找到mysql-bin.000013。

Table_map

执行hexdump -C mysql-bin.000013 -s 18219 -n 60
mysql官方文档对table_map的解释是

post-header:  
    if post_header_len == 6 {
  4              table id
    } else {
  6              table id
    }
  2              flags

payload:  
  1              schema name length
  string         schema name
  1              [00]
  1              table name length
  string         table name
  1              [00]
  lenenc-int     column-count
  string.var_len [length=$column-count] column-def
  lenenc-str     column-meta-def
  n              NULL-bitmask, length: (column-count + 8) / 7

其中前4个字节为公共头,后15个字节为event header信息,暂不分析
0x77 00 00 00 00 00:table id
0x0100:flags
0x04:schema name length
0x74 65 73 74:schema name,即test
0x00:占位符,空白字节
0x09:table name length,即表名长度
0x75 73 65 72 5f 69 6e 66 6f:table name,即user_info
0x00:占位符,空白字节
0x03: column-count,即表的列数,当前表有三个字段
0x03、0x0f、0x0f分别代表三个字段的字段类型

0x03是MYSQL_TYPE_LONG,  
0x0f是MYSQL_TYPE_VARCHAR,  
0xfe是MYSQL_TYPE_STRING,  
0xfc是MYSQL_TYPE_BLOB;  

0x04:是metadata的开头,表明总共有4个字节是metadata。由于MYSQLTYPELONG是不占字节空间的,所以metadata只有四个字节。
0xff 00:表示字段username的长度,即255
0x0a 00:表示字段sex的长度,即10
0x06:表示掩码,代表不为空的字段位置。其二进制为110,代表只有id字段不为空。
table_map解析完成

Update_rows

执行hexdump -C mysql-bin.000013 -s 18277 -n 100 mysql官方文档对table_map的解释是

header:  
  if post_header_len == 6 {
4                    table id  
  } else {
6                    table id  
  }
2                    flags  
  if version == 2 {
2                    extra-data-length  
string.var_len       extra-data  
  }

body:  
lenenc_int           number of columns  
string.var_len       columns-present-bitmap1, length: (num of columns+7)/8  
  if UPDATE_ROWS_EVENTv1 or v2 {
string.var_len       columns-present-bitmap2, length: (num of columns+7)/8  
  }

rows:  
string.var_len       nul-bitmap, length (bits set in 'columns-present-bitmap1'+7)/8  
string.var_len       value of each field as defined in table-map  
  if UPDATE_ROWS_EVENTv1 or v2 {
string.var_len       nul-bitmap, length (bits set in 'columns-present-bitmap2'+7)/8  
string.var_len       value of each field as defined in table-map  
  }
  ... repeat rows until event-end

其中前4个字节为公共头,后15个字节为event header信息,暂不分析
0x77 00 00 00 00 00:table id
0x01 00:flags
0x02 00:extra-data-length
0x03:number of columns,即表字段数量
0xff:columns-present-bitmap1,代表在该event中每列是否有数据,如果没有数据设置为0。外部解析的时候会把这个字段设置为null。ff代表所有字段都有数据
0xff:columns-present-bitmap2,同上
[以下为变化前数据的值]
0xf8:nul-bitmap,代表一行数据中哪些字段为null,0xf8末尾是1000,表示这三个字段都不为null
0x01 00 00 00:因为id字段是long类型的,不用表示占位,直接读取4个字节的数据,表示id为1
0x04:代表第二列username的占用字节为4个字节
0x6c 69 63 6b:代表username的值,即lick
0x04:代表第三列sex的占用字节长度
0x6d 61 6c 65:代表sex的值,即male
[以下为变化后数据的值]
0xf8:nul-bitmap,同上
0x01 00 00 00:表示id为1
0x09:代表第二列username的占用字节为9个字节
0x6c 69 63 6b 5f 74 65 73 74:代表username的值,即lick_test
0x04:代表第三列sex的占用字节长度
0x6d 61 6c 65:代表sex的值,即male
Update_rows event解析完成

canal的binlog解析

上文完成了mysql的binlog解析,那么cannal是如何将二进制的binlog转换成内部类的呢?追踪canal源码,我们会看到: 1.MysqlConnection类中的dump方法对主库的binlog增量信息进行获取。调用LogDecoder类对二进制数据进行初步解析,根据event的类型不同封装成不同的LogEvent子类,并将字段存储形式改写成google protobuf支持的格式进行存储。
2.利用binlogParser解析器,将LogEvent类中用protobuf转换成Canal.Entry
3.最后封装成Message类,用于多模块之间传输和解析。在otter的selectTask模块对canal的信息进行获取。

protobuf

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC 数据交换格式。可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
篇幅原因在此就不进行具体介绍,可以百度搜索google protobuf。
Google Protocol Buffer 的使用和原理

canal模块介绍

canal源码的目录结构:
common模块:主要是提供了一些公共的工具类和接口。
client模块:canal的客户端。核心接口为CanalConnector
example模块:提供client模块使用案例。
protocol模块:client和server模块之间的通信协议
deployer模块。通过该模块提供的CanalLauncher来启动canal server
server模块:canal服务器端。核心接口为CanalServer
instance模块:一个server有多个instance。每个instance都会模拟成一个mysql实例的slave。instance模块有四个核心组成部分:parser模块、sink模块、store模块,meta模块。核心接口为CanalInstance
parser模块:数据源接入,模拟slave协议和master进行交互,协议解析。parser模块依赖于dbsync、driver模块。
driver模块和dbsync模块:从这两个模块的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出来,这两个模块实际上是parser模块的组件。事实上parser 是通过driver模块与mysql建立连接,从而获取到binlog。由于原始的binlog都是二进制流,需要解析成对应的binlog事件,这些binlog事件对象都定义在dbsync模块中,dbsync 模块来自于淘宝的tddl。
sink模块:parser和store链接器,进行数据过滤,加工,分发的工作。核心接口为CanalEventSink
store模块:数据存储。核心接口为CanalEventStore
meta模块:增量订阅&消费信息管理器,核心接口为CanalMetaManager,主要用于记录canal消费到的mysql binlog的位置

小结

canal模块功能比较简单,主要功能是从数据库中获取binlog并进行解析。因此在本文中重点描述了binlog的结构,并没有源码上讲解如何解析。如果有感兴趣的朋友也可以翻看源码进行一一对照。canal仅是otter进行数据同步的第一步,接下来也会带来更多otter其他模块的分析,若有错误的地方还请指正。

参考文章

otter介绍(github)
canal介绍(github)