现在遇到一个整体项目:源端数据:MySQL rds, oracle 目的端 两个不同的oracle 这种多订阅多消费端的架构,在这里我想到了用使用消息队列kafka。
首先考虑的是MySQL rds的问题,找了很多解决方案,最终决定使用canal来做。整体架构是这样:

mysql----->canal-server------>kafka----多个自定义消费端
这里对接oracle目的端和hbase 目的端用了两个 canal-adapter来做消费

canal-server配置

修改instance 配置文件 vi conf/rds_test/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=test
# rds oss binlog
canal.instance.rds.accesskey=  rds accesskey
canal.instance.rds.secretkey=  rds secretkey
canal.instance.rds.instanceId= rds实例id
...
# mq config
canal.mq.topic=rds_test
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart

修改canal 配置文件vi /usr/local/canal/conf/canal.properties

# ...
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
canal.mq.servers = 127.0.0.1:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

canal-adapter配置

总配置文件 application.yml

adapter定义配置部分

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka # tcp kafka rocketMQ
#  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:

  canalAdapters:
  - instance: rds_test # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
       - name: rdb
         key: oracle1
         properties:
           jdbc.driverClassName: oracle.jdbc.OracleDriver
           jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
           jdbc.username: system
           jdbc.password: manager
    - groupId: g2
      outerAdapters:
       - name: rdb
         key: oracle2
         properties:
           jdbc.driverClassName: oracle.jdbc.OracleDriver
           jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
           jdbc.username: system
           jdbc.password: manager

说明:

一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters, 比如例子中logger和hbase
目前client adapter数据订阅的方式支持两种,直连canal server 或者 订阅kafka/RocketMQ的消息

配置 ./rdb/rds_test0.yml 和./rdb/rds_test1.yml

dataSourceKey: defaultDS
destination: rds_test
groupId: g1
outerAdapterKey: oracle1
concurrent: true
dbMapping:
  database: test
  table: c1c
  targetTable: c1c
  targetPk:
    id: id
  mapAll: true
  targetColumns:
    id:
    name:


## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest

然后分别启动server和adpder就可以了