Flink|《Flink 官方文档 – DataStream API – 概览》学习笔记

学习文档:Flink 官方文档 – DataStream API – 概览

学习笔记如下:


DataStream

Flink 的 DataStream API:

  • 数据里的起始是各种 source,例如消息队列、socket 流、文件等;
  • 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等;
  • 结果通过 sink 返回,例如可以将数据写入文件或标准输出。

DataStream:Flink 程序中的数据集合;可以将其理解为包含重复项的不可变数据集合。这些数据可以是有界的,也可以是无界的,但用于处理它们的 API 是相同的。

相较于常规的 Java 集合,DataStream 有以下差异:

  • 不可变,一旦创建就不能添加或删除元素
  • 不能简单地查看内部元素,只能使用 DataStream API 来处理它们

DataStream 源码:flink-streaming-java: org.apache.flink.streaming.api.datastream.DataStream

Flink 程序

Flink 程序看起来像一个转换 DataStream 的常规程序,但是 Flink 程序都是延迟执行的。当程序的 main() 方法被执行时,数据加载和转换不会直接发生,只会将每个算子都创建并添加到 dataflow 形成的有向图;只有被执行环境的 execute() 方法显式地被处罚后,这些算子才会真正执行。

每个程序由相同的基本部分组成:

Step 1|获取一个执行环境(execution environment)

通常,调用 StreamExecutionEnvironment 的如下静态方法获取执行环境:

  • getExecutionEnvironment():通常调用这个方法即可
  • createLocalEnvironment()
  • createRemoteEnvironment(String host, int port, String… jarFiles)

样例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Step 2|加载 / 创建初始数据

执行环境提供了一些方法,用于从任何第三方提供的 source 或本地文件中读取数据。这将生成一个 DataStream,可以在上面应用转换(trasformation)来创建新的派生 DataStream。

样例:以直接逐行读取本地文件中的数据

DataStream text = env.readTextFile("file:///path/to/file");

Step 3|指定数据相关的转换

可以通过调用 DataStream 上具有转换功能的方法来应用转换。

样例:使用 map 进行转换(将每个字符串转换为一个整数并创建一个新的 DataStream)

DataStream input = ...;

DataStream parsed = input.map(new MapFunction() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

Step 4|指定计算结果的存储位置

可以将包含最终结果的 DataStream,通过创建 sink 写出到外部系统。

样例:将数据结果写出到文件

writeAsText(String path);

Step 5|触发程序执行

需要调用 StreamExecutionEnvironment 的 execute() 或 executeAsync() 来触发程序执行。根据 StreamExecutionEnvironment 的类型,执行会在本地机器上触发,或提交到某个集群上执行。

  • execute() 方法:等待作业完成,然后返回一个 JobExecutionResult,其中包括执行时间和累加器结果
  • executeAsync() 方法:触发作业异步执行,它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。

Data Source

Source 是 Flink 程序读取输入的地方。可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 添加 Source,也可以使用 Flink 自带的 source function。

DataSink

Sink 是 Flink 程序写出结果的地方。可以通过 DataStream.addSink(sinkFunction) 添加 Sink,也可以使用 Flink 自带的 sink function。

但是需要注意,DataStream 的 write*() 方法主要用于调试目的,它们不涉及 checkpoint,因此这些函数通常具有至少一次语义。

Iterations

Iterations 是对数据进行迭代处理的机制。在使用 IterativeStream 时,需要指定哪一部分反馈给迭代,哪一部分使用旁路输出或使用过滤器转发到下游。通常来说,我们首先定义一个 IterativeStream 流,例如:

IterativeStream iteration = input.iterate();

然后,指定循环内执行的转换逻辑,例如:

DataStream iterationBody = iteration.map(/* this is executed many times */);

最后,使用 IterativeStream 的 closeWith(feedbackStream) 方法,定义迭代的结束条件。提供的 closeWith 的 DataStream 将反馈给迭代头。一种常见的模式,是使用过滤器将反馈的流部分和向前传播(重新迭代)的流部分分开。例如:

iteration.closeWith(iterationBody.filter(/* one part of the stream */));  // 继续迭代
DataStream output = iterationBody.filter(/* some other part of the stream */);  // 传递给下游的流

样例:下面的程序从一系列整数中连续减去 1,直到它们达到零

DataStream someIntegers = env.generateSequence(0, 1000);

IterativeStream iteration = someIntegers.iterate();

DataStream minusOne = iteration.map(new MapFunction() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream stillGreaterThanZero = minusOne.filter(new FilterFunction() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream lessThanZero = minusOne.filter(new FilterFunction() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

控制延迟

如果将元素在网络上逐个传输,会导致大量频繁的网络请求。因此,Flink 不会将元素不会在网络上一一传输,而是会进行缓冲,每次通过通过传输直接传输整个完整的缓冲区。缓冲区的大小可以在 Flink 配置文件中传输。

触发缓冲区的网络传输,有两种情况:

  • 缓冲区已满
  • 缓冲区已达到缓冲区的最长等待时间,如果超过缓冲区的最长等待时间,那么即使缓冲区没有满也会被自动发送。超时时间的默认值为 100 毫秒

缓冲区的设置样例如下:

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);  // 给流设置缓冲区超时时间
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);  // 给 Operator 设置缓冲区超时时间

缓冲区的超时时间设置得越长,吞吐量越大,极限可以设置 setBufferTimeout(-1) 来关闭超时,这样缓冲区只有在它们已满时才会被刷新;缓冲区设置得越小,延迟越少,但应避免设置超时为 0 的缓冲区,因为它会导致严重的性能下降。

调试

本地执行环境

在本地调试时,可以使用 LocalStreamEnvironment 的本地执行环境,它将在创建它的同一个 JVM 进程中启动 Flink 程序。如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点已实现调试。

样例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream lines = env.addSource(/* some source */);
// 构建你的程序

env.execute();
从 Java 集合构造 Data Sources

在本地调试时,可以使用 fromElements 或 fromCollection 方法从 Java 集合中读取数据构造 DataStream。

样例:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 从元素列表创建一个 DataStream
DataStream myInts = env.fromElements(1, 2, 3, 4, 5);

// 从任何 Java 集合创建一个 DataStream
List<Tuple2> data = ...
DataStream<Tuple2> myTuples = env.fromCollection(data);

// 从迭代器创建一个 DataStream
Iterator longIt = ...
DataStream myLongs = env.fromCollection(longIt, Long.class);
将 Data Sink 写出到 Java 迭代器

类似地,也可以将 DataStream 写出到本地的迭代器。

样例:

DataStream<Tuple2> myResult = ...
Iterator<Tuple2> myOutput = myResult.collectAsync();

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/d7e00ca599.html