基層建設論文收錄在哪個網站公司做網站一般多少錢
文章目錄
- 1 解析命令行參數和配置文件
- 2 創(chuàng)建并啟動 NamesrvController
- 2.1 創(chuàng)建 NamesrvController 對象
- 2.2 啟動 NamesrvController 對象
- 第一步:初始化 controller
- 第二步:注冊 JVM 鉤子
- 第二步:啟動 controller
RocketMQ是一個分布式消息中間件,它的核心組件之一是namesrv,負責管理broker的路由信息和kv配置。本文將介紹RocketMQ5.1版本中namesrv的啟動過程,包括如何解析命令行參數、加載配置文件、初始化和啟動namesrv控制器等。
首先,我們需要在環(huán)境變量中設置ROCKETMQ_HOME,指向RocketMQ的安裝目錄。然后,我們可以使用如下命令啟動namesrv:
nohup sh mqnamesrv &
這條命令執(zhí)行運行的是 namesrv.NamesrvStartup#main
方法。
public static void main(String[] args) {main0(args);controllerManagerMain();
}
啟動過程分為兩部分
main0(args)
:啟動 NamesrvControllercontrollerManagerMain()
:啟動 ControllerManager
本文只分析 NamesrvController 的啟動
即 namesrv.NamesrvStartup#main0
方法,代碼如下:
public static NamesrvController main0(String[] args) {try {// 解析命令行參數和配置文件parseCommandlineAndConfigFile(args);// 創(chuàng)建并啟動 NamesrvControllerNamesrvController controller = createAndStartNamesrvController();return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
namesrv的啟動過程可以分為以下幾個步驟:
- 解析命令行參數和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置對象
- 創(chuàng)建并啟動 NamesrvController 對象,該對象是namesrv的核心控制器,它持有各種配置對象、網絡通信對象、路由管理對象等
1 解析命令行參數和配置文件
解析命令行參數和配置文件的方法是 namesrv.NamesrvStartup#parseCommandlineAndConfigFile
,代碼如下:
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());if (null == commandLine) {System.exit(-1);return;}// 創(chuàng)建配置類對象namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();// 設置namesrv的監(jiān)聽端口nettyServerConfig.setListenPort(9876);// 如果命令行中有-c參數,則從配置文件中加載namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的屬性if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);// 如果命令行中有-p參數,則打印namesrvConfig、nettyServerConfig、nettyClientConfig、controllerConfig的屬性if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}
上述代碼的執(zhí)行流程如下:
-
創(chuàng)建一個NamesrvConfig對象,用于存儲namesrv的配置信息,如rocketmqHome、kvConfigPath等。NamesrvConfig類的具體屬性如下表所示
屬性名 含義 默認值 rocketmqHome RocketMQ的根目錄 環(huán)境變量ROCKETMQ_HOME kvConfigPath KV配置文件的路徑 ${user.home}/namesrv/kvConfig.json configStorePath 配置存儲文件的路徑 ${user.home}/namesrv/namesrv.properties productEnvName 環(huán)境名稱 center clusterTest 是否開啟集群測試 false orderMessageEnable 是否支持順序消息 false -
創(chuàng)建一個NettyServerConfig對象,用于存儲netty服務端的配置信息,如listenPort、serverWorkerThreads等。NettyServerConfig類的具體屬性如下表所示
屬性名 含義 默認值 listenPort 監(jiān)聽端口,用于接收客戶端的連接請求 9876 serverWorkerThreads 服務端工作線程數量,用于處理網絡IO事件或執(zhí)行業(yè)務任務 8 serverCallbackExecutorThreads 服務端回調執(zhí)行線程數量,用于執(zhí)行異步回調任務,如果為0,則使用公共線程池 0 serverSelectorThreads 服務端選擇器線程數量,用于接收和分發(fā)網絡IO事件,建議不要超過3個 3 serverOnewaySemaphoreValue 服務端單向請求信號量值,用于限制單向請求的并發(fā)度,防止資源耗盡 256 serverAsyncSemaphoreValue 服務端異步請求信號量值,用于限制異步請求的并發(fā)度,防止資源耗盡 64 serverChannelMaxIdleTimeSeconds 服務端通道最大空閑時間(秒),超過該時間則關閉連接,釋放資源 120 serverSocketSndBufSize 服務端Socket發(fā)送緩沖區(qū)大小(字節(jié)),如果為-1,則使用操作系統(tǒng)默認值 -1 serverSocketRcvBufSize 服務端Socket接收緩沖區(qū)大小(字節(jié)),如果為-1,則使用操作系統(tǒng)默認值 -1 serverPooledByteBufAllocatorEnable 是否啟用服務端池化ByteBuf分配器,可以提高內存利用率和性能 true useEpollNativeSelector 是否使用Epoll本地選擇器,可以提高Linux平臺下的網絡IO效率,需要操作系統(tǒng)支持 false -
創(chuàng)建一個NettyClientConfig對象,用于存儲netty客戶端的配置信息,如clientWorkerThreads、clientCallbackExecutorThreads等。NettyClientConfig類的具體屬性如下表所示
屬性名 含義 默認值 clientWorkerThreads 客戶端工作線程數量,用于處理網絡IO事件或執(zhí)行業(yè)務任務 4 clientCallbackExecutorThreads 客戶端回調執(zhí)行線程數量,用于執(zhí)行異步回調任務,如果為0,則使用公共線程池 可用處理器的數量 connectTimeoutMillis 連接超時時間(毫秒),超過該時間則放棄連接 3000 channelNotActiveInterval 通道不活躍的間隔時間(毫秒),超過該時間則關閉通道,釋放資源 1000 * 60 clientChannelMaxIdleTimeSeconds 客戶端通道最大空閑時間(秒),超過該時間則關閉連接,釋放資源 120 clientSocketSndBufSize 客戶端Socket發(fā)送緩沖區(qū)大小(字節(jié)),如果為-1,則使用操作系統(tǒng)默認值 -1 clientSocketRcvBufSize 客戶端Socket接收緩沖區(qū)大小(字節(jié)),如果為-1,則使用操作系統(tǒng)默認值 -1 clientPooledByteBufAllocatorEnable 是否啟用客戶端池化ByteBuf分配器,可以提高內存利用率和性能 false clientCloseSocketIfTimeout 是否在超時時關閉客戶端Socket,可以避免資源泄漏 false useTLS 是否使用TLS協(xié)議進行加密通信,需要操作系統(tǒng)支持 false clientOnewaySemaphoreValue 客戶端單向請求信號量值,用于限制單向請求的并發(fā)度,防止資源耗盡 65535 clientAsyncSemaphoreValue 客戶端異步請求信號量值,用于限制異步請求的并發(fā)度,防止資源耗盡 65535 -
解析命令行參數,支持以下選項:
-c
指定配置文件路徑,如果指定了配置文件,會從文件中加載namesrvConfig、nettyServerConfig和nettyClientConfig的屬性。-p
打印所有配置信息,并退出程序- 其他選項會被轉換為properties對象,并覆蓋namesrvConfig、nettyServerConfig和nettyClientConfig的屬性
2 創(chuàng)建并啟動 NamesrvController
創(chuàng)建并啟動 NamesrvController 的方法是 namesrv.NamesrvStartup#createAndStartNamesrvController
:
public static NamesrvController createAndStartNamesrvController() throws Exception {// 創(chuàng)建 NamesrvControllerNamesrvController controller = createNamesrvController();// 啟動 NamesrvControllerstart(controller);// 輸出啟動成功的日志NettyServerConfig serverConfig = controller.getNettyServerConfig();String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());log.info(tip);System.out.printf("%s%n", tip);return controller;
}
代碼執(zhí)行流程如下:
- 調用createNamesrvController方法,創(chuàng)建NamesrvController對象
- 調用start方法,傳入NamesrvController對象,啟動 NamesrvController 對象
- 格式化一個提示信息,包含序列化類型、綁定地址和監(jiān)聽端口信息
- 使用log對象記錄提示信息到日志文件中
- 把提示信息打印到控制臺中
- 返回NamesrvController對象
如果在執(zhí)行中出現(xiàn)異常,則拋出異常并退出程序
2.1 創(chuàng)建 NamesrvController 對象
在 namesrv.NamesrvStartup#createAndStartNamesrvController
方法中調用了 namesrv.NamesrvStartup#createNamesrvController
方法,用于創(chuàng)建 NamesrvController 對象,代碼為:
public static NamesrvController createNamesrvController() {// 創(chuàng)建 NamesrvController,傳入namesrvConfig、nettyServerConfig、nettyClientConfig配置final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig, nettyClientConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;
}
創(chuàng)建 NamesrvController 對象的步驟如下:
- 創(chuàng)建一個NamesrvController對象,傳入namesrvConfig、nettyServerConfig和nettyClientConfig對象,這些對象存儲了namesrv的業(yè)務配置和網絡配置
- 調用NamesrvController對象的getConfiguration方法,獲取一個Configuration對象,該對象負責管理namesrv的配置信息
- 調用Configuration對象的registerConfig方法,傳入properties對象,將properties對象中的配置信息注冊到Configuration對象中
- 返回NamesrvController對象
這個函數的返回值是NamesrvController對象
2.2 啟動 NamesrvController 對象
在 namesrv.NamesrvStartup#createAndStartNamesrvController
方法中調用了 namesrv.NamesrvStartup#start
方法,用于啟動創(chuàng)建好的 NamesrvController 對象,代碼為:
public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化controllerboolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 當jvm關閉的時候,會執(zhí)行系統(tǒng)中已經設置的所有通過方法addShutdownHook添加的鉤子,當系統(tǒng)執(zhí)行完這些鉤子后,jvm才會關閉。// 所以這些鉤子可以在jvm關閉的時候進行內存清理、對象銷毀等操作。Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));// 啟動controllercontroller.start();return controller;
}
啟動 NamesrvController 對象的步驟如下:
第一步:初始化 controller
調用 namesrv.NamesrvController#initialize
方法,初始化controller。如果初始化失敗,就調用controller的shutdown方法,關閉controller,并退出程序
public boolean initialize() {loadConfig();initiateNetworkComponents();initiateThreadExecutors();registerProcessor();startScheduleService();initiateSslContext();initiateRpcHooks();return true;
}
初始化操作包括:
-
調用
namesrv.NamesrvController#loadConfig
方法從 NamesrvConfig 對象中保存的 kvConfigPath 指定的文件中加載kv配置,并創(chuàng)建一個KVConfigManager對象,用于管理和打印kv配置private void loadConfig() {this.kvConfigManager.load(); }
-
調用
namesrv.NamesrvController#initiateNetworkComponents
方法初始化網絡組件,包括remotingClient和remotingServerremotingClient是一個NettyRemotingClient對象,它用于向其他服務發(fā)送請求或響應。remotingServer是一個NettyRemotingServer對象,它用于接收和處理來自其他服務的請求或響應。BrokerHousekeepingService對象用于處理broker的連接和斷開事件。
private void initiateNetworkComponents() {this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);this.remotingClient = new NettyRemotingClient(this.nettyClientConfig); }
-
調用
namesrv.NamesrvController#initiateThreadExecutors
方法初始化兩個線程池,一個是defaultExecutor,用于處理默認的遠程請求;另一個是clientRequestExecutor,用于處理客戶端的路由信息請求。這兩個線程池都使用了LinkedBlockingQueue作為任務隊列,并且重寫了newTaskFor方法,使用FutureTaskExt包裝了Runnable任務。private void initiateThreadExecutors() {this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}}; }
-
調用
namesrv.NamesrvController#registerProcessor
方法根據namesrvConfig.isClusterTest()
的值,選擇使用ClusterTestRequestProcessor或者DefaultRequestProcessor作為默認處理器- ClusterTestRequestProcessor是一個用于集群測試的處理器,它會在請求前后添加一些環(huán)境信息,比如產品環(huán)境名稱、請求時間等
- DefaultRequestProcessor是一個用于正常運行的處理器,它會根據請求的類型,調用不同的方法來處理,比如注冊Broker、獲取路由信息、更新配置等。
- 在
namesrvConfig.isClusterTest() = false
時如果收到請求的requestCode
等于RequestCode.GET_ROUTEINFO_BY_TOPIC
則會使用ClientRequestProcessor來處理;當收到其他請求時,會使用DefaultRequestProcessor來處理。
private void registerProcessor() {if (namesrvConfig.isClusterTest()) {this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);} else {// Support get route info only temporarilyClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);} }
此部分為 RIP 29 Optimize RocketMQ NameServer 中
Thread pool separation
改進。在改進之前nameserver使用同一個線程池和隊列來處理所有的客戶端路由請求、broker注冊請求等,如果其中一種類型的請求爆發(fā),會影響所有的請求。為了解決這個問題,RIP-29 將最重要的客戶端路由請求單獨隔離出來,使用不同的線程池和隊列,隊列大小和線程數都可以配置,這樣可以保證不同類型的請求之間不會相互影響,如下圖所示 -
調用
namesrv.NamesrvController#startScheduleService
方法啟動定時服務,執(zhí)行以下三個任務:- 每隔一段時間掃描不活躍的broker,并清理路由信息
- 每隔 10 分鐘打印所有的KV配置信息
- 每隔 1 秒打印線程池的水位日志,即客戶端請求線程池和默認線程池的隊列大小和頭部任務的慢時間(從創(chuàng)建到執(zhí)行的時間)
private void startScheduleService() {// 周期性掃描不活躍的Broker,并從路由信息中移除this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);// 每隔 10 分鐘打印KVConfig 信息this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 每隔 1 秒打印線程池的水位日志,// 即客戶端請求線程池和默認線程池的隊列大小和頭部任務的慢時間(從創(chuàng)建到執(zhí)行的時間)this.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS); }
-
調用
namesrv.NamesrvController#initiateSslContext
方法初始化SSL上下文,即配置remotingServer使用TLS協(xié)議進行安全通信 -
調用
namesrv.NamesrvController#initiateRpcHooks
方法注冊RPC鉤子,即在remotingServer處理請求之前或之后執(zhí)行一些自定義的邏輯private void initiateRpcHooks() {this.remotingServer.registerRPCHook(new ZoneRouteRPCHook()); }
在
namesrv.route.ZoneRouteRPCHook
類中重寫了doAfterResponse
方法,它會在處理GET_ROUTEINFO_BY_TOPIC請求(即請求的requestCode = RequestCode.GET_ROUTEINFO_BY_TOPIC
)時,根據請求中的zoneName參數,過濾掉不屬于該區(qū)域的broker和queue數據,從而實現(xiàn)區(qū)域隔離的功能。具體來說,
doAfterResponse
會設置 response 的 body 為namesrv.route.ZoneRouteRPCHook#filterByZoneName
方法返回值的字節(jié)數組格式,filterByZoneName
方法作用是返回過濾掉不屬于該區(qū)域的broker和queue數據后的TopicRouteData數據對象@Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {if (RequestCode.GET_ROUTEINFO_BY_TOPIC != request.getCode()) {return;}// 省略部分代碼TopicRouteData topicRouteData = RemotingSerializable.decode(response.getBody(), TopicRouteData.class);response.setBody(filterByZoneName(topicRouteData, zoneName).encode()); }
第二步:注冊 JVM 鉤子
通過addShutdownHook方法注冊一個ShutdownHookThread對象,即 JVM 鉤子,用來在程序終止時調用controller的shutdown方法,釋放資源
public void shutdown() {this.remotingClient.shutdown();this.remotingServer.shutdown();this.defaultExecutor.shutdown();this.clientRequestExecutor.shutdown();this.scheduledExecutorService.shutdown();this.scanExecutorService.shutdown();this.routeInfoManager.shutdown();if (this.fileWatchService != null) {this.fileWatchService.shutdown();}
}
第二步:啟動 controller
調用 namesrv.NamesrvController#start
方法,啟動 controller
public void start() throws Exception {this.remotingServer.start();// In test scenarios where it is up to OS to pick up an available port, set the listening port back to configif (0 == nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()+ ":" + nettyServerConfig.getListenPort()));this.remotingClient.start();if (this.fileWatchService != null) {this.fileWatchService.start();}this.routeInfoManager.start();
}
啟動操作包括:
- 調用remotingServer對象的start方法,啟動一個NettyRemotingServer,用于接收和處理客戶端的請求。
- 如果nettyServerConfig對象的listenPort屬性為0,說明是由操作系統(tǒng)自動分配一個可用端口,那么將remotingServer對象的localListenPort屬性賦值給nettyServerConfig對象的listenPort屬性,保持一致。
- 調用remotingClient對象的updateNameServerAddressList方法,更新本地地址列表,只包含當前機器的IP地址和端口號。
- 調用remotingClient對象的start方法,啟動一個NettyRemotingClient,用于向其他服務發(fā)送請求。
- 如果fileWatchService對象不為空,調用它的start方法,啟動一個文件監(jiān)視服務,用于動態(tài)加載證書文件。
- 調用routeInfoManager對象的start方法,啟動一個路由信息管理器,用于維護Broker和Topic的路由關系。
至此,NamesrvController 的啟動流程結束