马上注册,结交更多好友,享用更多功能,让你轻松玩转人网
您需要 登录 才可以下载或查看,没有账号?免费注册
x
这篇文章主要介绍了php使用Canal监听msyql的全过程,文中通过图文和代码示例讲解的非常详细,对大家的学习或工资有一定的帮助,需要的朋友可以参考下
− 目录

安装Java
- #创建目录
- mkdir -p /usr/local/java/
- #解压到目录
- tar zxvf jdk-8u411-linux-x64.tar.gz -C /usr/local/java/
复制代码
配置环境变量在 /etc/profile 最后加入
- export JAVA_HOME=/usr/local/java/jdk1.8.0_411
- export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
- export PATH=$PATH:$JAVA_HOME/bin
复制代码
使之生效
查看是否安装成功
设置mysql用户权限
- #创建用户名和密码都为 canal 的用户
- create user 'canal'@'%' identified by 'canal';
- #授予该用户对所有数据库和表的查询、复制主节点数据的操作权限
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- FLUSH PRIVILEGES; #重新加载权限
复制代码
修改mysql配置
修改部分内容如下
- # 开启 binlog
- log-bin=mysql-bin
- #master端的ID号,不能和 canal 的 slaveId 重复;
- server-id=1
- #行级,记录每次操作后每行记录的变化。
- binlog-format=row
- #指定库,缩小监控的范围。
- binlog-do-db=test
复制代码 查看是否开启主从
安装canal下载最新canal 
[size=1em]- 下载
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
- #创建canal目录
- mkdir -p /usr/local/canal/
- #解压到canal目录
- tar -zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/
复制代码 查看canal主配置文件
[size=1em]- cat /usr/local/canal/conf/canal.properties
复制代码
[size=1em]查看会看到暴露了三个端口和指定了一个实例
[size=1em]- canal.admin.port = 11110
- canal.port = 11111
- canal.metrics.pull.port = 11112
- #多个实例使用逗号分隔: canal.destinations = example1,example2,
- #这里对应/usr/local/canal/conf/example/
- canal.destinations = example
复制代码
[size=1em]canal.destinations:canal能可以收集多个MySQL数据库数据,每个MySQL数据库都有独立的配置文件控制。具体配置规则: conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。canal.destinations用于配置需要监控数据的数据库。如果是多个,使用,隔开。
修改实例配置文件
修改实例配置
- vim /usr/local/canal/conf/example/instance.properties
复制代码
在文件最后加入
- #配置 slaveId ,不能等于 mysql 配置里的 server Id 即可
- canal.instance.mysql.slaveId=10
- #数据库连接
- canal.instance.master.address=127.0.0.1:3306
- #数据库账号密码
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
- #代表数据库的编码方式
- canal.instance.connectionCharset = UTF-8
- #设置白名单,这里是监控test库下所有表
- canal.instance.filter.regex=test\\..*
- #设置黑名单,这里设置排除test库下以_noc结尾的表
- canal.instance.filter.black.regex=test\\..*_noc
复制代码
这个正则表达式 test\.*_noc 的含义是:
- test:精确匹配字符串 "test"。
- \\.:匹配一个点(.),因为点在正则表达式中有特殊含义,所以需要使用 \\ 进行转义。
- *:匹配零个或多个任意字符。
- _noc:精确匹配字符串 "_noc"。
- 因此,这个正则表达式可以用来匹配所有以 test. 开头且以 _noc 结尾的字符串。在 Canal 的配置中使用这个正则表达式,就可以实现排除以 test. 开头且以 _noc 结尾的数据库和表,而监控其他数据库和表。
复制代码比如我们要查询 test库下users表id为3的用户信息,sql语句可以这么写
SELECT * FROM test.users WHERE id = 3;
这里的test.users就是正则要匹配的地方
[size=1em]- 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
复制代码
[size=1em]启动和停止#启动 - /usr/local/canal/bin/startup.sh
复制代码- /usr/local/canal/bin/startup.sh
复制代码#停止 - /usr/local/canal/bin/stop.sh
复制代码
查看是否启动成功
php测试使用 canal-php. - composer require xingwenge/canal_php
复制代码
新建index.php文件 - <?php
- require __DIR__.'/vendor/autoload.php';
- use xingwenge\canal_php\CanalClient;
- use xingwenge\canal_php\CanalConnectorFactory;
- use xingwenge\canal_php\Fmt;
- try {
- $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
- # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
- $client->connect("127.0.0.1", 11111);
- $client->checkValid();
- //设置过滤tes库t下的所有表
- $client->subscribe("1001", "example", "test.*");
- while (true) {
- $message = $client->get(100);
- if ($entries = $message->getEntries()) {
- foreach ($entries as $entry) {
- Fmt::println($entry);
- }
- }
- sleep(1);
- }
- $client->disConnect();
- } catch (\Exception $e) {
- echo $e->getMessage(), PHP_EOL;
- }
复制代码
运行php文件
eventType:1、是新增行,2、修改行,3、删除行、4、新增表 也可以直接将数据写入rabbitmq 修改conf/canal.properties [size=1em]- vim conf/canal.properties
复制代码
[size=1em]修改RabbitMQ配置如下
[size=1em]- rabbitmq.host = 127.0.0.1
- rabbitmq.virtual.host = /
- #交换机名称
- rabbitmq.exchange =exchange.canal
- #队列名称
- rabbitmq.queue = canal_queue
- #路由键名
- rabbitmq.routingKey = canal-routing-key
- #账号密码
- rabbitmq.username =admin
- rabbitmq.password =admin
- #路由模式,这里是严格模式
- rabbitmq.deliveryMode =direct
复制代码
[size=1em]修改实例配置conf/example/instance.properties
[size=1em]- vim conf/example/instance.properties
复制代码
[size=1em]修改部分内容如下
[size=1em]- #rabbitmq的路由键配置
- #这里与canal.properties里的rabbitmq.routingKey一样
- canal.mq.topic=canal-routing-key
- canal.mq.partition=0
- canal.instance.multi.stream.on=false
- #rabbitmq 的 routing key
- #dynamic topic route by schema or table regex
- #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
- #hash partition config
- #canal.mq.enableDynamicQueuePartition=false
- #canal.mq.partitionsNum=3
- #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
- #canal.mq.partitionHash=test.table:id^name,.*\\..*
复制代码
|