FLink之StreamOperator

一、StreamOperator的定义与实现

紧接上文,Transformation负责描述DataStream之间的转换操作,Transformation结构中最主要的组成部分就是StreamOperator

1.1 StreamOperator接口关系图

关系图

由关系图不难看出:不管是OneInputStreamOperator还是TwoInputStreamOperator类型的算子都继承自AbstractStreamOperator基本实现类。在调度和执行task实例是,会通过AbstractStreamOperator提供的入口方法触发和执行Operator,同时AbstractStreamOperator中也定义了所有算子的公共的组成部分,如StreamRuntimeConetxt、OperatorStateBackedn等。对于AbstractStreamOperator如何被SubTask触发执行,会在后续的章节进行详细的介绍。

1.2 StreamOperator接口实现

接口方法

1.2.1 open()方法:

定义当前Operator的初始化方法,在数据元素正式接入

Operator运算之前,Task会调用StreamOperator.open()方法对该算子进行

初始化,具体open()方法的定义由子类实现,常见的用法如调用

RichFunction中的open()方法创建相应的状态变量。

1.2.2 close()方法

当所有的数据元素都添加到当前Operator时,就会调用

该方法刷新所有剩余的缓冲数据,保证算子中所有数据被正确处理

1.2.3 finish()方法

算子生命周期结束时会调用此方法,包括算子操作执行成功、失败、或者取消时。

1.2.4 prepareSnapshotPreBarrier()方法

在StreamOperator正式执行checkpoint操作之前会调用该方法

1.2.5 snapshotState() 方法

当SubTask执行checkpoint操作时会调用该方法,用于触发该Operator中状态数据的快照操作

1.2.6 initializeState()方法

当算子启动或者重启时,该方法初始化状态数据,当恢复作业任务时,算子会从检查点(checkpoint)持久化的数据中恢复数据。

1.3 AbstractStreamOperator的基本实现

AbstractStreamOperator作为StreamOperator的基本实现类,所有的Operator都会继承和实现该抽象方法

AbstractStreamOperator

1.3.1 成员变量StreamConfig config

存储该StreamOperator的配置信息,实际上是对Configuration参数进行的封装。

1.3.2 成员变量Output output

定义了当前StreamOperator的输出操作,执行完该算子的所有转换操作后,会通过Output的collect将数据发送到下游

1.3.3 StreamingRuntimeContext runtimeContext

主要定义了UDF函数执行过程中的上下文信息,例如获取累加器、状态数据

1.3.4 private transient KeySelector stateKeySelector1;

DataStream只有经过keyBy()转换生成KeyedStream后,才会设定stateKeySelector1变量信息

1.3.5 private transient KeySelector stateKeySelector2

只有执行两个KeyedStream关联操作时使用,例如两个流进行join操作在AbstractStreamOperator中会保存stateKeySelector2的信息。

1.3.6 AbstractKeyedStateBackend keyedStateBackend

用于存储KeyedState的状态管理后端,默认为HashMapStateBackend。如果配置

RocksDB作为状态存储后端,则此处为RocksDBKeyedStateBackend。

1.3.7 private transient StreamOperatorStateHandler stateHandler

statehanler 对象封装了KeyedStateBackend和OperatorStateBackend用于同一管理StreamOperator的状态

1.3.8 OperatorMetricGroup metrics

同于记录算子层面的监控指标,包括numRecordsIn、numRecordsOut、numRecordsInRate、

numRecordsOutRate等

1.3.9 protected transient LatencyStats latencyStats

用于采集和汇报当前Operator的延迟时状况

1.3.10 protected transient ProcessingTimeService processingTimeService

基于ProcessingTime的时间服务,实现ProcessingTime时间操作,例如获取当前时间,然后创建定时器等等。

1.3.11 private transient IndexedCombinedWatermarkStatus combinedWatermark

在双输入类型的算子中,如果基于事件时间处理乱序数据,会在AbstractStreamOperator中合并输入的Watermark 选择最小的Watermark作为合并后的指标,并存储在 combinedWatermark变量中。

二、OneInputStreamOperator与 TwoInputStreamOperator

StreamOperator根据输入流的数量分为两种类型,即支持单输入

流的OneInputStreamOperator以及支持双输入流的

TwoInputStreamOperator,我们可以将其称为一元输入算子和二元输

入算子。

2.1 OneInputStreamOperator的实现

OneInputStreamOperator定义了单输入流的StreamOperator,常见的实现类StreamMap StreamFlatMap StreamFilter等
找一个具体实现进行查看 如 StreamMap
@Internal
public class StreamMap extends AbstractUdfStreamOperator<OUT, MapFunction>
        implements OneInputStreamOperator {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

在StreamFilter算子构造器中,内部的Function类型为

FilterFunction,并设定上下游算子的链接策略为

ChainingStrategy.ALWAYS,也就是该类型的Operator通常都会与上下游

的Operator连接在一起,形成OperatorChain。

在StreamFilter中实现了OneInputStreamOperator的

processElement()方法,通过该方法定义了具体的数据元素处理逻辑。

实际上就是使用定义的filterFunction对接入的数据进行筛选,然后通过

output.collect(element)方法将符合的条件输出到下游算子中。

2.2 TwoInputStreamOperator的实现

TwoInputStreamOperator定义了双输入流类型的StreamOperator

接口实现,常见的实现类有CoStreamMap、HashJoinOperator等算子。

TwoInputStreamOperator接口定义的主要方法实现对两个数据流转换操作的同时,还定义了两条数据流中Watermark和LatencyMarker的处理逻辑

public interface TwoInputStreamOperator extends StreamOperator {
	//  处理输入源1的数据元素方法
	void processElement1(StreamRecord element) throws Exception;
	//  处理输入源2的数据元素方法
    void processElement2(StreamRecord element) throws Exception;
	//  处理输入源1的数据水位线
    void processWatermark1(Watermark mark) throws Exception;
	//  处理输入源2的数据水位线
    void processWatermark2(Watermark mark) throws Exception;
   
	// 处理输入源1的LatencyMarker(延迟标记)方法
    void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;

	// 处理输入源2的LatencyMarker(延迟标记)方法
    void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
	//  处理输入源1的数据水位线状态
    void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception;
	//  处理输入源2的数据水位线状态
    void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception;
}

以CoStreamMap类的定义和实现为例:

public class CoStreamMap
        extends AbstractUdfStreamOperator<OUT, CoMapFunction>
        implements TwoInputStreamOperator {

    private static final long serialVersionUID = 1L;

    public CoStreamMap(CoMapFunction mapper) {
        super(mapper);
    }

    @Override
    public void processElement1(StreamRecord element) throws Exception {
        output.collect(element.replace(userFunction.map1(element.getValue())));
    }

    @Override
    public void processElement2(StreamRecord element) throws Exception {
        output.collect(element.replace(userFunction.map2(element.getValue())));
    }
}

从CoStreamMap算子定义中

可以看出,CoStreamMap继承AbstractUdfStreamOperator的同时,实

现了TwoInputStreamOperator接口。其中在processElement1()和

processElement2()两个方法的实现中,分别调用了用户定义的

CoMapFunction的map1()和map2()方法对输入的数据元素Input1和

Input2进行处理。经过函数处理后的结果会通过output.collect()接

口推送到下游的Operator中。

三、StreamOperatorFactory工厂类

StreamOperator最终会通过StreamOperatorFactory封装到Transformation结构中,并存储在StreamGraph和JobGraph中。直到运行时执行StreamTask时,才会调用StreamOperatorFactory.createStreamOperator()方法在StreamOperatorFactory中定义StreamOperator实例。

StreamOperatorFactory接口:

@PublicEvolving
public interface StreamOperatorFactory extends Serializable {

    <T extends StreamOperator> T createStreamOperator(
            StreamOperatorParameters parameters);
    void setChainingStrategy(ChainingStrategy strategy);
    ChainingStrategy getChainingStrategy();
    default boolean isStreamSource() {
        return false;
    }

    default boolean isLegacySource() {
        return false;
    }

    default boolean isOutputTypeConfigurable() {
        return false;
    }

    default void setOutputType(TypeInformation type, ExecutionConfig executionConfig) {}

    default boolean isInputTypeConfigurable() {
        return false;
    }

    default void setInputType(TypeInformation type, ExecutionConfig executionConfig) {}

    Class getStreamOperatorClass(ClassLoader classLoader);
}

StreamOperatorFactory继承关系图:

继承关系

StreamOperatorFactory封装创建StreamOperator的操作,在DataStreamAPI中主要通过SimpleStreamOperatorFactory创建已经定义好的Operator ,DataStream API 中大部分操作都是通过

public class SimpleOperatorFactory extends AbstractStreamOperatorFactory {

    private final StreamOperator operator;

    /** Create a SimpleOperatorFactory from existed StreamOperator. */
    @SuppressWarnings("unchecked")
    public static  SimpleOperatorFactory of(StreamOperator operator) {
        if (operator == null) {
            return null;
            // 如果Operator是StreamSource类型的且userFunction 为InputFormatSourceFuction
            // 返回SimpleInputFormatOperatorFactory
        } else if (operator instanceof StreamSource
                && ((StreamSource) operator).getUserFunction()
                        instanceof InputFormatSourceFunction) {
            return new SimpleInputFormatOperatorFactory((StreamSource) operator);
            // 如果Operator是StreamSink类型,且UserFuction类型为 OutputFormatSinkFuction 
            // 返回SimpleOutputFormatOperatorFactory
        } else if (operator instanceof UserFunctionProvider
                && (((UserFunctionProvider) operator).getUserFunction()
                        instanceof OutputFormatSinkFunction)) {
            return new SimpleOutputFormatOperatorFactory(
                    (((OutputFormatSinkFunction)
                                    ((UserFunctionProvider) operator).getUserFunction())
                            .getFormat()),
                    operator);
         //  如果Operator是AbstractUdfStreamOperator则返回 SimpleUdfStreamOperatorFactory 
        } else if (operator instanceof AbstractUdfStreamOperator) {
            return new SimpleUdfStreamOperatorFactory((AbstractUdfStreamOperator) operator);
            // 如果是其他情况返回SimpleOperatorFactory
        } else {
            return new SimpleOperatorFactory(operator);
        }
    }
}

SimpleOperatorFactory.of()方法定义中可以看出,基于StreamOperator提供的of()方法对算子进行工厂类的封装,将Operator封装在OperatorFactory中。然后根据Operator类型的不同,创建不同的SimpleOperator实现类。

  public <T extends StreamOperator> T createStreamOperator(
            StreamOperatorParameters parameters) {
        if (operator instanceof AbstractStreamOperator) {
            ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
        }
        if (operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) operator)
                    .setup(
                            parameters.getContainingTask(),
                            parameters.getStreamConfig(),
                            parameters.getOutput());
        }
        return (T) operator;
    }

SimpleOperatorFactory.createStreamOperator()方法创建StreamOperator实例。如果算子同时实现SetupableStreamOperator接口,则会调用setup()方法对算子进行基本的设置。

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/35ba570f77.html