Flink 实现 Kafka 到 Kafka、Kafka 到 Doris 的精准一次(Exactly-Once)消费
•
大数据
1 流程图

2 Flink来源表建模
-- Source: city-level Kafka topic (QL_JC_SSJC).
-- The 'raw' format exposes each Kafka message as the single STRING column `record`.
-- Reading starts from the consumer group's committed offsets ('group-offsets');
-- 'auto.offset.reset' = 'earliest' applies when the group has no committed offset.
-- 'isolation.level' = 'read_committed' hides messages of uncommitted transactions.
CREATE TABLE NJ_QL_JC_SSJC_SOURCE (
    record STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'raw',
    'topic' = 'QL_JC_SSJC',
    'properties.bootstrap.servers' = '172.*.*.*:9092',
    'properties.group.id' = 'QL_JC_SSJC_NJ_QL_JC_SSJC_SOURCE',
    'properties.auto.offset.reset' = 'earliest',
    'properties.isolation.level' = 'read_committed',
    'scan.startup.mode' = 'group-offsets'
);
-- Source: data-platform (middle-office) Kafka topic ODS_QL_JC_SSJC, JSON-encoded.
-- Columns: sscsdm (city code set by the upstream job), extract_time, record (payload).
-- The cluster is accessed over SASL_PLAINTEXT with Kerberos.
-- 'isolation.level' = 'read_committed' makes this reader see only messages of
-- committed Kafka transactions, which the exactly-once pipeline relies on.
-- The sink-only option 'sink.semantic' was removed from this table: it configures
-- the Kafka producer and has no effect on a table that is only read from.
CREATE TABLE ODS_QL_JC_SSJC_SOURCE (
    sscsdm STRING,
    extract_time TIMESTAMP,
    record STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ODS_QL_JC_SSJC',
    'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
    'properties.group.id' = 'ODS_QL_JC_SSJC_SOURCE_ODS_QL_JC_SSJC_SOURCE',
    'scan.startup.mode' = 'group-offsets',
    'properties.auto.offset.reset' = 'earliest',
    'properties.isolation.level' = 'read_committed',
    'format' = 'json'
);
3 Flink去向表建模
-- Sink: data-platform (middle-office) Kafka topic ODS_QL_JC_SSJC, JSON-encoded.
-- 'sink.semantic' = 'exactly-once' makes the producer write inside Kafka
-- transactions that are committed on Flink checkpoints; without it the
-- 'transaction.timeout.ms' setting below has no purpose.
-- 'transaction.timeout.ms' = 900000 (15 minutes).
-- NOTE(review): this value must not exceed the broker's transaction.max.timeout.ms
-- and should be longer than checkpoint interval plus expected recovery time - confirm.
-- NOTE(review): on Flink 1.14+ the preferred options are 'sink.delivery-guarantee'
-- together with 'sink.transactional-id-prefix' - confirm against the Flink version in use.
CREATE TABLE KAFKA_ODS_QL_JC_SSJC_SINK (
    sscsdm STRING,
    extract_time TIMESTAMP,
    record STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ODS_QL_JC_SSJC',
    'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
    'format' = 'json',
    'sink.semantic' = 'exactly-once',
    'properties.transaction.timeout.ms' = '900000'
);

-- Sink: Doris table doris_d.ods_ql_jc_ssjc.
-- 'sink.properties.two_phase_commit' = 'true' is forwarded as a stream-load property.
-- NOTE(review): newer Doris Flink connector releases expose two-phase commit as
-- 'sink.enable-2pc' and expect a 'sink.label-prefix' - confirm against the
-- connector version deployed.
CREATE TABLE DORIS_ODS_QL_JC_SSJC_SINK (
    sscsdm STRING,
    extract_time TIMESTAMP,
    record STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = '3.*.*.*:8030,3.*.*.*:8030,3.*.*.*:8030',
    'table.identifier' = 'doris_d.ods_ql_jc_ssjc',
    'username' = 'root',
    'password' = '********',
    'sink.properties.two_phase_commit' = 'true'
);
4 城市 Topic 至中台 Topic 的 Flink SQL
-- Fan-in: merge every city-level source topic into the data-platform Kafka topic.
-- Each branch tags its rows with the city's code (sscsdm) and stamps extract_time
-- with the time the row is processed. UNION ALL is used because the branches read
-- different source tables and no de-duplication is wanted.
INSERT INTO KAFKA_ODS_QL_JC_SSJC_SINK
SELECT
    '320100' AS sscsdm,
    CURRENT_TIMESTAMP AS extract_time,
    record
FROM NJ_QL_JC_SSJC_SOURCE
UNION ALL
SELECT
    '320200' AS sscsdm,
    CURRENT_TIMESTAMP AS extract_time,
    record
FROM WX_QL_JC_SSJC_SOURCE
-- ... one "UNION ALL SELECT '<city code>' ..." branch per remaining city source table ...
UNION ALL
SELECT
    '320583' AS sscsdm,
    CURRENT_TIMESTAMP AS extract_time,
    record
FROM KS_QL_JC_SSJC_SOURCE;
5 中台 Topic 至 Doris 的 Flink SQL
-- Load the data-platform Kafka topic into the Doris ODS table.
-- sscsdm and record are passed through unchanged; extract_time is not taken from
-- the Kafka message but re-stamped with CURRENT_TIMESTAMP when the row is processed.
INSERT INTO DORIS_ODS_QL_JC_SSJC_SINK
SELECT
    sscsdm,
    CURRENT_TIMESTAMP AS extract_time,
    record
FROM ODS_QL_JC_SSJC_SOURCE;
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/6e8c049a7b.html
