




create database mysqltestdb;
create table `student` (`id` int(11) not null auto_increment comment '主键id',`name` varchar(10) collate utf8mb4_bin default '' comment '名字',`age` int(11) default null comment '年龄',`create_time` timestamp null default current_timestamp comment '数据创建时间',primary key (`id`)) engine=innodb auto_increment=4 default charset=utf8mb4 collate=utf8mb4_bin row_format=compact comment='学生表'
insert into mysqltestdb.student(id,name,age) values(1,“xiaomin”,20);




# 进入HBase命令root@172~# hbase shell
# 建表语句create ‘dim_hbase’, ‘cf’# 插入数据put ‘dim_hbase’,’1’,’cf:name’,’MingDeSchool’



clickhouse-client -h用户自己的ClickHouse服务IP --port 9000
create database testdb on cluster default_cluster;
CREATE TABLE testdb.student_school on cluster default_cluster (`id` Int32,`name` Nullable(String),`school_name` Nullable(String),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/ student_school, '{replica}', Sign) ORDER BY id;
--学生信息作为cdc源表CREATE TABLE `student` (`id` INT NOT NULL,`name` varchar,`age` INT,proc_time AS PROCTIME(),PRIMARY KEY (`ID`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc',-- 必须为 'mysql-cdc''hostname' = 'YoursIp',-- 数据库的 IP'port' = '3306',-- 数据库的访问端口'username' = '用户名',-- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)'password' = 'YoursPassword,-- 数据库访问的密码'database-name' = 'mysqltestdb',-- 需要同步的数据库'table-name' = 'student' -- 需要同步的数据表名);
--示例使用school学校信息作为维表CREATE TABLE dim_hbase (rowkey STRING,cf ROW <school_name STRING>, -- 如果有多个列簇,写法 cf Row<age INT,name String>PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4','table-name' = 'dim_hbase','zookeeper.quorum' = '用户自己的hbase服务器zookeeper地址,多个用逗号隔开');
--关联后存入clickhouse表CREATE TABLE `student_school` (stu_id INT,stu_name STRING,school_name STRING,PRIMARY KEY (`id`) NOT ENFORCED) WITH (-- 指定数据库连接参数'connector' = 'clickhouse','url' = 'clickhouse://yourIP:8123',-- 如果TCHouse-C集群未配置账号密码可以不指定--'username' = 'root',--'password' = 'root','database-name' = 'testdb','table-name' = ' student_school ','table.collapsing.field' = 'Sign');
INSERT INTOstudent_schoolSELECTstudent.id as stu_id,student.name as stu_name,dim_hbase.cf.school_nameFROMstudentJOIN dim_hbase for SYSTEM_TIME as of student.proc_timeON CAST(student.id AS STRING) = dim_hbase.rowkey;
select * from testdb.student_school;
文档反馈