Flink编程——最小程序MiniProgram

Flink最小程序MiniProgram

前面我们已经搭建起了Flink 的基础环境,这一节我们就在上一节的基础上,进行编写我们的第一个Flink 程序,开始之前我们先看一下一个完整的Flink 程序是什么样的

创建项目

快速创建Flink工程

我们可以使用Flink 提供的Maven 骨架程序,快速创建一个Flink 程序

 mvn archetype:generate                \
  -DarchetypeGroupId=org.apache.flink   \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.18.1

这允许你命名新建的项目,而且会交互式地询问 groupId、artifactId、package 的名字

image-20240215112946150

或者你可以在通过在命令指定更多的参数

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.18.1 \
    -DgroupId=com.kingcall \
    -DartifactId=flink-demo \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

当然我们也可以通过脚本

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.18.1
Flink 依赖

Flink提供了两大 API:Datastream API 和 Table API & SQL,它们可以单独使用,也可以混合使用,具体取决于你的使用场景:

你要使用的 API 你需要添加的依赖项
DataStream flink-streaming-java
DataStream Scala 版 flink-streaming-scala_2.12
Table API flink-table-api-java
Table API Scala 版 flink-table-api-scala_2.12
Table API + DataStream flink-table-api-java-bridge
Table API + DataStream Scala 版 flink-table-api-scala-bridge_2.12

你只需将它们包含在你的构建工具脚本/描述符中,就可以开发你的作业了!

Flink 程序结构

为了演示Flink 程序结构,我们下面写了一个程序,这个程序我称之为 MiniProgram,也就是流程序的最小结构

package com.kingcall.examples.stream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;


public class MiniProgram {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList arras = new ArrayList();
        arras.add("China");
        arras.add("Japan");
        arras.add("Russia");

        DataStream country= env.fromCollection(arras);

        DataStream filters = country.filter(new FilterFunction() {
            @Override
            public boolean filter(String str) throws Exception {
                return !"Japan".equals(str);
            }
        });

        filters.print();

        env.execute();
    }
}

可以参考下面的注释,一个流程序就这样被创建出来了,一个标准的程序就是这样的结构

  1. 每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
  2. DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
  3. 上述示例用 filters.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

image-20240123093838466

需要特别注意的是,Flink 的流程序需要我们在最后调用env.execute() 才可以执行,在此之前都是在定义程序的执行逻辑,只有最后的这一步骤才会提交任务并执行

如果没有调用 execute(),应用就不会运行

下面就是程序的输出

image-20240123095811283

1> 和 5> 指出输出来自哪个 sub-task(即 thread),我们可以对上面的程序进行改造一下,也就是设置并行度env.setParallelism(1); 这个时候输出就没有> 这样的提示了

image-20240123100033191

基本的 stream source

上述示例用 env.fromElements(…) 方法构造 DataStream 。这样将简单的流放在一起是为了方便用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。因此,你可以这样做:

List people = new ArrayList();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream flintstones = env.fromCollection(people);

另一个获取数据到流中的便捷方法是用 socket

DataStream lines = env.socketTextStream("localhost", 9999)

或读取文件

DataStream lines = env.readTextFile("file:///path");

在真实的应用中,最常用的数据源是那些支持低延迟,高吞吐并行读取以及重复(高性能和容错能力为先决条件)的数据源,例如 Apache Kafka,Kinesis 和各种文件系统。REST API 和数据库也经常用于增强流处理的能力(stream enrichment)。

基本的 stream sink

上述示例用 filters.print() 打印其结果到 task manager 的日志中(如果运行在 IDE 中时,将追加到你的 IDE 控制台)。它会对流中的每个元素都调用 toString() 方法。

输出看起来类似于

1> Fred: age 35
2> Wilma: age 35
  1. 1> 和 2> 指出输出来自哪个 sub-task(即 thread)在生产中
  2. 常用的 sink 包括各种数据库和几个 pub-sub 系统

什么是DataStream

前面我们介绍了Flink程序的结构,我们是通过一个叫做最小程序的MiniProgram 进行说明的,我们之前一直说到Flink 程序的数据流,流说明数据是实时的流动的,那这里的数据有什么格式的要求吗,或者什么样的数据才能流动。

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

在自然环境中,数据的产生原本就是流式的。无论是来自 Web 服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕 有界流bounded)或 无界流unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。

Bounded and unbounded streams

DataStream 中的元素

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs

Tuples

对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。

Tuple2 person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;
POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

示例:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {}
    public Person(String name, Integer age) {  
        . . .
    }
}  

Person person = new Person("Fred Flintstone", 35);
完整程序

下面的程序将关于人的记录流作为输入,并且过滤后只包含成年人。

package com.kingcall.examples.stream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Adults {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream adults = flintstones.filter(new FilterFunction() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;

        public Person() {
        }

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

下面就是程序的输出

image-20240123095457015

DataStream执行环境

每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

image-20240123101650818

调试

在生产中,应用程序将在远程集群或一组容器中运行。如果集群或容器挂了,这就属于远程失败。JobManager 和 TaskManager 日志对于调试此类故障非常有用,但是更简单的是 Flink 支持在 IDE 内部进行本地调试。你可以设置断点,检查局部变量,并逐行执行代码。如果想了解 Flink 的工作原理和内部细节,查看 Flink 源码也是非常好的方法。

总结

Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment);
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行。

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/c6d94817fc.html