flink学习之旅(-)

  某天正在摸鱼的小邓,突然接到任务需要1个月内掌握flink并接手前辈遗留下来的大数据计算项目,于是便有了此文。

1.flink 简单了解

     有状态的数据计算、流批一体、高吞吐、低延迟、灵活、可扩展性好

     发展历史: 

   Flink起源于一个叫作Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学在2010-2014年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl)领街开发2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。在德语中,“flink”一词表示“快速、灵巧”项目的1ogo是一只彩色的松鼠。

2014年8月,Flink第一个版本0.6正式发布,与此同时Fink的几位核心开发者创办Data Atisans公司

2014年12月,Flink项目完成孵化从apache毕业

2015年4月,Flink发布了里程碑式的重要版本0.9.0;

2019年1月,长期对Flink投入研发的阿里巴巴,以9000万欧元的价格收购了DataArtisans 公司2019年8月,阿里巴巴将内部版本Blink开源,合并入Fink 1.9.0版本。

查看:flink官网

2.环境准备 

   一台安装了java环境的liunx服务器(jdk8+)

3.下载flink安装包

wget https://dlcdn.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz

 4.解压并安装

tar -zxvf flink-1.17.0-bin-scala_2.12.tgz

修改配置文件(路径为解压后的conf目录如下:)

flink学习之旅(-)

主要调整的配置如下:

# JobManager节点地址.

jobmanager.rpc.address: 10.26.141.203

jobmanager.bind-host: 0.0.0.0

rest.address: 10.26.141.203

rest.bind-address: 0.0.0.0

# TaskManager节点地址.需要配置为当前机器名

taskmanager.bind-host: 0.0.0.0

taskmanager.host: 10.26.141.203

启动并访问:

进入bin目录下执行启动脚本:

 bin/start-cluster.sh

打印StandaloneSession信息即为启动成功可以访问对应的webui界面如下:

flink学习之旅(-)

5.提交任务jar并统计单词个数 

 新建maven项目命名为:FlinkLearn对应pom.xml文件如下:


  4.0.0

  org.example
  FlinkLearn
  1.0-SNAPSHOT

  
    11
    11
    1.17.0
  
  
    
      org.apache.flink
      flink-streaming-java
      ${flink.version}
    

    
      org.apache.flink
      flink-clients
      ${flink.version}
    
  

  
    
      
        org.apache.maven.plugins
        maven-shade-plugin
        3.2.4
        
          
            package
            
              shade
            
            
              
                
                  com.google.code.findbugs:jsr305
                  org.slf4j:*
                  log4j:*
                
              
              
                
                  
                  *:*
                  
                    META-INF/*.SF
                    META-INF/*.DSA
                    META-INF/*.RSA
                  
                
              
              
                
                
              
            
          
        
      
    
  

</project

 统计单词的执行方法如下:

public class SocketStreamWordCount {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 2.读取数据:从文件读
      // TODO 2. 读取数据: socket
      DataStreamSource socketDS = env.socketTextStream("10.26.141.203", 7777);

        // TODO 3.处理数据: 切分、转换、分组、聚合
        // TODO 3.1 切分、转换
        SingleOutputStreamOperator<Tuple2> wordAndOneDS = socketDS
                .flatMap(new FlatMapFunction<String, Tuple2>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2> out) throws Exception {
                        // 按照 空格 切分
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // 转换成 二元组 (word,1)
                            Tuple2 wordsAndOne = Tuple2.of(word, 1);
                            // 通过 采集器 向下游发送数据
                            out.collect(wordsAndOne);
                        }
                    }
                });
        // TODO 3.2 分组
        KeyedStream<Tuple2, String> wordAndOneKS = wordAndOneDS.keyBy(
                new KeySelector<Tuple2, String>() {
                    @Override
                    public String getKey(Tuple2 value) throws Exception {
                        return value.f0;
                    }
                }
        );
        // TODO 3.3 聚合
        SingleOutputStreamOperator<Tuple2> sumDS = wordAndOneKS.sum(1);

        // TODO 4.输出数据
        sumDS.print();

        // TODO 5.执行:类似 sparkstreaming最后 ssc.start()
        env.execute();
    }
}

打包并上传到界面:

flink学习之旅(-)

打开socket 7777监听 

flink学习之旅(-)

并执行任务:

flink学习之旅(-)

可以看到有一个任务在执行:

flink学习之旅(-)

在socket中输入 hello dxy 可以再stdout 中看到 如下打印完成了一次单词统计:

flink学习之旅(-)

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/ef778426eb.html