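Flink Job Submission Flow: Source Code
Flink source code series overview
This article is based on Flink 1.17 and walks through the Flink source code. These are personal study notes; corrections are welcome.
Flink job submission flow: step-by-step analysis
Refer to this diagram while reading the flow below.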

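- CliFrontend: the client
- YarnJobClusterEntrypoint: the entry point executed by the ApplicationMaster (AM)
- YarnTaskExecutorRunner: the entry class of the TaskManager in YARN mode
1. CliFrontend (client)
Submit command
The job is submitted in Flink on YARN per-job mode. Looking at the flink script shows that, once the program is submitted, the script looks up and launches the CliFrontend class.
CliFrontend main method entry point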


Within it: the parseAndRun method
Within parseAndRun, the run method:
![](/img/e1/40a8d916302d4691b826bc1617cc5f23.png)
Obtaining the effective configuration:
![](/img/56/78ebe52fa89145cf9ca546597c08a076.png)
Executing the program:
![](/img/4a/908f57bb208b40939445f7ec230ae5eb.png)
Executing the user code:
env.execute() -> StreamExecutionEnvironment.execute()
Obtain the StreamGraph (StreamExecutionEnvironment class)

Obtain the PipelineExecutor and execute
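
The executor lookup in this step can be pictured as a search over factories keyed on the configured deployment target. The sketch below is a simplified, JDK-only model, not Flink's code: `ExecutorFactory`, `ExecutorLookup`, and the plain `Map` configuration are hypothetical stand-ins. Only the `execution.target` key and the `yarn-per-job` target name are taken from Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a factory that knows which deployment target it serves. */
interface ExecutorFactory {
    String name();

    /** True when the configured target names this factory. */
    default boolean isCompatibleWith(Map<String, String> config) {
        return name().equalsIgnoreCase(config.get("execution.target"));
    }
}

/** Simplified lookup: scan the known factories and require exactly one match. */
class ExecutorLookup {
    static ExecutorFactory find(List<ExecutorFactory> factories, Map<String, String> config) {
        List<ExecutorFactory> matches = new ArrayList<>();
        for (ExecutorFactory factory : factories) {
            if (factory.isCompatibleWith(config)) {
                matches.add(factory);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one executor factory for target "
                            + config.get("execution.target") + ", found " + matches.size());
        }
        return matches.get(0);
    }
}
```

With `execution.target` set to `yarn-per-job`, such a lookup would select the factory behind YarnJobClusterExecutor, which is the class the next step refers to.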

Generate the JobGraph (YarnJobClusterExecutor class)

Step 1:
Create the YarnClient and build the YarnClusterDescriptor (YarnClusterClientFactory class)
![](/img/0b/c6752433ec8b4cd0ba6745958dffa0d2.png)
Step 2:
Start the AM (ApplicationMaster)
![](/img/70/d199813b16da470780c1727c1685b8ec.png)
![](/img/06/390c4e0cab2f48a1aa6c9f4c48378aea.png)
2. By the time the cluster has been deployed, the ApplicationMaster has already been started
3. ApplicationMaster execution process
YarnJobClusterEntrypoint startup entry point: the main method
![](/img/48/715f7ee140414a2d971bed6664f37538.png)
Factory construction and object creation

Object creation:
Create the ResourceManager
![](/img/5a/693ec1fa9b65451d9c347c16118675ef.png)
Create and start the Dispatcher; start the ResourceManager
![](/img/0f/560b68bdab26410187d113af0cb516eb.png)
Within it: creating and starting the Dispatcher
![](/img/ee/afd3d30d50f8490e86dbe50aadfe14b6.png)
- DefaultDispatcherRunnerFactory -> createDispatcherRunner
- DefaultDispatcherRunner -> create
- DefaultDispatcherRunner -> start
- StandaloneLeaderElectionService -> start
- DefaultDispatcherRunner -> grantLeadership
- DefaultDispatcherRunner -> startNewDispatcherLeaderProcess
- AbstractDispatcherLeaderProcess -> start
- AbstractDispatcherLeaderProcess -> startInternal
- JobDispatcherLeaderProcess -> onStart
- DefaultDispatcherGatewayServiceFactory -> create
- DefaultDispatcherGatewayServiceFactory
![](/img/39/cc6dbe7ad3034db0a281804a4a9c24fb.png)
- Dispatcher -> onStart
![](/img/16/070d6a2529da486cbfeba7d94f113a84.png)
Within it: starting the Dispatcher services mainly performs registration
Within it: starting the JobMaster
- Dispatcher -> startCleanupRetries
- Dispatcher -> runCleanupRetry
- Dispatcher -> runJob
- JobMasterServiceLeadershipRunner -> start
- StandaloneLeaderElectionService -> start
- JobMasterServiceLeadershipRunner -> grantLeadership
- JobMasterServiceLeadershipRunner -> startJobMasterServiceProcessAsync
- JobMasterServiceLeadershipRunner -> verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
- JobMasterServiceLeadershipRunner -> createNewJobMasterServiceProcess
- DefaultJobMasterServiceProcessFactory -> create
- DefaultJobMasterServiceProcess -> DefaultJobMasterServiceProcess
- DefaultJobMasterServiceFactory -> createJobMasterService
- DefaultJobMasterServiceFactory -> internalCreateJobMasterService
- start the JobMaster
![](/img/8a/99a040c19f0a488c948830cbce28566e.png)
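
Both call chains above go from `StandaloneLeaderElectionService -> start` straight into `grantLeadership`. In the non-HA setup these chains show, there is only one candidate, so leadership can be granted as soon as the contender registers. The following is a minimal JDK-only sketch of that callback pattern; `Contender`, `StandaloneElection`, and `RunnerContender` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical contender interface: receives a callback once it becomes leader. */
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

/** Election with a single candidate: starting it grants leadership immediately. */
class StandaloneElection {
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    void start(Contender contender) {
        // There is no competition to resolve, so the callback fires right away.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}

/** Contender that records its steps: start -> grantLeadership -> start the leader process. */
class RunnerContender implements Contender {
    final List<String> steps = new ArrayList<>();

    void start(StandaloneElection election) {
        steps.add("start");
        election.start(this);
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        steps.add("grantLeadership");
        startLeaderProcess(leaderSessionId);
    }

    private void startLeaderProcess(UUID leaderSessionId) {
        steps.add("startLeaderProcess:" + leaderSessionId);
    }
}
```

Calling `new RunnerContender().start(new StandaloneElection())` runs all three steps synchronously on the calling thread, which is why the chains read as one uninterrupted sequence.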
Within it: starting the ResourceManager
- ResourceManagerServiceImpl -> grantLeadership
- ResourceManagerServiceImpl -> startNewLeaderResourceManager
- ResourceManagerServiceImpl -> startResourceManagerIfIsLeader
- ResourceManager -> start
![](/img/68/09ed6d18491643a39e9d0e08b86c70ec.png)
- ResourceManager -> onStart
- ResourceManager -> startResourceManagerServices
![](/img/31/60edb6ab0957490eabc88b6cdc5906a1.png)
initialize():
Create and start the clients for YARN's ResourceManager and NodeManager
![](/img/e9/ff96eafdeb894c29a8fdfd7410749e60.png)
Within it: starting the JobMaster
JobMaster -> onStart
![](/img/df/3af9cc9cd2d64e03b7b4375d379382f2.png)
![](/img/28/86737c0616b647249e95078a92075588.png)
This is where the JobMaster is actually started
![](/img/af/791a438f6b7a4d6397e243fe992d06f3.png)
Establish the connection and start requesting resources
- StandaloneLeaderRetrievalService -> start
- JobMaster -> notifyOfNewResourceManagerLeader
- JobMaster -> reconnectToResourceManager
- JobMaster -> tryConnectToResourceManager
- JobMaster -> connectToResourceManager
![](/img/5a/1ffdbd456c5244459e4d1378f1106243.png)
- RegisteredRpcConnection -> start
![](/img/69/b3485e77a73a4ba590efc357fe78b73a.png)
![](/img/32/875c307994f3425982f7e998ce291e85.png)
![](/img/e0/c7cc8fb725bd4d4ca7e91d5fefa2a623.png)
On successful registration, onRegistrationSuccess() is called and slots are requested from the ResourceManager
- JobMaster -> onRegistrationSuccess
- JobMaster -> establishResourceManagerConnection
![](/img/f5/bee05b30e14744bcb8779dae39feb6e0.png)
- DeclarativeSlotPoolService -> connectToResourceManager
- ResourceManager -> declareRequiredResources
![](/img/d7/446fc98b9f804e5e938eb50cf5712a45.png)
- FineGrainedSlotManager -> processResourceRequirements
![](/img/66/7b23cfb63e5e491a84b535d83af51128.png)
Within it: notifyResourceRequirements
- DefaultResourceTracker -> notifyResourceRequirements
- JobScopedResourceTracker -> notifyResourceRequirements
![](/img/35/22e7aa5629ea4096a4c97805da7b5207.png)
Within it: checkResourceRequirementsWithDelay
- FineGrainedSlotManager -> checkResourceRequirementsWithDelay
- FineGrainedSlotManager -> checkResourceRequirements
![](/img/2a/794b06f32e2a42618733c9693578d22c.png)
![](/img/0a/262c7b42ed7b43eab11400a868f6d0f2.png)
Within it: tryFulfillRequirements
![](/img/98/be5f2fe5e29a450b93699888ab46b408.png)
tryFulfilledRequirementWithResource
![](/img/da/f8cb5992fd54471eb017b6fec3d80b7f.png)
Within it: allocating resources with allocateSlotsAccordingTo
![](/img/ea/f1afd3ed27aa42db991d190b4d55e8e5.png)
![](/img/bf/74735577c1b84d05a23dd3a2137e4683.png)
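
The requirement check boils down to comparing what the jobs need with what is already registered, then turning the shortfall into new workers. Here is a simplified sketch of that arithmetic, assuming identical slots and a fixed number of slots per worker. `SlotPlanner` and its parameters are hypothetical and do not reproduce the resource-profile matching that FineGrainedSlotManager performs.

```java
/** Hypothetical planner: how many slots can be served now, and how many workers to add. */
class SlotPlanner {
    /** Result of one requirement check. */
    static final class Plan {
        final int slotsFromFree; // requirements served by already registered free slots
        final int newWorkers;    // additional workers (TaskManagers) to request

        Plan(int slotsFromFree, int newWorkers) {
            this.slotsFromFree = slotsFromFree;
            this.newWorkers = newWorkers;
        }
    }

    static Plan plan(int requiredSlots, int freeSlots, int slotsPerWorker) {
        if (requiredSlots < 0 || freeSlots < 0 || slotsPerWorker <= 0) {
            throw new IllegalArgumentException("invalid slot counts");
        }
        int slotsFromFree = Math.min(requiredSlots, freeSlots);
        int missing = requiredSlots - slotsFromFree;
        // Round up: a worker that would be only partially used still has to be started whole.
        int newWorkers = (missing + slotsPerWorker - 1) / slotsPerWorker;
        return new Plan(slotsFromFree, newWorkers);
    }
}
```

For example, a job that needs 5 slots while 1 is free and each worker offers 2 slots leaves 4 slots missing, so 2 new workers would be requested.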
Within it: declareNeededResourcesWithDelay
- FineGrainedSlotManager -> declareNeededResourcesWithDelay
- FineGrainedSlotManager -> declareNeededResources
- ActiveResourceManager -> declareResourceNeeded
- ActiveResourceManager -> declareResourceNeeded
- ActiveResourceManager -> checkResourceDeclarations
- ActiveResourceManager -> requestNewWorker
![](/img/9d/6c13dd6177e8455fb9b7bd932d162f34.png)
- YarnResourceManagerDriver -> requestResource
- YarnResourceManagerDriver -> addContainerRequest
![](/img/c5/d3b1f413cca14695b3a4cae71bccfe9f.png)
Once the containers have been requested, the callbacks are invoked:
- YarnResourceManagerDriver -> onContainersAllocated
- YarnResourceManagerDriver -> onContainersOfPriorityAllocated
![](/img/20/8bf2879f48dc4944a99653608b942557.png)
- YarnResourceManagerDriver -> startTaskExecutorInContainerAsync
- YarnResourceManagerDriver -> createTaskExecutorLaunchContext
![](/img/5a/6e276b157ec94c1ea1afc80c5321caa5.png)
- Utils.createTaskExecutorContext
![](/img/a6/eb6484d49ff348bf91b077d9b9965fc6.png)
This is how the main method of YarnTaskExecutorRunner gets started
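
The container ends up in that main method because the launch context carries a start command that names the entry class. A rough sketch of assembling such a command follows. `LaunchCommand`, its parameters, and the argument layout are assumptions made for illustration; only the class name `org.apache.flink.yarn.YarnTaskExecutorRunner` comes from Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles the shell command a container would run. */
class LaunchCommand {
    static String build(String jvmOpts, String mainClass, List<String> args, String logDir) {
        List<String> parts = new ArrayList<>();
        parts.add("$JAVA_HOME/bin/java");
        if (jvmOpts != null && !jvmOpts.trim().isEmpty()) {
            parts.add(jvmOpts.trim());
        }
        parts.add(mainClass); // the JVM starts this class's main method
        parts.addAll(args);
        // Redirect stdout and stderr into the container's log directory.
        parts.add("1> " + logDir + "/taskmanager.out");
        parts.add("2> " + logDir + "/taskmanager.err");
        return String.join(" ", parts);
    }
}
```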
- TaskManagerRunner -> runTaskManagerProcessSecurely
- TaskManagerRunner -> runTaskManagerProcessSecurely
![](/img/09/8ebdec159ef843cbba614f43e9c52075.png)
- TaskManagerRunner -> runTaskManager
- TaskExecutorToServiceAdapter -> start
![](/img/c0/6b7a950701d74a54b8b2be69904dee00.png)
The TaskExecutor onStart method:
- TaskExecutor -> onStart
![](/img/e3/49ca3cdd05664437a39fb1be5493fc22.png)
- TaskExecutor -> startTaskExecutorServices
![](/img/08/e3dea755a8df458c88a6d3ce9f0732d1.png)
Within it: resourceManagerLeaderRetriever.start
- ResourceManagerLeaderListener -> notifyLeaderAddress
- TaskExecutor -> notifyOfNewResourceManagerLeader
- TaskExecutor -> reconnectToResourceManager
- TaskExecutor -> tryConnectToResourceManager
- TaskExecutor -> connectToResourceManager
![](/img/d1/bfe430eaff234898bfbdeaa3f055b5f2.png)
![](/img/28/a642e14b9d5548e58084c5dd366595ad.png)
Within it: newRegistration.startRegistration(); // todo invokeRegistration
- RetryingRegistration -> startRegistration
- RetryingRegistration -> register
- RetryingRegistration -> invokeRegistration
- TaskExecutorToResourceManagerConnection -> invokeRegistration
![](/img/e4/d99396b8e1494a71ba0531cc14eac5ef.png)
- ResourceManager -> registerTaskExecutor -> registerTaskExecutorInternal
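
RetryingRegistration wraps the registration RPC so that a failed attempt is retried instead of aborting startup. The sketch below models only that retry loop with `CompletableFuture`. `RetryingCall`, the attempt limit, and the absence of timeouts and backoff are simplifying assumptions, not Flink's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical retry helper: re-invokes an asynchronous call until it succeeds. */
class RetryingCall {
    static <T> CompletableFuture<T> register(
            Supplier<CompletableFuture<T>> invocation, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(invocation, 1, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> invocation,
            int attemptNo,
            int maxAttempts,
            CompletableFuture<T> result) {
        invocation.get().whenComplete((response, failure) -> {
            if (failure == null) {
                result.complete(response);             // success: stop retrying
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(failure); // give up after the last attempt
            } else {
                attempt(invocation, attemptNo + 1, maxAttempts, result);
            }
        });
    }
}
```

The caller only sees the returned future, so the code that reacts to a successful registration does not need to know how many attempts were made.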
On successful registration, onRegistrationSuccess() is called; the TaskExecutor then reports its slots to the ResourceManager, which allocates them
- TaskExecutorToResourceManagerConnection -> onRegistrationSuccess
- ResourceManagerRegistrationListener -> onRegistrationSuccess
- ResourceManagerRegistrationListener -> establishResourceManagerConnection
![](/img/aa/931907ad22604fca85004aca0cad57b6.png)
- ResourceManager -> sendSlotReport
![](/img/a7/a4f6184cb5454eed9de9863b679a3f15.png)
- FineGrainedSlotManager -> registerTaskManager, which performs allocateSlot
![](/img/de/f0bd49b9b77240a49418b41e9d786d12.png)
![](/img/5e/bfa396d5c7344fc29c35a4d4b93b2b31.png)
- DefaultSlotStatusSyncer -> allocateSlot
- TaskExecutor(!!!) -> requestSlot (note: invoked over RPC from the ResourceManager side)
![](/img/d5/8736cb30df394225b7a9aeffe02d3125.png)
- TaskExecutor -> requestSlot
Within it: allocateSlotForJob
- TaskExecutor -> allocateSlotForJob
- TaskExecutor -> allocateSlot
- TaskSlotTableImpl -> allocateSlot
![](/img/e8/c54586ef279f4452ad9fb76eb4247b56.png)
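
On the TaskExecutor side, allocating a slot largely means recording which job and allocation now own a given slot index. A minimal sketch of such a table follows, assuming string IDs and a fixed slot count. `SimpleSlotTable` is a hypothetical class and leaves out the timeouts, resource profiles, and dynamic slots that TaskSlotTableImpl deals with.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical slot table: maps a slot index to the allocation that owns it. */
class SimpleSlotTable {
    private final int numberOfSlots;
    private final Map<Integer, String> allocationBySlot = new HashMap<>();
    private final Map<String, String> jobByAllocation = new HashMap<>();

    SimpleSlotTable(int numberOfSlots) {
        this.numberOfSlots = numberOfSlots;
    }

    /** Returns true if the slot is now (or was already) held by the given allocation. */
    boolean allocateSlot(int index, String jobId, String allocationId) {
        if (index < 0 || index >= numberOfSlots) {
            throw new IllegalArgumentException("no such slot: " + index);
        }
        String owner = allocationBySlot.get(index);
        if (owner != null) {
            return owner.equals(allocationId); // a repeated request is treated as idempotent
        }
        allocationBySlot.put(index, allocationId);
        jobByAllocation.put(allocationId, jobId);
        return true;
    }

    /** Job that owns the allocation, or null if the allocation is unknown. */
    String jobOf(String allocationId) {
        return jobByAllocation.get(allocationId);
    }

    int freeSlots() {
        return numberOfSlots - allocationBySlot.size();
    }
}
```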
Within it: offerSlotsToJobManager
- TaskExecutor -> internalOfferSlotsToJobManager
- JobMaster -> offerSlots
- DeclarativeSlotPoolService -> offerSlots
- DefaultDeclarativeSlotPool -> offerSlots
- DefaultDeclarativeSlotPool -> internalOfferSlots
![](/img/4a/255ca348887e4a1db2981cd7211e9ad7.png)
