Flink之DataStream API 概述
•
大数据
DataStream API 概述
- 前言
- 一、DataStream API 应用实例
-
-
- DataStream程序主要包含3部分:
- 1、StreamExecutionEnvironment初始化:该部分主要创建和初始化StreamExecutionEnvironment,提供通过DataStream API构建Flink作业需要的执行环境,包括设定ExecutionConfig、CheckpointConfig等配置信息以及StateBackend和TimeCharacteristic等变量
- 2、业务逻辑代码转换:env中提供了创建DataStream的方法,通过env.readTextFile()方法读取文本数据并构建DataStreamSource数据集,之后所有的DataStream操作都会以DataStreamSource为头节点。同时DataStreamAPI 提供了各种转换操作,如map、reduce、keyBy等算子,用于构建完整的计算逻辑。
- 3、程序执行env.execute(),在execute()方法中基于DataStream之间的转换生成StreamGraph,并将StreamGraph转成成JobGraph,最终将JobGraph提交到集群
-
- 二、DataStream的主要成员变量
-
-
- 2.1 DataStream数据结构包含两个成员变量StreamExecutionEnvironment和Transformation。通过transformation生成当前的DataStream。
- 2.2 通过DataStream 构建Flink作业时,env会将DataStream之间的转换操作存储到env中List<Transformation>,transformations集合,然后基于转换操作去构建Pipeline,用于描述作业的总体计算逻辑,流作业具体实现类是StreamGraph,批作业对应的Pipeline实现类为Plan。
- 2.3 具体转换图
- 2.4 StreamFlatMap
-
- 总结
前言
flink版本号1.18.1
一、DataStream API 应用实例
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource DataStreamSource = env.readTextFile("word.text");
SingleOutputStreamOperator<Tuple2> wordAndOne = DataStreamSource.flatMap(new FlatMapFunction<String, Tuple2>() {
@Override
public void flatMap(String value, Collector<Tuple2> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}
});
wordAndOne.keyBy(data -> data.f0).sum(1).print();
env.execute();
}
}
DataStream程序主要包含3部分:
1、StreamExecutionEnvironment初始化:该部分主要创建和初始化StreamExecutionEnvironment,提供通过DataStream API构建Flink作业需要的执行环境,包括设定ExecutionConfig、CheckpointConfig等配置信息以及StateBackend和TimeCharacteristic等变量
2、业务逻辑代码转换:env中提供了创建DataStream的方法,通过env.readTextFile()方法读取文本数据并构建DataStreamSource数据集,之后所有的DataStream操作都会以DataStreamSource为头节点。同时DataStreamAPI 提供了各种转换操作,如map、reduce、keyBy等算子,用于构建完整的计算逻辑。
3、程序执行env.execute(),在execute()方法中基于DataStream之间的转换生成StreamGraph,并将StreamGraph转成成JobGraph,最终将JobGraph提交到集群
二、DataStream的主要成员变量
public class DataStream {
protected final StreamExecutionEnvironment environment;
protected final Transformation transformation;
/**
* Create a new {@link DataStream} in the given execution environment with partitioning set to
* forward by default.
*
* @param environment The StreamExecutionEnvironment
*/
public DataStream(StreamExecutionEnvironment environment, Transformation transformation) {
this.environment =
Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
this.transformation =
Preconditions.checkNotNull(
transformation, "Stream Transformation must not be null.");
}
}
2.1 DataStream数据结构包含两个成员变量StreamExecutionEnvironment和Transformation。通过transformation生成当前的DataStream。
2.2 通过DataStream 构建Flink作业时,env会将DataStream之间的转换操作存储到env中List<Transformation>,transformations集合,然后基于转换操作去构建Pipeline,用于描述作业的总体计算逻辑,流作业具体实现类是StreamGraph,批作业对应的Pipeline实现类为Plan。
2.3 具体转换图

每个StreamTransformation都包含相应的StreamOperator,例如执行DataStream.flatMap(new FlatMapFuction(…)) 转换之后,内部生成了StreamFlatMap,StreamOperator包含了自定义函数的信息,StreamFlatMap算子包含了FlatMapFuction。
2.4 StreamFlatMap
以DataStream的flatMap转换操作为例,分析DataStream底层源码实现,首先自定义FlatMapFunction实现数据的处理逻辑,然后调用DataStream.flatMap()方法将FlatMapFunction作为参数应用在FlatMap转换操作中。在DataStream.flatMap()方法中可以看出,调用了transform()方法进行后续的转换处理,调用过程基于FlatMapFunction参数创建StreamFlatMap实例,StreamFlatMap本质上就是StreamOperator的实现类。
public SingleOutputStreamOperator flatMap(FlatMapFunction flatMapper) {
TypeInformation outType =
TypeExtractor.getFlatMapReturnTypes(
clean(flatMapper), getType(), Utils.getCallLocationName(), true);
return flatMap(flatMapper, outType);
}
public SingleOutputStreamOperator flatMap(
FlatMapFunction flatMapper, TypeInformation outputType) {
return transform("Flat Map", outputType, new StreamFlatMap(clean(flatMapper)));
}
public SingleOutputStreamOperator transform(
String operatorName,
TypeInformation outTypeInfo,
OneInputStreamOperator operator) {
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
protected SingleOutputStreamOperator doTransform(
String operatorName,
TypeInformation outTypeInfo,
StreamOperatorFactory operatorFactory) {
// 获取上一次转换操作输出的TypeInformation信息
transformation.getOutputType();
// 基于operatorName、、outTypeInfo和operatorFactory等参数创建OneInputTransformation实例,其中OneInputTransformation会包含当前DataStream对应的上一次转换操作。
OneInputTransformation resultTransform =
new OneInputTransformation(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism(),
false);
// 基于resultTransform创建SingleOutputStreamOperator。SingleOutputStreamOperator继承自DataStream,每次转换操作返回给用户的数据结构,以便继续调用
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
// 将创建好的OneInputTransformation添加到env中的List<Transformation>集合中,用于生成StreamGraph对象
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
总结
在DataStream转换的过程中,不管是哪种类型的转换操作,都是按照相同的方式:首先将用户自定义的函数封装到Operator中,然后将Operator封装到Transformation转换操作中,最后将Transformation添加到StreamExecutionEnviroment提供的Transfromation集合。通过DataStream之间的转换操作生成pipeline即Streamgraph
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/094e101802.html
