博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Goldengate同步异构数据库Kafka中间件之一
阅读量:6093 次
发布时间:2019-06-20

本文共 5226 字,大约阅读时间需要 17 分钟。

收到业务部门需求,要求将Oracle数据库某表同步至Mysql数据库中,异构环境我们用kafka来实现,下面是具体的一些配置;

由于业务需要,现申请使用架构组数据同步服务同步以下数据到管家MySQL数据库

代理商用户数据:

a. 数据源:SSP库 AAA.system_user
b. 数据目标:MySQL DLS库 DLS_SYSTEM_USER
c. 同步逻辑: 无
d. 同步数据及对应关系:参见附件
e. 是否涉及敏感信息:否

准备工作;由于目标库Mysql库该表已经存在,我们将该表备份并且获取建表语句;

--获取建表语句
mysql> show create table dls_system_user;

--导出单个数据表结构和数据

mysqldump -uroot -p dls DLS_SYSTEM_USER > DLS_SYSTEM_USER_180622.sql

--重命名表

ALTER TABLE DLS_SYSTEM_USERRENAME DLS_SYSTEM_USER_BAK0622;

--新建空表

CREATE TABLE dls_system_user (
ID varchar(100) NOT NULL,
ACCOUNT_EXPIRED int(1) NOT NULL DEFAULT '0',
ACCOUNT_LOCKED int(1) NOT NULL DEFAULT '0',
ENABLED int(1) NOT NULL DEFAULT '0',
ORG_NO varchar(255) NOT NULL DEFAULT '',
USER_CODE varchar(100) NOT NULL DEFAULT '',
REMARK_NAME varchar(255) NOT NULL DEFAULT '',
IS_CREATE_PERSON varchar(255) NOT NULL DEFAULT '',
STATUS int(10) NOT NULL DEFAULT '0',
PRIMARY KEY (ID),
KEY IDX_DLS_SYSTEM_USER_USER_CODE (USER_CODE)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Oracle源端GoldenGate配置:

1、为要同步的表添加附加日志
dblogin USERID ggs@ntestdb, password ggs
add trandata AAA.system_user

2、 添加抽取进程

add extract ext_kafb, tranlog, begin now
add EXTTRAIL ./dirdat/a2, extract ext_kafb,MEGABYTES 200

edit params EXT_KAFB

extract EXT_KAFB

USERID ggs@ntestdb, password ggs
LOGALLSUPCOLS
exttrail ./dirdat/a2,FORMAT RELEASE 11.2
table AAA.system_user;

3、添加投递进程:

add extract pmp_kafb, exttrailsource ./dirdat/a2
add rmttrail ./dirdat/b2,EXTRACT pmp_kafb,MEGABYTES 200

eidt params pmp_kafb

EXTRACT pmp_kafb

USERID ggs@ntestdb, password ggs
PASSTHRU
RMTHOST 172.16.xxx.5, MGRPORT 9178 --kafka服务器地址
RMTTRAIL ./dirdat/b2,format release 11.2
table AAA.system_user;

----初始化文件存放在 /ggs/ggs12/dirprm/

4.添加初始化进程

ADD EXTRACT ek_20, sourceistable ---源端添加

edit params ek_20

EXTRACT ek_20

USERID ggs@ntestdb, password ggs
RMTHOST 172.16.154.5, MGRPORT 9178
RMTFILE ./dirdat/lb,maxfiles 999, megabytes 500
table AAA.system_user;

5.生成def文件:

GGSCI> edit param defgen_n9

USERID ggs@ntestdb, password ggs

defsfile /goldengate/ggskafka/dirdef/defgen_n9.def,format release 11.2
table AAA.system_user;

在OGG_HOME下执行如下命令生成def文件

defgen paramfile /goldengate/ggskafka/dirprm/defgen_n9.prm

将生成的def文件传到kafka服务器$OGG_HOME/dirdef下

---目标端mysql 数据库地址172.16.xxx.148,需要新建kafka用户

grant select,insert,update,delete,create,drop on DLS.* to 'kafka'@'%' identified by 'jiubugaosuni';

--kafka服务器GoldenGate操作

1、添加初始化进程:---dirprm
GGSCI> ADD replicat rn_3,specialrun

EDIT PARAMS rn_3

SPECIALRUN

end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
EXTFILE ./dirdat/lb
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;

2、添加复制进程:

GGSCI>add replicat RN_KF3,exttrail ./dirdat/b2
GGSCI>edit params RN_KF3

REPLICAT RN_KF3

setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;

3、参数配置:

cd /home/app/ogg/ggs12/dirprm

custom_kafka_producer.properties 文件内容如下:

[app@test-datamanager dirprm]$ more custom_kafka_producer.properties

bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

100KB per partition

batch.size=16384

linger.ms=0

---vi添加对应文件 kafkat_n3.props

kafka.props文件内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= DLS.DLS_MERCHANT_STATUS
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json --指定文件类型
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.SchemaTopicName= DLS.DLS_MERCHANT_STATUS --指定topic名称
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --patch路径
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

至此我们配置算是基本完成,现在我们来开启进程,初始化数据;

1、启动源端抓取进程
GGSCI> start EXT_KAFB
2、启动源端投递进程
GGSCI> start pmp_kafb
3、启动源端初始化进程
GGSCI> start ek_20
4、启动目标端初始化进程
GGSCI> start rn_3
在$OGG_HOME下执行如下命令:

./replicat paramfile ./dirprm/rn_3.prm reportfile ./dirrpt/rn_3.rpt -p INITIALDATALOAD

5、启动目标端恢复进程
GGSCI> start RN_KF3

转载地址:http://mrrwa.baihongyu.com/

你可能感兴趣的文章
ijkplayer实现IMediaDataSource
查看>>
Docker - 创建支持SSH服务的容器镜像
查看>>
[TC13761]Mutalisk
查看>>
三级菜单
查看>>
Data Wrangling文摘:Non-tidy-data
查看>>
加解密算法、消息摘要、消息认证技术、数字签名与公钥证书
查看>>
while()
查看>>
常用限制input的方法
查看>>
Ext Js简单事件处理和对象作用域
查看>>
IIS7下使用urlrewriter.dll配置
查看>>
12.通过微信小程序端访问企查查(采集工商信息)
查看>>
WinXp 开机登录密码
查看>>
POJ 1001 Exponentiation
查看>>
HDU 4377 Sub Sequence[串构造]
查看>>
云时代架构阅读笔记之四
查看>>
WEB请求处理一:浏览器请求发起处理
查看>>
Lua学习笔记(8): 元表
查看>>
PHP经典算法题
查看>>
LeetCode 404 Sum of Left Leaves
查看>>
LeetCode 83 Remove Duplicates from Sorted List
查看>>