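Developing a Flink JAR: Real-Time MySQL-to-MySQL Sync with CDC
CDC stands for Change Data Capture. In the broad sense, any technique that can capture data changes can be called CDC. In practice, the term usually refers to capturing changes made to a database.
1. The MySQL CDC connector requires the MySQL binlog to be enabled. Locate my.cnf and add the following settings under [mysqld]: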
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
Restart the database so the settings take effect.
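Before restarting, it can help to confirm that the three settings really sit under [mysqld] and not under another section. The following is a minimal sketch that only inspects the text of the config file and does not query the server. The class and method names (MyCnfBinlogCheck, mysqldOptions, binlogReadyForCdc) are hypothetical and not part of MySQL or Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks my.cnf text for the binlog settings listed above.
public class MyCnfBinlogCheck {

    /** Collects the options that appear under the [mysqld] section. */
    public static Map<String, String> mysqldOptions(List<String> lines) {
        Map<String, String> options = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // skip blank lines and comments
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (inMysqld) {
                int eq = line.indexOf('=');
                String key = (eq < 0 ? line : line.substring(0, eq)).trim();
                String value = eq < 0 ? "" : line.substring(eq + 1).trim();
                // MySQL accepts '-' and '_' interchangeably in option names.
                options.put(key.replace('_', '-').toLowerCase(), value);
            }
        }
        return options;
    }

    /** True when server-id and log-bin are set and binlog-format is row. */
    public static boolean binlogReadyForCdc(List<String> lines) {
        Map<String, String> o = mysqldOptions(lines);
        return o.containsKey("server-id")
                && o.containsKey("log-bin")
                && "row".equalsIgnoreCase(o.get("binlog-format"));
    }

    public static void main(String[] args) {
        List<String> cnf = Arrays.asList(
                "[mysqld]", "server-id=1", "log-bin=mysql-bin", "binlog-format=row");
        System.out.println(binlogReadyForCdc(cnf)); // prints true
    }
}
```

In a real setup the lines would come from reading the my.cnf file. After the restart, the effective values can also be checked on the server with SHOW VARIABLES LIKE 'log_bin' and SHOW VARIABLES LIKE 'binlog_format'.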
2. Create a Spring Boot project and add the following dependencies to the pom
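<properties>
    <!-- Only flink.version and scala.binary.version are referenced below; the other property names are conventional guesses. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.14.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.14.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.14.2</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-dist</artifactId>
        <version>200</version>
        <scope>system</scope>
        <systemPath>${basedir}/src/main/lib/flink-dist_2.11-1.14.4.jar</systemPath>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>org.example.FlinkMysqlToMysql</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>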
Flink CDC code for MySQL-to-MySQL sync
package org.example; // matches the mainClass configured in the pom

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart up to 3 times, waiting 5 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        // Checkpoint every 5 seconds with exactly-once semantics
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Create the Table environment. useBlinkPlanner() is deprecated in Flink 1.14,
        // where Blink is the only planner, so it is omitted here.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register the source table. STRING is used because VARCHAR without a length means VARCHAR(1) in Flink SQL.
        tEnv.executeSql("CREATE TABLE sourceTable (id BIGINT, organization_code STRING, organization_name STRING, parent_code STRING, parent_name STRING, PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                // The source table connector must be mysql-cdc
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'database-name' = 'quarant_db',\n" +
                " 'table-name' = 'organization_info',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'admin'\n" +
                ")");
        // Table result = tEnv.sqlQuery("SELECT id, name,card_num,phone,address FROM sourceTable");
        // tEnv.registerTable("sourceTable",result);

        // Register the target table
        tEnv.executeSql("CREATE TABLE targetTable (id BIGINT, organization_code STRING, organization_name STRING, parent_code STRING, parent_name STRING, PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                // The target table connector is jdbc
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                " 'table-name' = 'organization_info',\n" +
                " 'username' = 'root',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'password' = 'admin'\n" +
                ")");

        // Run the CDC pipeline: continuously copy changes from the source to the target
        String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
        tEnv.executeSql(query).print();
    }
}
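Hand-concatenating the WITH clause makes it easy to drop a comma or a quote. One possible alternative, sketched here with only the JDK, is to build the DDL string from an ordered map of connector options. The class FlinkDdlBuilder and its createTable method are hypothetical helpers for illustration and not part of Flink. The option keys and values are the ones used in the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: renders a Flink SQL CREATE TABLE statement from a map of options.
public class FlinkDdlBuilder {

    /** Renders: CREATE TABLE name (columns) WITH ('key' = 'value', ...). */
    public static String createTable(String name, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                // Single quotes inside a value are escaped by doubling them, as in standard SQL.
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + name + " (\n  " + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> source = new LinkedHashMap<>();
        source.put("connector", "mysql-cdc");
        source.put("hostname", "localhost");
        source.put("port", "3306");
        source.put("database-name", "quarant_db");
        source.put("table-name", "organization_info");
        source.put("username", "root");
        source.put("password", "admin");

        String columns = "id BIGINT, organization_code STRING, organization_name STRING, "
                + "parent_code STRING, parent_name STRING, PRIMARY KEY (id) NOT ENFORCED";
        System.out.println(createTable("sourceTable", columns, source));
    }
}
```

The returned string could then be passed to tEnv.executeSql(...) in place of the concatenated literal. Keeping the options in a map also makes it straightforward to load the host name and credentials from configuration instead of hard-coding them.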
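Run the main method.

Flink first copies the existing rows of the source table to the target table. After that, every insert, update, and delete on the source table is synced to the target table in real time.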
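To make the sync behaviour concrete, here is a simplified, JDK-only sketch of how a stream of change events can be replayed against a target keyed by primary key. It is a conceptual model, not Flink's implementation. The Kind enum is a stand-in loosely modelled on Flink's row kinds (+I, -U, +U, -D), and the class ChangelogReplaySketch, the Change type, and the sample organization names are all hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch: replaying insert/update/delete events onto a target keyed by id.
public class ChangelogReplaySketch {

    /** Simplified stand-in for the four changelog row kinds. */
    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One change event: the kind of change, the primary key, and one column value. */
    public static final class Change {
        final Kind kind;
        final long id;
        final String organizationName;

        public Change(Kind kind, long id, String organizationName) {
            this.kind = kind;
            this.id = id;
            this.organizationName = organizationName;
        }
    }

    /** Applies the changes in order; the map plays the role of the target table. */
    public static Map<Long, String> apply(Map<Long, String> target, List<Change> changes) {
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    target.put(c.id, c.organizationName); // upsert by primary key
                    break;
                case DELETE:
                    target.remove(c.id);
                    break;
                case UPDATE_BEFORE:
                    break; // the old row image is not needed when upserting by key
            }
        }
        return target;
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change(Kind.INSERT, 1L, "HQ"),
                new Change(Kind.INSERT, 2L, "Branch"),
                new Change(Kind.UPDATE_BEFORE, 1L, "HQ"),
                new Change(Kind.UPDATE_AFTER, 1L, "Head Office"),
                new Change(Kind.DELETE, 2L, "Branch"));
        System.out.println(apply(new LinkedHashMap<>(), changes)); // prints {1=Head Office}
    }
}
```

This is why the target table declares a primary key: with a key defined, the changes can be written as upserts and deletes against the matching row rather than as plain appends.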
3. Package the program as a Flink JAR by modifying the build section of the pom
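<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Spring Boot project packaging
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
        -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>org.example.FlinkMysqlToMysql</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>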
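Click package in IDEA to build the project.
Take the JAR that bundles the dependencies (the one ending in jar-with-dependencies) and run it on Flink.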

