Developing a Flink JAR: Real-Time MySQL-to-MySQL Sync with CDC

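CDC stands for Change Data Capture. In the broad sense, any technique that can capture data changes can be called CDC. In practice the term usually refers to database changes: a technique for capturing the changes made to data in a database.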

1. The MySQL CDC connector requires the binlog to be enabled in MySQL. Open my.cnf and add the following settings under [mysqld]:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

Restart the database.
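Before restarting, it can help to confirm that the three settings really ended up in the [mysqld] section. The following is only a minimal sketch: the class `BinlogConfigCheck` and its methods are hypothetical names, and the parser only understands the simple `key=value` form shown above. It treats `-` and `_` in option names as equivalent, as MySQL option files do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of MySQL or Flink.
class BinlogConfigCheck {

    /** Collects the key/value pairs found in the [mysqld] section of my.cnf text. */
    static Map<String, String> parseMysqldSection(String myCnf) {
        Map<String, String> settings = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : myCnf.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue; // skip blank lines and comments
            }
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue; // ignore options of other sections
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            // Normalize so that server_id and server-id are treated alike
            settings.put(key.replace('_', '-').toLowerCase(), value);
        }
        return settings;
    }

    /** Returns one message per setting that the CDC setup above expects but does not find. */
    static List<String> missingSettings(String myCnf) {
        Map<String, String> s = parseMysqldSection(myCnf);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("server-id")) {
            problems.add("server-id is not set");
        }
        if (!s.containsKey("log-bin")) {
            problems.add("log-bin is not set");
        }
        if (!"row".equalsIgnoreCase(s.get("binlog-format"))) {
            problems.add("binlog-format is not row");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nserver-id=1\nlog-bin=mysql-bin\nbinlog-format=row\n";
        System.out.println(missingSettings(cnf)); // prints []
    }
}
```

In real use the text would be read from the actual my.cnf file; an empty list means all three settings are present.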

2. Create a Spring Boot project and add the following dependencies to pom.xml:

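<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- property name assumed for the value 1.8 -->
  <java.version>1.8</java.version>
  <flink.version>1.14.2</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
  <!-- property name assumed for the value 1.7.30 -->
  <slf4j.version>1.7.30</slf4j.version>
</properties>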




  
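<dependencies>
  <!-- Scala-suffixed Flink artifacts must all use the same Scala version (2.11 here) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.14.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.14.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.2</version>
  </dependency>
  <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.17</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.14.2</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-dist</artifactId>
    <version>200</version>
    <scope>system</scope>
    <systemPath>${basedir}/src/main/lib/flink-dist_2.11-1.14.4.jar</systemPath>
  </dependency>
</dependencies>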



  
    
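<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <archive>
          <manifest>
            <mainClass>org.example.FlinkMysqlToMysql</mainClass>
          </manifest>
        </archive>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>8</source>
        <target>8</target>
      </configuration>
    </plugin>
  </plugins>
</build>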
          
      
  

Flink CDC code for syncing MySQL to MySQL

package org.example;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkMysqlToMysql {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart at most 3 times, waiting 5 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
        // Checkpoint every 5 s in exactly-once mode
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Create the Table environment. Flink 1.14 has a single planner,
        // so the deprecated useBlinkPlanner() call is not needed.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register the source and target tables.
        // STRING is used because a bare VARCHAR in Flink SQL means VARCHAR(1).
        tEnv.executeSql("CREATE TABLE sourceTable (" +
                "id BIGINT, organization_code STRING, organization_name STRING, " +
                "parent_code STRING, parent_name STRING, " +
                "PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                // The source table must use the mysql-cdc connector
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'database-name' = 'quarant_db',\n" +
                " 'table-name' = 'organization_info',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'admin'\n" +
                ")");

        tEnv.executeSql("CREATE TABLE targetTable (" +
                "id BIGINT, organization_code STRING, organization_name STRING, " +
                "parent_code STRING, parent_name STRING, " +
                "PRIMARY KEY (id) NOT ENFORCED) WITH (\n" +
                // The target table uses the jdbc connector
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                " 'table-name' = 'organization_info',\n" +
                " 'username' = 'root',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'password' = 'admin'\n" +
                ")");

        // Run the CDC pipeline
        String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
        tEnv.executeSql(query).print();
    }
}
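Concatenating the WITH clause by hand makes it easy to lose a comma or a quote. As a sketch of one alternative, the connector options can be kept in an ordered map and rendered into the DDL string, which is then passed to `tEnv.executeSql`. The `DdlBuilder` class and its `createTable` method are hypothetical helpers written for this illustration, not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of Flink.
class DdlBuilder {

    /**
     * Renders a CREATE TABLE statement from a column list and connector options.
     * Single quotes inside option values are doubled, the SQL way of escaping them.
     */
    static String createTable(String name, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + name + " (\n  " + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order
        Map<String, String> source = new LinkedHashMap<>();
        source.put("connector", "mysql-cdc");
        source.put("hostname", "localhost");
        source.put("port", "3306");
        source.put("database-name", "quarant_db");
        source.put("table-name", "organization_info");
        source.put("username", "root");
        source.put("password", "admin");

        String columns = "id BIGINT, organization_code STRING, organization_name STRING, "
                + "parent_code STRING, parent_name STRING, PRIMARY KEY (id) NOT ENFORCED";
        System.out.println(createTable("sourceTable", columns, source));
    }
}
```

The same method can render the target table by swapping in the jdbc options, so both tables share one column definition.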

Run the main method.


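Flink first copies the existing rows of the source table to the target table. After that, every insert, update, and delete on the source table is synchronized to the target table in real time.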
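To make this behaviour concrete: the CDC source produces a changelog of inserts, updates, and deletes keyed by the primary key, and the sink applies them to the target, writing inserts and updates as upserts. The self-contained sketch below imitates that with a plain map standing in for the target table. `ChangelogReplay`, `Op`, and `Change` are illustrative names, not Flink or connector classes, and the sample rows are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of changelog replay; not Flink code.
class ChangelogReplay {

    enum Op { INSERT, UPDATE, DELETE }

    /** One captured change: the operation, the primary key, and the new value (null for deletes). */
    static final class Change {
        final Op op;
        final long id;
        final String organizationName;

        Change(Op op, long id, String organizationName) {
            this.op = op;
            this.id = id;
            this.organizationName = organizationName;
        }
    }

    /** Applies the changes in order; inserts and updates are both written as upserts by key. */
    static Map<Long, String> apply(Map<Long, String> target, List<Change> changes) {
        for (Change c : changes) {
            if (c.op == Op.DELETE) {
                target.remove(c.id);
            } else {
                target.put(c.id, c.organizationName);
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Map<Long, String> target = new TreeMap<>();
        apply(target, Arrays.asList(
                new Change(Op.INSERT, 1L, "HQ"),
                new Change(Op.INSERT, 2L, "Branch"),
                new Change(Op.UPDATE, 2L, "Branch East"),
                new Change(Op.DELETE, 1L, null)));
        System.out.println(target); // prints {2=Branch East}
    }
}
```

Because every change is keyed by `id`, replaying the same insert or update twice leaves the target unchanged, which is why the primary key declaration on both tables matters.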

3. To package the program as a Flink JAR, change the build section of the pom:

    
      
      
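<build>
  <plugins>
    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <encoding>UTF-8</encoding>
      </configuration>
    </plugin>
    <!-- Spring Boot packaging (disabled)
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <archive>
          <manifest>
            <mainClass>org.example.FlinkMysqlToMysql</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>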
            
          
        
      
    
    
  

In IDEA, run the Maven package goal to build the jar.


Take the jar that includes the dependencies and run it on Flink.
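With the assembly plugin's default naming, the fat jar appears under target/ as `<artifactId>-<version>-jar-with-dependencies.jar`, and it can be submitted with the Flink CLI in the form `flink run -c <mainClass> <jar>`. The sketch below only assembles that command line. `FlinkSubmit` is a hypothetical helper, and `demo` and `1.0` are placeholder coordinates, because the article does not state the project's artifactId or version.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that builds the submit command; it does not run anything.
class FlinkSubmit {

    /** Name the assembly plugin gives the fat jar when the assembly id is appended (the default). */
    static String fatJarName(String artifactId, String version) {
        return artifactId + "-" + version + "-jar-with-dependencies.jar";
    }

    /** Command line for submitting the jar with an explicit entry class. */
    static List<String> runCommand(String mainClass, String jarPath) {
        return Arrays.asList("flink", "run", "-c", mainClass, jarPath);
    }

    public static void main(String[] args) {
        // "demo" and "1.0" are placeholders for the real Maven coordinates
        String jar = "target/" + fatJarName("demo", "1.0");
        System.out.println(String.join(" ", runCommand("org.example.FlinkMysqlToMysql", jar)));
        // prints: flink run -c org.example.FlinkMysqlToMysql target/demo-1.0-jar-with-dependencies.jar
    }
}
```

Since the manifest already names `org.example.FlinkMysqlToMysql` as the main class, the `-c` option is optional here; it is shown for clarity.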


This article comes from the internet and does not represent the position of 协通编程. If you repost it, please cite the source: https://net2asp.com/1ceffe2ad6.html