阿里Canal框架数据库同步-实战教程

一、Canal简介:

  canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志剖析,提供增量数据订阅&消费,现在主要支持了MySQL(也支持mariaDB)。

二、靠山先容:

  早期,阿里巴巴B2B公司由于存在杭州和美国双机房部署,存在跨机房同步的营业需求。不外早期的数据库同步营业,主要是基于trigger的方式获取增量调换,不外从2010年最先,阿里系公司最先逐步的实验基于数据库的日志剖析,获取增量调换举行同步,由此衍生出了增量订阅&消费的营业,今后开启了一段新纪元。

三、适用场景:

  在一些庞大的营业逻辑中,可能插入或者查询数据都对照频仍,若是一直在数据库插入查询会造成速率异常慢,可以把数据库表分成两个库,一个库用来做查询,一个库作为插入数据,读写星散,怎么解决呢?就可以用canal框架来监听数据是否发生改变,来同步数据。

好比大部分人都做搜索引擎ES,咱们不可能每次数据库更新了数据手动去同步索引库,咱们就可以用Canal来监听数据库增删改时去重新导入索引库,保持数据一致性。

四、Canal的事情机制

  阿里Canal框架数据库同步-实战教程

 

 

 

复制历程分成三步:

(1) Master主库将改变纪录,写到二进制日志(binary log)中(这些纪录叫做二进制日志事宜,binary log events,可以通过show binlog events举行查看);

(2) Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);

(3) Slave从库读取并重做中继日志中的事宜,将改变的数据同步到自己的数据库。

四、Canal中间件功效

基于纯java语言开发,可以用于做增量数据订阅和消费功效。

相比于传统的数据同步,我们通常需要举行先搭建主从架构,然后使用binlog日志举行读取,然后指定需要同步的数据库,数据库表等信息。然则随着我们营业的不停庞大,这种传统的数据同步方式以及最先变得较为繁琐,不够天真。

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议mysql master收到dump请求,最先推送binary log给slave(也就是canal),canal剖析binary log工具(原始为byte流),通过对binlog数据举行剖析即可获取需要同步的数据,在举行同步数据的历程中还可以加入开发人员的一些分外逻辑处置,对照开放。

Binlog的三种基本类型分别为:

STATEMENT模式只纪录了sql语句,然则没有纪录上下文信息,在举行数据恢复的时刻可能会导致数据的丢失情形

ROW模式除了纪录sql语句之外,还会纪录每个字段的转变情形,能够清晰的纪录每行数据的转变历史,然则会占用较多的空间,需要使用mysqlbinlog工具举行查看。

MIX模式对照天真的纪录,例如说当遇到了表结构调换的时刻,就会纪录为statement模式。当遇到了数据更新或者删除情形下就会变为row模式

五、安装Canal

接口测试,接口协议以及常用接口测试工具详解

1.准备事情:win10系统、jdk1.8、mysql5.7、canal1.1.1

 2.毗邻自己的数据,检查binlog功效是否有开启,检查下令:show variables like ‘log_bin’;

阿里Canal框架数据库同步-实战教程

 

 3.若是显示状态为OFF示意该功效未开启,就需要找到自己安装的Mysql位置找到my.ini文件,在此文件的最下面一行加上如下(注重:保留文件后重启下自己的Mysql数据库):

1 server-id=1  #不能与canal的slaveId重复即可
2 log-bin=mysql-bin 3 binlog_format = ROW #设置ROW模式

4.再次查看binlog功效是否有开启,检查下令:show variables like ‘log_bin’;

阿里Canal框架数据库同步-实战教程

 

 5.我们需要建立一个用户操作数据库的写入操作,我们需要给用户权限,执行如下sql语句:

1 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
2 GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 3 FLUSH PRIVILEGES;

6.下载我canal客户端,官方地址举行响应版本的安装包举行下载(注重:若是下载翻到本文最下面联系我): https://github.com/alibaba/canal/releases

阿里Canal框架数据库同步-实战教程

 

 

7.下载乐成后,解压压缩包后进入conf下面的example目录下面的instance.properties文件打开编辑如下地方:

阿里Canal框架数据库同步-实战教程

 

 8.返回bin目录点击startup.bat启动canal服务端,如下图示意启动乐成:

阿里Canal框架数据库同步-实战教程

 

 六、java代码实现

1.新建一个maven项目,导入maven jar包如下:

1 <dependency>
2     <groupId>com.alibaba.otter</groupId>
3     <artifactId>canal.client</artifactId>
4     <version>1.1.0</version>
5 </dependency>

2.编写测试代码

  1 package com.fuzongle.canal.conf;
  2 
  3 import com.alibaba.otter.canal.client.CanalConnector; 4 import com.alibaba.otter.canal.client.CanalConnectors; 5 import com.alibaba.otter.canal.protocol.CanalEntry; 6 import com.alibaba.otter.canal.protocol.CanalEntry.Column; 7 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; 8 import com.alibaba.otter.canal.protocol.CanalEntry.EventType; 9 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; 10 import com.alibaba.otter.canal.protocol.Message; 11 import com.google.protobuf.InvalidProtocolBufferException; 12 13 import java.net.InetSocketAddress; 14 import java.util.List; 15 import java.util.Queue; 16 import java.util.concurrent.ConcurrentLinkedQueue; 17 /** 18 * @Auther: fzl 19 * @Date: 2020/4/20 01:21 20 * @Description: 21 */ 22 public class TestCanal { 23 24 private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); 25 26 public static void main(String[] args) { 27 //获取canalServer毗邻:本机地址,端口号 28 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 29 11111), "example", "", ""); 30 int batchSize = 1000; 31 try { 32 //毗邻canalServer 33  connector.connect(); 34 //订阅Desctinstion 35  connector.subscribe(); 36  connector.rollback(); 37 try { 38 while (true) { 39 //实验从master那里拉去数据batchSize条纪录,有若干取若干 40 //轮询拉取数据 上面的where 41 Message message = connector.getWithoutAck(batchSize); 42 long batchId = message.getId(); 43 int size = message.getEntries().size(); 44 if (batchId == -1 || size == 0) { 45 //睡眠 46 Thread.sleep(1000); 47 } else { 48  dataHandle(message.getEntries()); 49  } 50  connector.ack(batchId); 51 System.out.println("aa"+size); 52 //当行列内里聚积的sql大于一定数值的时刻就模拟执行 53 if (SQL_QUEUE.size() >= 10) { 54  executeQueueSql(); 55  } 56  } 57 } catch (InterruptedException e) { 58  e.printStackTrace(); 59 } catch (InvalidProtocolBufferException e) { 60  e.printStackTrace(); 61  } 62 } finally { 63  connector.disconnect(); 64  } 65 66 67  } 68 69 70 71 72 /** 73 * 模拟执行行列内里的sql语句 74 */ 75 public static void executeQueueSql() { 76 int size = SQL_QUEUE.size(); 77 for (int i = 0; i < size; i++) { 78 String sql = SQL_QUEUE.poll(); 79 System.out.println("[sql]----> " + sql); 80  } 81  } 82 83 /** 84 * 数据处置 85 * 86 * @param entrys 87 */ 88 private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException { 89 for (CanalEntry.Entry entry : entrys) { 90 if (EntryType.ROWDATA == entry.getEntryType()) { 91 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 92 CanalEntry.EventType eventType = rowChange.getEventType(); 93 if (eventType == EventType.DELETE) { 94  saveDeleteSql(entry); 95 } else if (eventType == EventType.UPDATE) { 96  saveUpdateSql(entry); 97 } else if (eventType == CanalEntry.EventType.INSERT) { 98  saveInsertSql(entry); 99  } 100  } 101  } 102  } 103 104 /** 105 * 保留更新语句 106 * 107 * @param entry 108 */ 109 private static void saveUpdateSql(CanalEntry.Entry entry) { 110 try { 111 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 112 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); 113 for (CanalEntry.RowData rowData : rowDatasList) { 114 List<Column> newColumnList = rowData.getAfterColumnsList(); 115 StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set "); 116 for (int i = 0; i < newColumnList.size(); i++) { 117 sql.append(" " + newColumnList.get(i).getName() 118 + " = '" + newColumnList.get(i).getValue() + "'"); 119 if (i != newColumnList.size() - 1) { 120 sql.append(","); 121  } 122  } 123 sql.append(" where "); 124 List<Column> oldColumnList = rowData.getBeforeColumnsList(); 125 for (Column column : oldColumnList) { 126 if (column.getIsKey()) { 127 //暂时只支持单一主键 128 sql.append(column.getName() + "=" + column.getValue()); 129 break; 130  } 131  } 132  SQL_QUEUE.add(sql.toString()); 133  } 134 } catch (InvalidProtocolBufferException e) { 135  e.printStackTrace(); 136  } 137  } 138 139 /** 140 * 保留删除语句 141 * 142 * @param entry 143 */ 144 private static void saveDeleteSql(CanalEntry.Entry entry) { 145 try { 146 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 147 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); 148 for (CanalEntry.RowData rowData : rowDatasList) { 149 List<Column> columnList = rowData.getBeforeColumnsList(); 150 StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where "); 151 for (Column column : columnList) { 152 if (column.getIsKey()) { 153 //暂时只支持单一主键 154 sql.append(column.getName() + "=" + column.getValue()); 155 break; 156  } 157  } 158  SQL_QUEUE.add(sql.toString()); 159  } 160 } catch (InvalidProtocolBufferException e) { 161  e.printStackTrace(); 162  } 163  } 164 165 /** 166 * 保留插入语句 167 * 168 * @param entry 169 */ 170 private static void saveInsertSql(CanalEntry.Entry entry) { 171 try { 172 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 173 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); 174 for (CanalEntry.RowData rowData : rowDatasList) { 175 List<Column> columnList = rowData.getAfterColumnsList(); 176 StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " ("); 177 for (int i = 0; i < columnList.size(); i++) { 178 sql.append(columnList.get(i).getName()); 179 if (i != columnList.size() - 1) { 180 sql.append(","); 181  } 182  } 183 sql.append(") VALUES ("); 184 for (int i = 0; i < columnList.size(); i++) { 185 sql.append("'" + columnList.get(i).getValue() + "'"); 186 if (i != columnList.size() - 1) { 187 sql.append(","); 188  } 189  } 190 sql.append(")"); 191  SQL_QUEUE.add(sql.toString()); 192  } 193 } catch (InvalidProtocolBufferException e) { 194  e.printStackTrace(); 195  } 196  } 197 }

 3.若是数据库值发生改变之后会触发增删改,咱们可以拿到这个数据插入到其他数据库中。

阿里Canal框架数据库同步-实战教程

 

原创文章,作者:2d28新闻网,如若转载,请注明出处:https://www.2d28.com/archives/5461.html