flink学习之旅(-)
某天正在摸鱼的小邓,突然接到任务需要1个月内掌握flink并接手前辈遗留下来的大数据计算项目,于是便有了此文。
1.flink 简单了解
有状态的数据计算、流批一体、高吞吐、低延迟、灵活、可扩展性好
发展历史:
Flink起源于一个叫作Stratosphere的项目,它是由3所地处柏林的大学和欧洲其他一些大学在2010-2014年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl)领街开发2014年4月,Stratosphere的代码被复制并捐赠给了Apache软件基金会,Flink就是在此基础上被重新设计出来的。在德语中,“flink”一词表示“快速、灵巧”项目的1ogo是一只彩色的松鼠。
2014年8月,Flink第一个版本0.6正式发布,与此同时Fink的几位核心开发者创办Data Atisans公司
2014年12月,Flink项目完成孵化从apache毕业
2015年4月,Flink发布了里程碑式的重要版本0.9.0;
2019年1月,长期对Flink投入研发的阿里巴巴,以9000万欧元的价格收购了DataArtisans 公司2019年8月,阿里巴巴将内部版本Blink开源,合并入Fink 1.9.0版本。
查看:flink官网
2.环境准备
一台安装了java环境的liunx服务器(jdk8+)
3.下载flink安装包
wget https://dlcdn.apache.org/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
4.解压并安装
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz
修改配置文件(路径为解压后的conf目录如下:)

主要调整的配置如下:
# JobManager节点地址.
jobmanager.rpc.address: 10.26.141.203
jobmanager.bind-host: 0.0.0.0
rest.address: 10.26.141.203
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 10.26.141.203
启动并访问:
进入bin目录下执行启动脚本:
bin/start-cluster.sh
打印StandaloneSession信息即为启动成功可以访问对应的webui界面如下:

5.提交任务jar并统计单词个数
新建maven项目命名为:FlinkLearn对应pom.xml文件如下:
4.0.0
org.example
FlinkLearn
1.0-SNAPSHOT
11
11
1.17.0
org.apache.flink
flink-streaming-java
${flink.version}
org.apache.flink
flink-clients
${flink.version}
org.apache.maven.plugins
maven-shade-plugin
3.2.4
package
shade
com.google.code.findbugs:jsr305
org.slf4j:*
log4j:*
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
</project
统计单词的执行方法如下:
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// TODO 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// TODO 2.读取数据:从文件读
// TODO 2. 读取数据: socket
DataStreamSource socketDS = env.socketTextStream("10.26.141.203", 7777);
// TODO 3.处理数据: 切分、转换、分组、聚合
// TODO 3.1 切分、转换
SingleOutputStreamOperator<Tuple2> wordAndOneDS = socketDS
.flatMap(new FlatMapFunction<String, Tuple2>() {
@Override
public void flatMap(String value, Collector<Tuple2> out) throws Exception {
// 按照 空格 切分
String[] words = value.split(" ");
for (String word : words) {
// 转换成 二元组 (word,1)
Tuple2 wordsAndOne = Tuple2.of(word, 1);
// 通过 采集器 向下游发送数据
out.collect(wordsAndOne);
}
}
});
// TODO 3.2 分组
KeyedStream<Tuple2, String> wordAndOneKS = wordAndOneDS.keyBy(
new KeySelector<Tuple2, String>() {
@Override
public String getKey(Tuple2 value) throws Exception {
return value.f0;
}
}
);
// TODO 3.3 聚合
SingleOutputStreamOperator<Tuple2> sumDS = wordAndOneKS.sum(1);
// TODO 4.输出数据
sumDS.print();
// TODO 5.执行:类似 sparkstreaming最后 ssc.start()
env.execute();
}
}
打包并上传到界面:

打开socket 7777监听
![]()
并执行任务:

可以看到有一个任务在执行:

在socket中输入 hello dxy 可以再stdout 中看到 如下打印完成了一次单词统计:

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/ef778426eb.html
