房?jī)r(jià)走勢(shì)最新消息2022aso優(yōu)化運(yùn)營(yíng)
ResourceManager 總結(jié)
一、概述
1、ResourceManager 管理 Flink 集群中的計(jì)算資源,計(jì)算資源主要來(lái)自 TaskManager 組件。
2、如果集群采用 Native【本地模式】部署,則 ResourceManager 會(huì)動(dòng)態(tài)地向集群資源管理器申請(qǐng) Container 并啟動(dòng)TaskManager,例如Hadoop Yarn、Kubernetes等。
3、ResourceManager主要接收來(lái)自 JobManager 的 SlotRequest 和 TaskManager 的 SlotReport。
二、分類
1、動(dòng)態(tài)資源管理 和 不支持動(dòng)態(tài)資源管理
1)一類支持動(dòng)態(tài)資源管理,例如KubernetesResourceManager、YarnResourceManager及MesosResourceManager
支持動(dòng)態(tài)資源管理的集群類型,可以按需啟動(dòng)TaskManager資源,根據(jù)Job所需的資源請(qǐng)求,動(dòng)態(tài)啟動(dòng)TaskManager節(jié)點(diǎn),這種資源管理方式不用擔(dān)心資源浪費(fèi)和資源動(dòng)態(tài)伸縮的問(wèn)題。
實(shí)現(xiàn)動(dòng)態(tài)資源管理的ResourceManager需要繼承ActiveResourceManager基本實(shí)現(xiàn)類。
2)另一類不支持動(dòng)態(tài)資源管理,例如StandaloneResourceManager
2、分類圖
三、核心服務(wù)
ResourceManagerRuntimeServices 中包含 SlotManager 和 JobLeaderldService 兩個(gè)主要服務(wù)和 HeartbeatService 心跳服務(wù)。
1、SlotManager 管理整個(gè)集群的 Slot 計(jì)算資源,并對(duì) Slot 計(jì)算資源進(jìn)行統(tǒng)一的分配和管理,同時(shí)實(shí)現(xiàn)了對(duì) TaskManager 信息的注冊(cè)和管理。
2、JobLeaderldService 通過(guò)實(shí)現(xiàn) jobLeaderldListeners 實(shí)時(shí)監(jiān)聽(tīng) JobManager 的運(yùn)行狀態(tài),以獲取集群?jiǎn)?dòng)的作業(yè)對(duì)應(yīng)的 JobLeaderld 信息,防止出現(xiàn) JobManager 無(wú)法連接的情況,用于管理注冊(cè)的 JobManager 節(jié)點(diǎn),包括對(duì) JobManager 的注冊(cè)和注銷等操作。
3、HeartbeatService 主要通過(guò) TaskManagerHeartbeatListener 和 JobManagerHeartbeatListener 兩個(gè)監(jiān)聽(tīng)器收集來(lái)自 TaskManager和 JobManager 的心跳信息,以保證整個(gè)運(yùn)行時(shí)中各個(gè)組件之間能夠正常通信。
四、ResourceManager 的初始化和啟動(dòng)
DefaultDispatcherResourceManagerComponentFactory#create 方法
1、初始化 ResourceManager
resourceManager =resourceManagerFactory.createResourceManager(configuration,ResourceID.generate(),rpcService,highAvailabilityServices,heartbeatServices,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);
1)創(chuàng)建 ResourceManagerRuntimeServices
1.創(chuàng)建 SlotManager
SlotMatchingStrategy 根據(jù)作業(yè)中給定的 ResourceProfile 匹配 Slot 計(jì)算資源。SlotMatchingStrategy主要分為兩種類型:
一種是LeastUtilizationSlotMatchingStrategy,即按照利用率最低原則匹配Slot資源,盡可能保證TaskExecutor上資源的使用率處于比較低的水平,這種策略能夠有效降低機(jī)器的負(fù)載。
另一種是AnyMatchingSlotMatchingStrategy,即直接返回第一個(gè)匹配的Slot資源策略。
private static SlotManager createSlotManager(ResourceManagerRuntimeServicesConfiguration configuration,ScheduledExecutor scheduledExecutor,SlotManagerMetricGroup slotManagerMetricGroup) {final SlotManagerConfiguration slotManagerConfiguration =configuration.getSlotManagerConfiguration();if (configuration.isEnableFineGrainedResourceManagement()) {return new FineGrainedSlotManager(scheduledExecutor,slotManagerConfiguration,slotManagerMetricGroup,new DefaultResourceTracker(),new FineGrainedTaskManagerTracker(),new DefaultSlotStatusSyncer(slotManagerConfiguration.getTaskManagerRequestTimeout()),new DefaultResourceAllocationStrategy(SlotManagerUtils.generateTaskManagerTotalResourceProfile(slotManagerConfiguration.getDefaultWorkerResourceSpec()),slotManagerConfiguration.getNumSlotsPerWorker()),Time.milliseconds(REQUIREMENTS_CHECK_DELAY_MS));} else if (configuration.isDeclarativeResourceManagementEnabled()) {return new DeclarativeSlotManager(scheduledExecutor,slotManagerConfiguration,slotManagerMetricGroup,new DefaultResourceTracker(),new DefaultSlotTracker());} else {return new SlotManagerImpl(scheduledExecutor, slotManagerConfiguration, slotManagerMetricGroup);}}
2.創(chuàng)建 JobLeaderIdService
final JobLeaderIdService jobLeaderIdService =new DefaultJobLeaderIdService(highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());
2)返回創(chuàng)建的 StandaloneResourceManager
return new StandaloneResourceManager(rpcService,resourceId,highAvailabilityServices,heartbeatServices,resourceManagerRuntimeServices.getSlotManager(),ResourceManagerPartitionTrackerImpl::new,resourceManagerRuntimeServices.getJobLeaderIdService(),clusterInformation,fatalErrorHandler,resourceManagerMetricGroup,standaloneClusterStartupPeriodTime,AkkaUtils.getTimeoutAsTime(configuration),ioExecutor);
在 StandaloneResourceManager 構(gòu)造方法中啟動(dòng) RpcServer
this.rpcServer = rpcService.startServer(this);
2、啟動(dòng) ResourceManager
resourceManager.start()->ResourceManager#onStart
ResourceManager#startResourceManagerServices
1)獲取 leaderElectionService
leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();
2)初始化 resourceManagerDriver【ActiveResourceManager需要】
resourceManagerDriver.initialize(this, new GatewayMainThreadExecutor(), ioExecutor);
3)啟動(dòng) leader 競(jìng)選,在 leader 節(jié)點(diǎn)啟動(dòng)服務(wù)
1.啟動(dòng)心跳服務(wù)
在ResourceManager中HeartbeatService的啟動(dòng)方法中,包括了對(duì)taskManagerHeartbeatManager和jobManagerHeartbeatManager兩個(gè)心跳管理服務(wù)的啟動(dòng)操作。
而心跳管理服務(wù)主要通過(guò)TaskManagerHeartbeatListener和JobManagerHeartbeatListener兩個(gè)監(jiān)聽(tīng)器收集來(lái)自TaskManager和JobManager的心跳信息,以保證整個(gè)運(yùn)行時(shí)中各個(gè)組件之間能夠正常通信。
startHeartbeatServices();
2.啟動(dòng) slotManager 服務(wù)
通過(guò)scheduledExecutor線程池啟動(dòng)TaskManager周期性超時(shí)檢查服務(wù),通過(guò)checkTaskManagerTimeouts()方法實(shí)現(xiàn)該檢查,防止TaskManager長(zhǎng)時(shí)間掉線等問(wèn)題。
啟動(dòng)單獨(dú)的線程對(duì)提交的SlotRequest進(jìn)行周期性超時(shí)檢查,防止Slot請(qǐng)求超時(shí)。
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
4)啟動(dòng) jobLeaderIdService
jobLeaderIdService.start(new JobLeaderIdActionsImpl());