flink之addSource & fromSource 、addSink & SinkTo

一、addSource & fromSource 、addSink & SinkTo

       这两组算子区别在于:addSource和addSink需要自己实现SourceFunction或者是SinkFunction,其中读取数据的逻辑,容错等都需要自己实现;fromSource和SinkTo,是flink提供的简易的读取和输出的算子,建议优先使用fromSource和SinkTo,并结合flink官方文档;

二、filesystem source算子

1.readTextFile(
filePath: String, charsetName: String
):底层调用的是
readFile(format
,
filePath
,
FileProcessingMode.
PROCESS_ONCE
,

1
,
typeInfo)

2.readFile(
FileInputFormat inputFormat
,
String filePath
,
FileProcessingMode watchType
,
long
interval
,
FilePathFilter filter

①FileInputFormat inputFormat:定义读取文件的类,具体可以看有哪些实现类,根据需要读取文件的类型定,也可以自定义;

②String filePath:文件路径

③FileProcessingMode watchType:定义读取文件的模式,读一次:FileProcessingMode.
PROCESS_ONCE
读多次:FileProcessingMode
.
PROCESS_CONTINUOUSLY

④long interval:读取文件的的时间间隔,batch模式设置为-1,单位毫秒

⑤FilePathFilter filter:过滤文件,标记为
@Deprecated

以上两种source底层源码并行度都为1,而且都是调用的addSource传入的SourceFunction;说个题外话,在1.14以前flink Kafka都是使用的是addSource,实现的是ParalismSourceFunction以及一些容错的类,1.14发布以后采用的fromSource,使用的架构是
分片(Splits)

分片枚举器(SplitEnumerator) 
以及 
源阅读器(SourceReader)

3.FileSource

A
·batch模式

val fileSource: FileSource[String] = FileSource

.
forBulkFileFormat
()

.build()

B.stream模式

依赖:

org.apache.flink

flink-connector-files

1.12.5

代码:

val fileSource: FileSource[String] = FileSource

.
forRecordStreamFormat
(new TextLineFormat(), new Path(“”))

.monitorContinuously(Duration.
ofSeconds
(1000))

.build()

这种方式可以并行读取文件,并且也做好了容错等,但是具体的逻辑请看flink官方代码,
需要注意的是如果使用的是
TextLineFormat,因为其父类
SimpleStreamFormat的isSplittable方法返回的是false所以即使fromSource是多并行度的是,但是仍然不可以并行读取文件;但是可以但是可以重写StreamFormat,下面是一个参照TextLineFormat简单的实现

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.FSDataInputStream;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;


public class MyStreamFormat implements StreamFormat {
    private static final long serialVersionUID = 1L;

    public static final String DEFAULT_CHARSET_NAME = "UTF-8";

    private final String charsetName;

    public MyStreamFormat() {
        this(DEFAULT_CHARSET_NAME);
    }

    public MyStreamFormat(String charsetName) {
        this.charsetName = charsetName;
    }

    @Override
    public Reader createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
        final BufferedReader reader =
                new BufferedReader(new InputStreamReader(stream, charsetName));
        return new MyStreamFormat.Reader(reader);
    }

    @Override
    public Reader restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
        stream.seek(restoredOffset);
        return createReader(config, stream,fileLen,splitEnd);
    }

    @Override
    public boolean isSplittable() {
        return true;
    }

    @Override
    public TypeInformation getProducedType() {
        return Types.STRING;
    }

    public static final class Reader implements StreamFormat.Reader {

        private final BufferedReader reader;

        Reader(final BufferedReader reader) {
            this.reader = reader;
        }

        @Nullable
        @Override
        public String read() throws IOException {
            return reader.readLine();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}

二、filesystem sink
算子

StreamFileSink,1.14版本之后是FileSink,这里以前者为例

两种写入模式:forRowFormat、forBulkFormat

1.forRowFormat、forBulkFormat的构造

①forRowFormat(final Path basePath, final Encoder encoder)

行模式下,自定义内容限定于文件内部,想对文件进行压缩等操作,则很难办到;

//这个类只有一个方法

public interface Encoder extends Serializable {

    void encode(IN element, OutputStream stream) throws IOException;

}

②forBulkFormat( final Path basePath, final BulkWriter.Factory bulkWriterFactory)

列模式下,不仅可以对文件内部操作,也可以轻松做到对文件压缩等操作;

public class Mybucket implements BulkWriter{

   

    FSDataOutputStream fsDataOutputStream=null;

    GzipCompressorOutputStream gzout=null;

    @Override

    //每条数据通过流写入文件

    public void addElement(T element) throws IOException {

         gzout.write(element.toString().getBytes());

    }

    //刷新流,如果考虑效率问题这里可以不刷,到flinish()方法刷也行

    @Override

    public void flush() throws IOException {

         gzout.flush;

    }

    //关闭流

    //注意:此方法不能关闭Factory传入的流,这是框架完成的事!!!

    @Override

    public void finish() throws IOException {

         gzout.close;

    }

    

    //创建一个writer

    //这里将类单独写也行,无非就是目前的外部类多一个构造方法

    class MyFactory implements Factory{

            @Override

            public Mybucket create(FSDataOutputStream out) throws IOException {

                fsDataOutputStream=out;

                GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(output);

                return Mybucket.this;

            }}}

2.withBucketAssigner():

①解释:指定分桶策略,所谓桶,即数据应该去往哪个文件夹;
行模式和列模式是通用的;

②参数:BucketAssigner,其实现类有三个,:

BasePathBucketAssigner:

//直接再给定的路径下生成文件,不会生成文件夹

public String getBucketId(T element, BucketAssigner.Context context) {return “”;}

DateTimeBucketAssigner:

//日期格式( 即 桶大小)和时区都可以手动配置。(见其构造方法)

//生成文件夹:yyyy-MM-dd–HH,所以是按小时滚动桶的

public String getBucketId(IN element, BucketAssigner.Context context) {

        if (dateTimeFormatter == null) {

            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);

        }

        return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));

    }

自定义BucketAssigner:这个需要根据自己的业务逻辑去看继承哪个,如果只是按数据分桶那可以直接继承BasePathBucketAssigner重写getBucketId,如果是需要自定义时间可以继承DateTimeBucketAssigner重写getBucketId,如果业务逻辑进一步复杂那就重写BucketAssigner,下面简单简绍BucketAssigner的两个方法:

//决定了每条数据应该去往哪个文件夹,没有这个文件夹会自动创建,最终数据写入路径是该方法返回值+给定的路径

//context可以获取一些时间

BucketID getBucketId(IN element, BucketAssigner.Context context);

//序列化/反序列化getBucketId返回值

SimpleVersionedSerializer getSerializer();

3.
withRollingPolicy:

①解释:
RollingPolicy 
定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在 
STREAMING 
模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在 
BATCH 
模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。
其中列模式(
forBulkFormat
)只能使用
CheckpointRollingPolicy

②参数

RollingPolicy(所有RollingPolicy的父类)

public interface RollingPolicy extends Serializable {

    //Determines if the in-progress part file for a bucket should roll on every checkpoint.(if true in-progress move to pending)

    //文件从pending状态到finished状态与此毫不相干

    boolean shouldRollOnCheckpoint(final PartFileInfo partFileState) throws IOException;

    //Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.(if true in-progress move to pending)

    boolean shouldRollOnEvent(final PartFileInfo partFileState, IN element)

            throws IOException;

    // Determines if the in-progress part file for a bucket should roll based on a time condition.(if true in-progress move to pending)

    boolean shouldRollOnProcessingTime(

            final PartFileInfo partFileState, final long currentTime) throws IOException;

}

CheckpointRollingPolicy

列模式虽然只能使用抽象类CheckpointRollingPolicy(它是RollingPolicy的一个实现,重写了
shouldRollOnCheckpoint返回为true
),
CheckpointRollingPolicy只有一个子类
OnCheckpointRollingPolicy(此类中
shouldRollOnEvent、
shouldRollOnProcessingTime返回为false
);如果在列模式下,不想按照checkpoint进行滚动文件,那么可以试试继承
CheckpointRollingPolicy后
重写了
shouldRollOnCheckpoint返回为false;

DefaultRollingPolicy,采用builder架构

DefaultRollingPolicy.builder()

          //设置一个文件最大开启时间,超过时间则滚动

          .withRolloverInterval(Duration.ofMinutes(15))

          //设置一个文件无数据写入的时间,超过时间则滚动

          .withInactivityInterval(Duration.ofMinutes(5))

          //设置一个文件最大的容量,超过容量则滚动

          .withMaxPartSize(MemorySize.ofMebiBytes(1024))

          .build()

4.
withOutputFileConfig:

①解释:为生成的文件添加前缀后缀;
行模式和列模式是通用的;

.withOutputFileConfig(OutputFileConfig

      .builder()

      .withPartPrefix(“gouba-“)

      .withPartSuffix(“.gz”)

      .build())

5.
enableCompact:

①解释:合并生成的finished文件;
行模式和列模式是通用的;

②参数:FileCompactStrategy,FileCompactor

FileCompactStrategy

FileCompactStrategy.Builder.newBuilder()

                  //相隔几个checkpoint做一次合并,默认1

          .enableCompactionOnCheckpoint(3)

         //合并文件使用多少个线程,默认1

          .setNumCompactThreads(4)

          .build()

FileCompactor

※ IdenticalFileCompactor:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件。

※ ConcatFileCompactor:可以自定义两个文件直接的分割符,由构造方法传入。

※ RecordWiseFileCompactor:自定义内容比较多

6.
withBucketCheckInterval(mills)

解释:根据RollingPolicy检查是否应该关闭in-progress文件,以系统时间0秒时刻开始算的。

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/59296f3108.html