Flink|《Flink 官方文档 – Operations – 指标》学习笔记

学习文档:《Flink 官方文档 – Operations – 指标》

学习笔记如下:


Flink 提供了一个指标系统(metric system),以支持将 Flink 运行指标收集并展示到外部系统。

注册指标

你可以在任何富函数中,通过调用 getRuntimeContext().getMetricGroup() 来访问指标系统,这个方法返回用于创建和登记新指标的 MetricGroup 对象。

计数器(Counter)

计数器用于统计某种数量。

  • 登记方法:在 MetricGroup 上调用 counter(String name)
  • 修改方法:调用 inc()、inc(long n) 或 dec()、dec(long n) 来增加或减少计数器的值。

示例:使用计数器

public class MyMapper extends RichMapFunction {
private transient Counter counter;

@Override
public void open(Configuration config) {
 this.counter = getRuntimeContext()
   .getMetricGroup()
   .counter("myCounter");
}

@Override
public String map(String value) throws Exception {
 this.counter.inc();
 return value;
}
}

示例:使用自定义的计数器实现类

public class MyMapper extends RichMapFunction {
private transient Counter counter;

@Override
public void open(Configuration config) {
 this.counter = getRuntimeContext()
   .getMetricGroup()
   .counter("myCustomCounter", new CustomCounter());
}

@Override
public String map(String value) throws Exception {
 this.counter.inc();
 return value;
}
}
计量器(Gauge)

计量器提供一个任意类型的值。要使用计量器,必须构造一个实现了 org.apache.flink.metrics.Gauge 接口的类。计量器不限制返回值的类型。

  • 登记方法:在 MetricGroup 上调用 gauge(String name, Gauge gauge)

示例:使用计量器实现计数

public class MyMapper extends RichMapFunction {
private transient int valueToExpose = 0;

@Override
public void open(Configuration config) {
 getRuntimeContext()
   .getMetricGroup()
   .gauge("MyGauge", new Gauge() {
     @Override
     public Integer getValue() {
       return valueToExpose;
     }
   });
}

@Override
public String map(String value) throws Exception {
 valueToExpose++;
 return value;
}
}

需要注意的是,计量器的返回值最终将被转化为字符串,因此返回值类型需要有意义的 toString() 方法。

直方图(Histogram)

直方图可以用量测量一个 long 类型的分布情况。

  • 登记方法:在 MetricGroup 上调用 histogram(String name, Histogram histogram)

示例:使用直方图

public class MyMapper extends RichMapFunction {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
 this.histogram = getRuntimeContext()
   .getMetricGroup()
   .histogram("myHistogram", new MyHistogram());
}

@Override
public Long map(Long value) throws Exception {
 this.histogram.update(value);
 return value;
}
}

Flink 并没有提供 Histogram 的默认实现,但提供了使用 Codahale / Dropwizard 直方图的 wrapper,使用这个 wrapper 只需要在 pom.xml 中添加如下依赖:

      org.apache.flink
      flink-metrics-dropwizard
      1.18.1

示例:使用 Codahale / Dropwizard 直方图

public class MyMapper extends RichMapFunction {
private transient Histogram histogram;

@Override
public void open(Configuration config) {
 com.codahale.metrics.Histogram dropwizardHistogram =
   new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

 this.histogram = getRuntimeContext()
   .getMetricGroup()
   .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
}

@Override
public Long map(Long value) throws Exception {
 this.histogram.update(value);
 return value;
}
}
测量器(meter)

测量器用于测量平均吞吐量。每个发生的事件通过调用 markEvent() 方法记录,同时发生的多个事件可以调用 markEvent(long n) 方法记录。

  • 登记方法:在 MetricGroup 上调用 meter(String name, Meter meter)

示例:使用测量器

public class MyMapper extends RichMapFunction {
private transient Meter meter;

@Override
public void open(Configuration config) {
 this.meter = getRuntimeContext()
   .getMetricGroup()
   .meter("myMeter", new MyMeter());
}

@Override
public Long map(Long value) throws Exception {
 this.meter.markEvent();
 return value;
}
}

Flink 提供了使用 Codahale / Dropwizard 测量器的 wrapper,使用这个 wrapper 只需要在 pom.xml 中添加如下依赖:

      org.apache.flink
      flink-metrics-dropwizard
      1.18.1

示例:使用 Codehale / Dropwizard 测量器

public class MyMapper extends RichMapFunction {
private transient Meter meter;

@Override
public void open(Configuration config) {
 com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

 this.meter = getRuntimeContext()
   .getMetricGroup()
   .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
}

@Override
public Long map(Long value) throws Exception {
 this.meter.markEvent();
 return value;
}
}

scope

每个指标都被分配了一个标识符和一组键值对。每个标识符都基于 3 个组成部分,用户定义名称(user-defined scope)、可选的用户定义 scope(user-defined scope)以及系统提供 scope(system-provided scope)。例如,如果系统提供 scope 为 A.B,用户定义 scope 为 C.D,名称为 E 时,那么指标的标识符将为 A.B.C.D.E。

可以通过 metrics.scope.delimiter 参数来设置标识符中的分隔符,默认为 .。

User Scope

可以通过调用 MetricGroup 的 addGroup(String name)、addGroup(int name) 或 addGroup(String key, String value) 来定义 User scope,这些方法将影响 MetricGroup 的 getMetricIdentifier 和 getScopeComponents 方法的返回值。

示例:使用 addGroup 设置 User Scope

counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");

counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
System scope

在 System scope 中包含了指标的上下文信息,例如是哪个 task 登记了该指标,这个 task 属于哪个 job。

可以再 conf/flink-conf.yaml 中可以配置需要保留哪些上下文信息,具体地:

  • metrics.scope.jm:默认值为 .jobmanager
  • metrics.scope.jm-job:默认值为 .jobmanager.
  • metrics.scope.tm:默认值为 .taskmanager.
  • metrics.scope.tm-job:默认值为 .taskmanager..
  • metrics.scope.task:默认值为 .taskmanager….
  • metrics.scope.operator:默认值为 .taskmanager….

以上格式字符串中,变量数量或顺序没有限制。变量区分大小写。

需要注意的是,对于此格式字符串,如果同一作业同时运行多次,可能会发生标识符冲突,从而导致指标数据不一致。因此,建议使用 job_id 或为作业和运算符指定唯一名称,来提供更具有唯一性的格式字符串。

可供选择的变量如下:

  • JobManager:
  • TaskManager:、
  • Job:、
  • Task:、、、、
  • Operator:、、

对于批处理作业, 永远等于

User Variable

可以通过调用 MetricGroup 的 addGroup(String key, String value) 来定义 user variable。这个方法将会影响 MetricGroup 的 getMetricIdentifier()、getScopeComponents() 和 getAllVariables() 的返回值。

示例:定义 User Variable

counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");

指标报告

详见 Flink|《Flink 官方文档 – 部署 – 指标报告》学习笔记

系统指标

Flink 提供了一些可以对当前状态进行深层洞察的指标。具体地,包括如下指标:

  • CPU

    • Status.JVM.CPU.Load:当前 JVM 得 CPU 使用量
    • Status.JVM.CPU.Time:JVM 已经使用的 CPU 时间
  • 内存(Memory)

    • Status.JVM.Memory.Heap.Used:当前堆内存使用量
    • Status.JVM.Memory.Heap.Committed:当前 JVM 可获得的堆内存数量
    • Status.JVM.Memory.Heap.Max:当前内存管理中的堆内存最大值
    • Status.JVM.Memory.NonHeap.Used:当前堆外内存使用量
    • Status.JVM.Memory.NonHeap.Committed:当前 JVM 可获得的堆外内存数量
    • Status.JVM.Memory.NonHeap.Max:当前内存管理中的堆外内存最大值
    • Status.JVM.Memory.Metaspace.Used:当前 Metaspace 内存池的内存使用量
    • Status.JVM.Memory.Metaspace.Committed:当前 Metaspace 内存池可获得的内存数量
    • Status.JVM.Memory.Metaspace.Max:当前内存管理中 Metaspace 内存池的内存最大值
    • Status.JVM.Memory.Direct.Count:当前 direct buffer pool 的缓冲区数量
    • Status.JVM.Memory.Direct.MemoryUsed:当前 direct buffer pool 的内存使用量
    • Status.JVM.Memory.Direct.TotalCapacity:当前 direct buffer pool 的容量
    • Status.JVM.Memory.Mapped.Count:当前 mapped buffer pool 的缓冲区数量
    • Status.JVM.Memory.Mapped.MemoryUsed:当前 mapped buffer pool 的内存使用量
    • Status.JVM.Memory.Mapped.TotalCapacity:当前 mapped buffer pool 的容量
    • Status.Flink.Memory.Managed.Used:当前代理内存使用量
    • Status.Flink.Memory.Managed.Total:代理内存总量
  • 线程(Threads)

    • Status.JVM.Threads.Count:当前存活线程数
  • JVM 垃圾回收(GarbageCollection)

    • Status.JVM.GarbageCollector..Count:JVM 垃圾回收的执行次数
    • Status.JVM.GarbageCollector..Time:JVM 垃圾回收的总执行时间
  • JVM 类加载(ClassLoader)

    • Status.JVM.ClassLoader.ClassesLoaded:自 JVM 启动开始,加载的类数量
    • Status.JVM.ClassLoader.ClassesUnloaded:自 JVM 启动开始,卸载的类数量
  • 网络(Network)

    • Status.Network.AvailableMemorySegments:暂未使用的内存碎片数量
    • Status.Network.TotalMemorySegments:分配的内存碎片总数
    • buffers.inputQueueLength:输入缓冲区队列数
    • buffers.outputQueueLength:输出缓冲区队列数
    • buffers.inPoolUsage:输入缓冲区使用量的估计值
    • buffers.inputFloatingBuffersUsage:floating input buffer 使用量的估计值
    • buffers.inputExclusiveBuffersUsage:exclusive input buffer 使用量的估计值
    • Network…totalQueueLen:所有输入、输出队列的缓冲区总数
    • Network…minQueueLen:所有输入、输出缓冲区的最小数量
    • Network…maxQueueLen:所有输入、输出缓冲区的最大数量
    • Network…maxQueueLen:所有输入、输出缓冲区的平均数量
  • Default shuffle service

    • Status.Shuffle.Netty.AvailableMemorySegments:未使用的内存分片数量
    • Status.Shuffle.Netty.UsedMemorySegments:已使用的内存分片数量
    • Status.Shuffle.Netty.TotalMemorySegments:分配的内存分片总数
    • Status.Shuffle.Netty.AvailableMemory:未使用的内存字节数
    • Status.Shuffle.Netty.UsedMemory:已使用的内存字节数
    • Status.Shuffle.Netty.TotalMemory:分配的总内存字节数
    • Status.Shuffle.Netty.RequestedMemoryUsage:网络内存使用数量(实验性)
    • Shuffle.Netty.Input.Buffers.inputQueueLength:输入缓冲队列的数量
    • Shuffle.Netty.Input.Buffers.inputQueueSize:输入缓冲队列的字节数
    • Shuffle.Netty.Input.Buffers.inPoolUsage:输入缓冲使用量的估计值(忽略 LocalInputChannels)
    • Shuffle.Netty.Input.Buffers.inputFloatingBuffersUsage:floating input buffer 的使用量的估计值(忽略 LocalInputChannels)
    • Shuffle.Netty.Input.Buffers.inputExclusiveBuffersUsage:exclusive input buffer 使用量的估计值(忽略 LocalInputChannels)
    • Shuffle.Netty.Output.Buffers.outputQueueLength:输出缓冲队列的数量
    • Shuffle.Netty.Output.Buffers.outputQueueSize:输出缓冲队列的字节数
    • Shuffle.Netty.Output.Buffers.outPoolUage:输出缓冲使用量的估计值
    • Shuffle.Netty…totalQueueLen:输入、输出缓冲队列的总数
    • Shuffle.Netty…minQueueLen:输入、输出缓冲队列的最小值
    • Shuffle.Netty…maxQueueLen:输入、输出缓冲队列的最大值
    • Shuffle.Netty…avgQueueLen:输入、输出缓冲队列的平均值
    • Shuffle.Netty.Input.numBytesInLocal:task 从 local source读取的字节数
    • Shuffle.Netty.Input.numBytesInLocalPerSecond:task 每秒从 local source 读取的字节数
    • Shuffle.Netty.Input.numBytesInRemote:task 从 remote source 读取的字节数
    • Shuffle.Netty.Input.numBytesInRemotePerSecond:task 每秒从 remote source 读取的字节数
    • Shuffle.Netty.Input.numBuffersInLocal:task 从 local source 读取的缓冲数量
    • Shuffle.Netty.Input.numBuffersLocalPerSecond:task 每秒从 local source 读取的缓冲数量
    • Shuffle.Netty.Input.numBuffersInRemote:task 从 remote source 读取的缓冲数量
    • Shuffle.Netty.Input.numBuffersInRemotePerSecond:task 每秒从 remote source 读取的缓冲数量
  • 集群(Cluster)

    • numRegisteredTaskManagers:登记的 TaskManager 数量
    • numPendingTaskManagers:未完成的 TaskManager 数量
    • numRunningJobs:正在运行的作业数
    • taskSlotsAvailable:可用的 task slots 数量
    • tasksSlotsTotal:task slots 总数
  • 可用于每种作业状态(包括 INITIALIZING、CREATED、RUNNING、RESTARTING、CANCELLING、FAILING)的指标

    • State:输入状态,如果作业属于这种状态则返回 1,否则返回 0
    • Time:输入状态,如果作业属于这种状态则返回作业在这种状态的持续时间(单位:毫秒),否则返回 0
    • TimeTotal:输入状态,返回作业在这种状态的持续时间(单位:毫秒)
    • deployingState:如果作业属于 DEPLOYING 状态则返回 1,否则返回 0【实验性】
    • deployingTime:如果作业属于 DEPLOYING 状态则返回作业在这种状态的持续时间(单位:毫秒),否则返回 0【实验性】
    • deployingTimeTotal:返回作业在 DEPLOYING 状态的持续时间(单位:毫秒)【实验性】
    • uptime:作业未被中断的运行时间,如果作业已经完成则返回 -1(单位:毫秒)
    • downtime:如果作业正在运行返回 0,如果作业已完成返回 -1,如果作业当前处于 FAILING / RECOVERING 状态返回在这个状态的持续时间(单位:毫秒)
    • fullRestarts【建议使用 numRestarts】
    • numRestarts:作业自提交以来的重启次数
  • Checkpointing

    • lastCheckpointDuration:上一个 checkpoint 的完成时间(单位:毫秒)
    • lastCheckpointSize:上一个 checkpoint 的字节数
    • lastCompletedCheckpointId:上一个 checkpoint 的标识名
    • lastCheckpointFullSize:上一个 checkpoint 的完整字节数
    • lastCheckpointExternalPath:上一个 checkpoint 的外部存储路径
    • lastCheckpointRestoreTimestamp:上一个 checkpoint 的时间戳
    • numberOfInProgressCheckpoints:当前正在进行的 checkpoint 数量
    • numberOfCompletedCheckpoints:当前已经完成的 checkpoint 数量
    • numberOfFailedCheckpoints:当前已经失败的 checkpoint 数量
    • totalNumberOfCheckpoints:正在进行、已经完成和已经失败的 checkpoint 总数
    • checkpointAlignmentTime:上一个 checkpoint 对齐所用的时间,即收到第一个 checkpoint 确认和最后一个 checkpoint 确认之间的时间(单位:纳秒)
    • checkpointStartDelayNanos:从上一个 checkpoint 开始到当前 task 开始进行 checkpoint 之间的延时时间
  • 状态访问延迟(State Access Latency)

    • stateClearLatency:清理算子状态的延迟
    • valueStateGetLatency:在算子 value state 上执行 get() 的延迟
    • valueStateUpdateLatency:在算子 value state 上执行 update() 延迟
    • listStateGetLatency:在算子 list state 上执行 get() 的延迟
    • listStateAddLatency:在算子 list state 上执行 add() 的延迟
    • listStateAddAllLatency:在算子 list state 上执行 addAll() 的延迟
    • listStateUpdateLatency:在算子 list state 上执行 update() 的延迟
    • listStateMergeNamespacesLatency:合并算子 list state 的延迟
    • mapStateGetLatency:在算子 map state 上执行 get() 的延迟
    • mapStatePutLatency:在算子 map state 上执行 put() 的延迟
    • mapStatePutAllLatency:在算子 map state 上执行 putAll() 的延迟
    • mapStateRemoveLatency:在算子 map state 上执行 remove() 的延迟
    • mapStateContainsLatency:在算子 map state 上执行 contains() 的延迟
    • mapStateEntriesInitLatency:在算子 map state 上初始化遍历键值对迭代器的延迟
    • mapStateKeysInitLatency:在算子 map state 上初始化遍历键迭代器的延迟
    • mapStateValuesInitLatency:在算子 map state 上初始化遍历值迭代器的延迟
    • mapStateIteratorInitLatency:在算子 map state 上构造迭代器的延迟
    • mapStateIsEmptyLatency:在算子 map state 上执行 isEmpty() 的延迟
    • mapStateIteratorHasNextLatency:在算子 map state 的迭代器上执行 hasNext() 的延迟
    • mapStateIteratorNextLatency:在算子 map state 的迭代器上执行 next() 的延迟
    • mapStateIteratorRemoveLatency:在算子 map state 的迭代器上执行 moreve() 的延迟
    • aggregatingStateGetLatency:在算子 aggregating state 上执行 get() 的延迟
    • aggregatingStateAddLatency:在算子 aggregating state 上执行 add() 的延迟
    • aggregatingStateMergeNamespacesLatency:合并算子 aggregating state 的延迟
    • reducingStateGetLatency:在算子 reducing state 上执行 get() 的延迟
    • reducingStateAddLatency:在算子 reducing state 上执行 add() 的延迟
    • reducingStateMergeNamespacesLatency:合并算子 reducing state 的延迟
  • RocksDB:RocksDB 的指标不在默认值中

  • State Changelog

    • numberOfUploadRequests:上传请求次数
    • numberOfUploadFailures:上传请求失败次数
    • attemptsPerUpload:每次上传的尝试次数的直方图
    • totalAttemptsPerUpload:所有上传的尝试次数的直方图
    • uploadBatchSizes:上传 tasks 的数量
    • uploadLatenciesNanos:上传延迟(单位纳秒)
    • uploadSizes:上传大小
    • uploadQueueSize:当前上传队列的大小
    • startedMaterialization:已开始的 materializations 的数量
    • completedMaterialization:已成功完成的 materializations 数量
    • failedMaterialization:失败的 materializations 数量
    • lastDurationOfMaterialization:上一次 materializations 的持续时间(单位:毫秒)
    • lastFullSizeOfMaterialization:上一次 checkpoint 中 full size of the materialization 部分的大小(单位:字节)
    • lastIncSizeOfMaterialization:上一次 checkpoint 中 incremental size of the materialization 部分的大小(单位:字节)
    • lastFullSizeOfNonMaterialization:上一次 checkpoint 中 full size of the non-materialization 部分的大小(单位:字节)
    • lastIncSizeOfNonMaterialization:上一次 checkpoint 中 incremental size of the non-materialization 部分的大小(单位:字节)
  • IO

    • [.[.]]..latency:在两个给定 subtask 之间的延迟
    • numBytesInLocal【建议使用 Default Shuffle service metrics】
    • numBytesInLocalPerSecond【建议使用 Default Shuffle service metrics】
    • numBytesInRemote【建议使用 Default Shuffle service metrics】
    • numBytesInRemotePerSecond【建议使用 Default Shuffle service metrics】
    • numBuffersInLocal【建议使用 Default Shuffle service metrics】
    • numBuffersInLocalPerSecond【建议使用 Default Shuffle service metrics】
    • numBuffersInRemote【建议使用 Default Shuffle service metrics】
    • numBuffersInRemotePerSecond【建议使用 Default Shuffle service metrics】
    • numBytesOut:task 发送的字节总数
    • numBytesOutPerSecond:task 每秒发送的字节数
    • numBuffersOut:task 发送的网络缓冲区数
    • numBuffersOutPerSecond:task 每秒发送的缓冲区数
    • isBackPressured:task 是否被背压
    • idleTimeMsPerSecond:task 每秒的空闲毫秒数
    • busyTimeMsPerSecond:task 每秒的繁忙毫秒数
    • backPressuredTimeMsPerSecond:task 每秒的背压毫秒数
    • softBackPressuredTimeMsPerSecond:task 每秒的软背压(softly back pressured)毫秒数;在软背压时 task 仍能正常处理记录
    • hardBackPressuredTimeMsPerSecond:task 每秒的硬背压(back pressured in a hard way)毫秒数;在硬背压时 task 完全阻塞且无法处理记录
    • maxSoftBackPressuredTimeMs:在最近一段样本周期内,最长的软背压持续时间
    • maxHardBackPressuredTimeMs:在最近一段样本周期内,最长的硬背压持续时间
    • changelogBusyTimeMsPerSecond:每秒内 changelog 状态后端执行 IO 操作的毫秒数
    • mailboxMailsPerSecond:每秒内 actions 处理它的 mailbox 的时间
    • mailboxLatencyMs:task 在处理 mailbox 前的等待时间
    • mailboxQueueSize:task 的 mailbox 当前等待处理的 actions 数量
    • initializationTime:如果 task 处于初始化状态返回 task 的初始化时间,否则返回 0
    • estimatedTimeToConsumeBuffersMs:The estimated time (in milliseconds) by the buffer debloater to consume all of the buffered data in the network exchange preceding this task.
    • debloatedBufferSize:The desired buffer size (in bytes) calculated by the buffer debloater.
    • numRecordsIn:operator / task 接收的记录总数
    • numRecordsInPerSecond:operator / task 每秒接收的记录数
    • numRecordsOut:operator / task 发送的记录总数
    • numRecordsOutPerSecond:operator / task 每秒发送的记录数
    • numLateRecordsDropped:operator / task 因延迟丢弃的记录数
    • currentInputWatermark:operator / task 收到的上一个 watermark
    • currentInputNWatermark:operator 的第 N 个输入流收到的上一个 watermark
    • currentOutputWatermark:operator 上一个输出的 watermark
    • watermarkAlignmentDrift:当前属于同一 watermark group 所发出的最小 watermark
    • numSplitsProcessed:当前 source 已处理的的 InputSplits 数量
  • Connectors

    • Kafka Connectors

      • commitsSucceeded:提交成功的 offset 数量
      • commitsFailed:提交失败的 offset 数量
      • committedOffsets:上一次提交成功的 Kafka 偏移量
      • currentOffsets:当前消费者正在读取的偏移量
    • Kinesis 源

      • millisBehindLatest:消费者落后于流头部的毫秒数
      • sleepTimeMillis:消费者在从 Kinesis 获取记录之前睡眠的毫秒数
      • maxNumberOfRecordsPerFetch:消费者在对 Kinesis 的单个 getRecords 调用中请求的最大记录数。
      • numberOfAggregatedRecordsPerFetch:消费者在对 Kinesis 的单个 getRecords 调用中获取的聚合的 Kinesis 记录数
      • numberOfDeggregatedRecordsPerFetch:消费者在对 Kinesis 的单个 getRecords 调用中获取的非聚合的 Kinesis 记录数
      • averageRecordSizeBytes:Kinesis 记录的平均大小(单位:字节)
      • runLoopTimeNanos:消费者在运行循环中花费的实际时间(单位:纳秒)
      • loopFrequencyHz:一秒钟内调用 getRecords 的次数
      • bytesRequestedPerFetch:在对 getRecords 的单个调用中请求的字节数
    • Kinesis 接收器

      • numRecordsOutErrors :【建议使用 numRecordsSendErrors】
      • numRecordsSendErrors:被拒绝的记录写入数
      • CurrentSendTime:最后一批请求的一次往返所用的毫秒数
    • HBase Connectors

      • lookupCacheHitRate:查找的缓存命中率
  • 系统资源(Stream resources):系统资源的报告指标默认不启动,需将 metrics.system-resource 配置打开方可访问。

    • System CPU

      • System.CPU.Usage:当前系统 CPU 使用率
      • System.CPU.Idle:当前系统 CPU 空闲率
      • System.CPU.Sys:当前系统 CPU 系统态使用率
      • System.CPU.User:当前系统 CPU 用户态使用率
      • System.CPU.IOWait:当前系统 CPU 的 iowait 率
      • System.CPU.Irq:当前系统 CPU 的 Irq 率
      • System.CPU.SoftIrq:当前系统 CPU 的 SoftIrq 率
      • System.CPU.Nice:当前系统 CPU 的 Nice 率
      • System.CPU.Load1min:当前系统 CPU 最近 1 分钟的平均负载
      • System.CPU.Load5min:当前系统 CPU 最近 1 分钟的平均负载
      • System.CPU.Load15min:当前系统 CPU 最近 1 分钟的平均负载
      • System.CPU.UsageCPU:当前系统 CPU 中每个进程的使用率
    • System Memory

      • System.Memory.Available:当前系统可用内存字节数
      • System.Memory.Total:当前系统内存总数
      • System.Swap.Used:当前系统已用 Swap 内存字节数
      • Systemn.Swap.Total:当前系统 Swap 内存总数
    • System Network

      • System.Network.INTERFACE_NAME.ReceivceRate:当前系统每秒平均接受率
      • System.Network.INTERFACE_NAME.SendRate:当前系统每秒平均发送率
  • 预测执行

    • numSlowExecutionVertices:当前的慢执行节点数量
    • numEffectiveSpeculativeExecutions:有效的预测执行数量

端到端延迟追踪

Flink 允许追踪在系统中流动时的延迟。这个特性默认是关闭的,若需要打开则需要在 Flink 设置中将 latencyTrackingInterval 设置为正数。

当启动 company_change_info_reg_capital_format 后,source 会周期性地发送称为延迟标记(Latency Marker)的特殊记录。在延迟标记中,包含该标记从 source 被发出的时间戳。延迟标记不会超过普通的记录,因此如果记录在进入算子前排队,那么将会被延迟追踪记录下来。

需要注意的是,延迟标记并不考虑记录在 operator上花费的时间,因为它绕过了这些时间。此外,延迟标记也不考虑记录在窗口缓冲区上花费的时间。只有当 operator 无法接收新记录,导致输入流被迫排队时,这个延迟才会被延迟标记反映出来。

延迟标记用于推导每个上游算子到每个下游算子之间的延迟分布。这些分布将以直方图的形式报告。这些分布的粒度可以配置。对于最高粒度的 subtask,Flink 将会推导每个上游 subtask 到每个下游 subtask 之间的延迟分布,并生成二次直方图。

当前,Flink 假设加群中每个机器的时钟时间是同步的。我们推荐设置一个自动时钟同步服务以避免出现错误的延迟统计结果。

警告:开启延迟追踪将会显著地影响集群运行效率,特别是开启了 subtask 粒度。推荐只在 debug 时使用这个功能。

状态访问时间追踪

Flink 提供了 Flink 标准状态后端或继承自 AbstractStateBackend 的状态后端的状态访问的延迟追踪功能。这个功能默认是关闭的,如需要打开则需要在 Flink设置中将 state.backend.latency-track.keyed-state-enabled 设置为 true。

当监控状态访问延迟追踪被打开后,Flink 将会在每 state.backend.latency-track.sample-interval 次(默认 100)访问后抽取 1 次;这个值配置得越小,追踪结果将更准确但也会因抽样更频繁而对性能影响更大。此外,state.backend.latency-track.history-size 参数会控制保存的最大样本数,默认值为 128;这个值设置得越高,追踪结果将更准确,但也将占用更多的内存。

警告:开启状态访问时间追踪将会显著地影响集群运行效率,特别是开启了 subtask 粒度。推荐只在 debug 时使用这个功能。

与 TEST API 相结合

指标可以通过 Monitoring REST API 访问。以下路径均在 http://hostname:8081/jobmanager/metrics 路径下访问。

请求特定实体的测量指标:

  • /jobmanager/metrics
  • /taskmanagers//metrics
  • /jobs//metrics
  • /jobs//vertices//subtasks/

请求将所有实体间聚合后的测量指标:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs//vertices//subtasks/metrics

请求将部分实体聚合后的测量指标:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs//vertices//subtasks/metrics?subtask=1,2,3

需要注意将路径中包含的如下字符使用转义:

Character Escape Sequence
# %23
$ %24
& %26
+ %2B
/ %2F
; %3B
= %3D
? %3F
@ %40

请求多个测量指标:

GET /jobmanager/metrics
[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

请求特定测量指标(不聚合):

GET taskmanagers/ABCDE/metrics?get=metric1,metric2
[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

请求特定特定指标聚合后的结果值:

GET /taskmanagers/metrics?get=metric1,metric2
[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

请求特定指标的特定聚合方法的值:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
  {
    "id": "metric1",
    "min": 1,
    "max": 34
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14
  }
]

与 Dashboard 相结合

各作业和 operator 的测量指标也可以通过 Dashboard 可视化呈现。在作业的主页,选择 Metrics 选择卡,在选择一个 task后可以通过 Add Metric 下拉菜单添加测量指标:

  • Task 指标形如 .
  • Operator 指标形如 ..

每个指标会被可视化到分离的图中,其中 x 轴代表时间,y 轴代表测量指标。所有的图每 10秒自动更新一次,并会在导航到其他页面时继续执行。

可视化的指标数量没有上限,但只有数值型指标可以被可视化。

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/9a835e27b7.html