产品动态
# 下载 jdk18
[root@VM-10-18-tencentos ~]# wget --no-check-certificate --header "Cookie: oraclelicense=accept-securebackup-cookie" \
    https://download.oracle.com/java/18/archive/jdk-18.0.2_linux-x64_bin.tar.gz
# 解压安装包
[root@VM-10-18-tencentos ~]# tar -zxvf jdk-18.0.2_linux-x64_bin.tar.gz -C /usr/local/
# 重命名目录
[root@VM-10-18-tencentos ~]# sudo mv /usr/local/jdk-18.0.2 /usr/local/jdk18
[root@VM-10-18-tencentos ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk18
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
[root@VM-10-18-tencentos ~]# source /etc/profile
[root@VM-10-18-tencentos ~]# java -version
java version "18.0.2" 2022-07-19
Java(TM) SE Runtime Environment (build 18.0.2+9-61)
Java HotSpot(TM) 64-Bit Server VM (build 18.0.2+9-61, mixed mode, sharing)
[root@VM-10-18-tencentos ~]# wget https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
# 创建kafka的安装目录
[root@VM-10-18-tencentos ~]# mkdir -p /data/zookeeper
# 解压kafka安装包
[root@VM-10-18-tencentos ~]# tar -zxvf kafka_2.13-3.7.2.tgz -C /data/
# 重命名解压后的目录
[root@VM-10-18-tencentos ~]# cd /data/
[root@VM-10-18-tencentos data]# mv kafka_2.13-3.7.2 kafka_dev
[root@VM-10-18-tencentos data]# cd /data/kafka_dev/config
[root@VM-10-18-tencentos config]# vim /data/kafka_dev/config/zookeeper.properties
dataDir=/data/zookeeper
[root@VM-10-18-tencentos config]# mkdir -p /data/kafka_dev/logs/
[root@VM-10-18-tencentos config]# vim server.properties
# 若 listeners 所在行首有#号,需将#删去并修改
listeners=PLAINTEXT://部署kafka所在机器的ip:9092
log.dirs=/data/kafka_dev/logs/connect.log
zookeeper.connect=部署kafka所在机器的ip:2181
[root@VM-10-18-tencentos config]# vim connect-distributed.properties
group.id=connect-cluster
bootstrap.servers=部署kafka所在机器的ip:9092
# 定义插件路径
plugin.path=/data/kafka_connect/plugins
[root@VM-10-18-tencentos config]# cd /data/kafka_dev
[root@VM-10-18-tencentos kafka_dev]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &> zookeeper.log &
[root@VM-10-18-tencentos kafka_dev]# jobs
[root@VM-10-18-tencentos kafka_dev]# nohup bin/kafka-server-start.sh config/server.properties &> kafka.log &
[root@VM-10-18-tencentos kafka_dev]# jobs

show wal_level;

CREATE PUBLICATION pg_demo_publication FOR ALL TABLES;
CREATE PUBLICATION pg_demo_publication FOR TABLE table_name1, table_name2;
SELECT * FROM pg_publication_tables WHERE pubname = 'pg_demo_publication';
SELECT * FROM pg_publication WHERE pubname = 'pg_demo_publication';
CREATE EXTENSION tencentdb_failover_slot;
SELECT pg_create_logical_failover_slot('failover_slot_name', 'pgoutput');
SELECT * FROM pg_failover_slots;
[root@VM-10-18-tencentos ~]# mkdir -p /data/kafka_connect/plugins
[root@VM-10-18-tencentos ~]# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.7.3.Final/debezium-connector-postgres-2.7.3.Final-plugin.tar.gz
[root@VM-10-18-tencentos ~]# tar -zxvf debezium-connector-postgres-2.7.3.Final-plugin.tar.gz -C /data/kafka_connect/plugins
[root@VM-10-18-tencentos ~]# cd /data/kafka_dev
[root@VM-10-18-tencentos kafka_dev]# nohup bin/connect-distributed.sh config/connect-distributed.properties &> connect.log &
curl -XPOST "http://部署kafka所在云服务器的ip:8083/connectors/" \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "test_connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "PostgreSQL数据库所在机器的IP",
      "database.port": "5432",
      "database.user": "有发布权限的PostgreSQL用户名",
      "database.password": "发布用户的密码",
      "database.dbname": "创建publication的数据库",
      "database.server.name": "pg_demo",
      "slot.name": "pg_demo_failover_slot",
      "topic.prefix": "pg_demo",
      "publication.name": "pg_demo_publication",
      "publication.autocreate.mode": "all_tables",
      "plugin.name": "pgoutput"
    }
  }'
参数 | 说明 |
name | 连接器的名称,必须唯一。 |
database.hostname | 云数据库的 IP 地址。建议您填写内网 IP。 |
database.user | 用于连接云数据库的用户名。 该用户需要有足够权限完成发布。推荐使用类型为 pg_tencentdb_superuser 的用户。 |
database.password | 用户的密码。 |
database.dbname | 创建 publication 的数据库名称。 |
slot.name | 逻辑复制槽的名称。 请填写之前创建的逻辑复制槽名称。 |
publication.name | 逻辑发布的名称。 请填写之前创建的逻辑发布名称。 |
SELECT * FROM pg_publication;
SELECT * FROM pg_failover_slots;
curl "http://部署kafka所在机器的ip:8083/connectors/test_connector/status"
[root@VM-10-18-tencentos kafka_dev]# su - postgres
[postgres@VM-10-18-tencentos ~]$ /usr/local/pgsql/bin/psql -h *.*.*.* -p 5432 -U dbadmin -d postgres
Password for user dbadmin:
psql (16.4, server 16.8)
Type "help" for help.
postgres=>
postgres=> CREATE TABLE linktest (id SERIAL PRIMARY KEY);
CREATE TABLE
postgres=> insert into linktest values(1);
INSERT 0 1
文档反馈