实战Flink Java api消费kafka实时数据落盘HDFS

文章目录

  • 1 需求分析
  • 2 实验过程
    • 2.1 启动服务程序
    • 2.2 启动kafka生产
  • 3 Java API 开发
    • 3.1 依赖
    • 3.2 代码部分
  • 4 实验验证
    • STEP1
    • STEP2
    • STEP3
  • 5 时间窗口

1 需求分析

在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 实验过程

2.1 启动服务程序

为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。

确保 Kafka Server 在运行,因为 Flink 的 Kafka Consumer 需要连接到 Kafka Broker。

启动 Flink 的 JobManager 和 TaskManager,这是执行 Flink 任务的核心组件。

确保这些组件都在运行,以便 Flink 作业能够正常消费 Kafka 中的数据并将其写入 HDFS。

  • 具体的启动命令在此不再赘述。

2.2 启动kafka生产

  • 当前kafka没有在守护进程后台运行;
  • 创建主题,启动该主题的生产者,在kafka的bin目录下执行;
  • 此时可以生产数据,从该窗口键入任意数据进行发送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1

kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

在这里插入图片描述

3 Java API 开发

3.1 依赖

此为项目的所有依赖,包括flink、spark、hbase、ck等,实际本需求无需全部依赖,均可在阿里云或者maven开源镜像站下载。



    4.0.0

    org.example
    flink-test
    1.0-SNAPSHOT

    
        1.13.6
        2.4.0
    

    
        
            org.apache.flink
            flink-streaming-java_2.11
            ${flink.version}
           
        

        
            org.apache.flink
            flink-java
            ${flink.version}
        

        
            org.apache.flink
            flink-clients_2.11
            ${flink.version}
        

        
            org.apache.flink
            flink-table-api-java-bridge_2.11
            ${flink.version}
        

        
            org.apache.flink
            flink-table-planner-blink_2.11
            ${flink.version}
        

        

        
            org.apache.flink
            flink-shaded-hadoop-2-uber
            2.7.5-10.0
        

        
            log4j
            log4j
            1.2.17
        

        
            org.projectlombok
            lombok
            1.18.24
        

        
            org.apache.flink
            flink-connector-kafka_2.11
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-jdbc_2.11
            ${flink.version}
        

        
            mysql
            mysql-connector-java
            5.1.38
        

       
            org.apache.bahir
            flink-connector-redis_2.11
            1.1.0
        

        
            org.apache.hbase
            hbase-server
            ${hbase.version}
            
                
                    guava
                    com.google.guava
                
                
                    log4j
                    log4j
                
            
        
        
            org.apache.hbase
            hbase-common
            ${hbase.version}
            
                
                    guava
                    com.google.guava
                
            
        
        
            org.apache.commons
            commons-pool2
            2.4.2
        

        
            com.alibaba
            fastjson
            2.0.32
        

        
            org.apache.flink
            flink-connector-kafka_2.11
            ${flink.version}
        


        
            org.apache.flink
            flink-csv
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-jdbc_2.11
            ${flink.version}
        

        
            org.apache.flink
            flink-json
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-hbase-2.2_2.11
            ${flink.version}
        

        
            org.apache.flink
            flink-cep_2.11
            ${flink.version}
        

        
            cn.hutool
            hutool-all
            5.8.20
        
    

    
        
            
                org.apache.maven.wagon
                wagon-ssh
                2.8
            
        

        
            
                org.codehaus.mojo
                wagon-maven-plugin
                1.0
                
                    
                    target/${project.build.finalName}.jar
                    
                    scp://root:root@hadoop10:/opt/app
                
            
        

    





  • 依赖参考

    在这里插入图片描述

3.2 代码部分

  • 请注意kafka和hdfs的部分需要配置服务器地址,域名映射。
  • 此代码的功能是消费topic1主题,将数据直接写入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class Test9_kafka {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "hadoop10:9092");        properties.setProperty("group.id", "test");        // 使用FlinkKafkaConsumer作为数据源        DataStream ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));        String outputPath = "hdfs://hadoop10:8020/out240102";        // 使用StreamingFileSink将数据写入HDFS        StreamingFileSink sink = StreamingFileSink                .forRowFormat(new Path(outputPath), new SimpleStringEncoder("UTF-8"))                .build();        // 添加Sink,将Kafka数据直接写入HDFS        ds1.addSink(sink);        ds1.print();        env.execute("Flink Kafka HDFS");    }}

4 实验验证

STEP1

运行idea代码,程序开始执行,控制台除了日志外为空。下图是已经接收到生产者的数据后,消费在控制台的截图。

在这里插入图片描述

STEP2

启动生产者,将数据写入,数据无格式限制,随意填写。此时发送的数据,是可以在STEP1中的控制台中看到屏幕打印结果的。

在这里插入图片描述

STEP3

在HDFS中查看对应的目录,可以看到数据已经写入完成。

我这里生成了多个inprogress文件,是因为我测试了多次,断码运行了多次。ide打印在屏幕后,到hdfs落盘写入,中间有一定时间,需要等待,在HDFS中刷新数据,可以看到文件大小从0到被写入数据的过程。

在这里插入图片描述

5 时间窗口

  • 使用另一种思路实现,以时间窗口的形式,将数据实时写入HDFS,实验方法同上。截图为发送数据消费,并且在HDFS中查看到数据。

    在这里插入图片描述

在这里插入图片描述

package day2;

import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作为数据源
        DataStream ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink将数据写入HDFS
        StreamingFileSink sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder("UTF-8"))
                .build();

        // 在一个时间窗口内将数据写入HDFS
        ds1.process(new CustomProcessFunction())  // 使用自定义 ProcessFunction
                .addSink(sink);

        // 执行程序
        env.execute("Flink Kafka HDFS");
    }
}
package day2;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CustomProcessFunction extends ProcessFunction {
    @Override
    public void processElement(String value, Context ctx, Collector out) throws Exception {
        // 在这里可以添加具体的逻辑,例如将数据写入HDFS
        System.out.println(value);  // 打印结果到屏幕
        out.collect(value);
    }
}

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/762a87802a.html