现在遇到一个整体项目:源端数据:MySQL rds, oracle 目的端 两个不同的oracle 这种多订阅多消费端的架构,在这里我想到了用使用消息队列kafka。
首先考虑的是MySQL rds的问题,找了很多解决方案,最终决定使用canal来做。整体架构是这样:
mysql----->canal-server------>kafka----多个自定义消费端
这里对接oracle目的端和hbase 目的端用了两个 canal-adapter来做消费
canal-server配置
修改instance 配置文件 vi conf/rds_test/instance.properties
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=test
# rds oss binlog
canal.instance.rds.accesskey= rds accesskey
canal.instance.rds.secretkey= rds secretkey
canal.instance.rds.instanceId= rds实例id
...
# mq config
canal.mq.topic=rds_test
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart
修改canal 配置文件vi /usr/local/canal/conf/canal.properties
# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
canal-adapter配置
总配置文件 application.yml
adapter定义配置部分
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: kafka # tcp kafka rocketMQ
# canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
canalAdapters:
- instance: rds_test # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: oracle1
properties:
jdbc.driverClassName: oracle.jdbc.OracleDriver
jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
jdbc.username: system
jdbc.password: manager
- groupId: g2
outerAdapters:
- name: rdb
key: oracle2
properties:
jdbc.driverClassName: oracle.jdbc.OracleDriver
jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
jdbc.username: system
jdbc.password: manager
说明:
一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息
配置 ./rdb/rds_test0.yml 和./rdb/rds_test1.yml
dataSourceKey: defaultDS
destination: rds_test
groupId: g1
outerAdapterKey: oracle1
concurrent: true
dbMapping:
database: test
table: c1c
targetTable: c1c
targetPk:
id: id
mapAll: true
targetColumns:
id:
name:
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true
# database: mytest
然后分别启动server和adpder就可以了