公司網(wǎng)站的管理和維護uc瀏覽網(wǎng)頁版進入
目錄
說明
認識配置中心
Nacos架構(gòu)圖
Nacos配置管理實現(xiàn)原理
核心源碼分析-客戶端
核心源碼分析-服務(wù)端
配置修改的實時通知
主流配置中心對比
小小收獲
說明
本篇文章主要目的是從頭到尾比較粗粒度的分析Nacos配置中心的一些實現(xiàn),很多細節(jié)沒有涉及,希望能給大家?guī)硪欢ǖ膯l(fā)。如果大家對其中更多的實現(xiàn)細節(jié)感興趣,可以留言區(qū)留言大家一起討論。下面就讓我們一起開始Nacos配置中心的探索之旅吧!
認識配置中心
集中配置管理解決了之前各個應(yīng)用單獨維護配置在下面這些方面的不足:
- 動態(tài)更新
- 集中式管理
- 安全性和權(quán)限管理
- 不同部署環(huán)境的隔離問題
本篇內(nèi)容將帶領(lǐng)大家深入理解配置中心Nacos,如有紕漏,望大家及時指正。下面讓我們一起開始進入正題。
Nacos架構(gòu)圖
學(xué)習(xí)任何技術(shù),我們首先看下它官方的架構(gòu)圖,有個整體的認識。Nacos架構(gòu)圖如下:
核心內(nèi)容就是:Nacos Server作為Nacos的服務(wù)端,其中的Config Service模塊提供了配置管理服務(wù),然后對外提供了OpenAPI接口供客戶端調(diào)用。實際應(yīng)用當(dāng)中,我們是通過Nacos客戶端SDK來完成相關(guān)接口的調(diào)用的,SDK屏蔽了所有接口調(diào)用的細節(jié),我們只需要完成相關(guān)的配置即可。
Nacos配置管理實現(xiàn)原理
實際應(yīng)用的時候,我們會通過下面這樣的方式來讀取配置中心的值:
@Value("${manual.onFlag}")
private Boolean manualOnFlag;
這是一個開關(guān)配置,為什么我們通過@Value注解,就可以獲取到遠程配置中心的數(shù)據(jù)呢?因為遠程配置中心的所有數(shù)據(jù),在應(yīng)用程序啟動完成之后,都會被填充到Enviroment中,它是Spring中管理所有屬性值的對象。
接下來,我們就來一起梳理一下配置中心的實現(xiàn)細節(jié),并搞懂下面幾個事情:
- 配置中心的數(shù)據(jù)是怎么填充到Enviroment中的?
- 配置如何動態(tài)刷新?即改了配置內(nèi)容,應(yīng)用無需重啟即可生效。
其整體實現(xiàn)思路:客戶端啟動,會去遠程配置中心拉取當(dāng)前應(yīng)用關(guān)心的相關(guān)配置信息(這是主動拉取的過程,主動拉取的時候,會同服務(wù)端建立長輪詢機制);如果服務(wù)端(Nacos-Server)配置信息發(fā)生了變更,會推送變更的配置項到客戶端,然后客戶端拉取變更的配置項對應(yīng)的最新內(nèi)容(這是服務(wù)端推送),基于長輪詢機制實現(xiàn)。具體實現(xiàn)細節(jié)我們下面再慢慢分析。
這里貼兩張圖用于我們接下來更加直觀地理解Nacos配置管理的實現(xiàn)原理。
總的來說,Nacos配置中心采用了推拉結(jié)合的方式來實現(xiàn)配置管理,主動拉取會存在時效性,所以再基于長輪詢機制實現(xiàn)了推模式,解決主動拉取方式存在的時效性問題。
核心源碼分析-客戶端
- 核心類
Spring Cloud中的PropertySourceLocator類實現(xiàn)應(yīng)用外部化配置可動態(tài)加載,Spring Cloud Alibaba Nacos中的NacosPropertySourceLocator類實現(xiàn)了該接口,其中的locate方法就是配置獲取的核心邏輯。
public PropertySource<?> locate(Environment env) {this.nacosConfigProperties.setEnvironment(env);ConfigService configService = this.nacosConfigManager.getConfigService();if (null == configService) {log.warn("no instance of config service found, can't load config from nacos");return null;} else {long timeout = (long)this.nacosConfigProperties.getTimeout();this.nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);String name = this.nacosConfigProperties.getName();String dataIdPrefix = this.nacosConfigProperties.getPrefix();if (StringUtils.isEmpty(dataIdPrefix)) {dataIdPrefix = name;}if (StringUtils.isEmpty(dataIdPrefix)) {dataIdPrefix = env.getProperty("spring.application.name");}CompositePropertySource composite = new CompositePropertySource("NACOS");this.loadSharedConfiguration(composite);this.loadExtConfiguration(composite);this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);return composite;}}
?從這里我們可以看出,三類配置的一個加載順序:先是共享配置shared-config,然后是擴展配置ext-config,最后是應(yīng)用配置。如果遇到相同的配置項,后者會覆蓋前者。
//dataIdPrefix規(guī)則:spring.cloud.nacos.config.prefix如果沒有配置,使用spring.cloud.nacos.config.name;如果prefix和name都未配置,使用spring.application.name
//fileExtension:配置文件中的spring.cloud.nacos.config.file-extension屬性,默認為properties
private void loadApplicationConfiguration(CompositePropertySource compositePropertySource, String dataIdPrefix, NacosConfigProperties properties, Environment environment) {String fileExtension = properties.getFileExtension();String nacosGroup = properties.getGroup();//獲取配置方式一:Nacos中的dataId為dataIdPrefixthis.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);//獲取配置方式二:Nacos中的dataId為dataIdPrefix.fileExtensionthis.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix + "." + fileExtension, nacosGroup, fileExtension, true);String[] var7 = environment.getActiveProfiles();int var8 = var7.length;for(int var9 = 0; var9 < var8; ++var9) {String profile = var7[var9];String dataId = dataIdPrefix + "-" + profile + "." + fileExtension;//獲取配置方式三:Nacos中dataId為dataIdprefix-profile(區(qū)分不同環(huán)境).fileExtensionthis.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);}
}private void loadNacosDataIfPresent(final CompositePropertySource composite, final String dataId, final String group, String fileExtension, boolean isRefreshable) {if (null != dataId && dataId.trim().length() >= 1) {if (null != group && group.trim().length() >= 1) {NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);this.addFirstPropertySource(composite, propertySource, false);}}
}private NacosPropertySource loadNacosPropertySource(final String dataId, final String group, String fileExtension, boolean isRefreshable) {return NacosContextRefresher.getRefreshCount() != 0L && !isRefreshable ? NacosPropertySourceRepository.getNacosPropertySource(dataId, group) : this.nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
}private void addFirstPropertySource(final CompositePropertySource composite, NacosPropertySource nacosPropertySource, boolean ignoreEmpty) {if (null != nacosPropertySource && null != composite) {if (!ignoreEmpty || !((Map)nacosPropertySource.getSource()).isEmpty()) {composite.addFirstPropertySource(nacosPropertySource);}}
}public void setNacosConfigManager(NacosConfigManager nacosConfigManager) {this.nacosConfigManager = nacosConfigManager;
}
客戶端SDK調(diào)用Nacos-Server提供的OpenAPI接口獲取配置信息。
NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {List<PropertySource<?>> propertySources = this.loadNacosData(dataId, group, fileExtension);NacosPropertySource nacosPropertySource = new NacosPropertySource(propertySources, group, dataId, new Date(), isRefreshable);NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);return nacosPropertySource;
}private List<PropertySource<?>> loadNacosData(String dataId, String group, String fileExtension) {String data = null;try {data = this.configService.getConfig(dataId, group, this.timeout);if (StringUtils.isEmpty(data)) {log.warn("Ignore the empty nacos configuration and get it based on dataId[{}] & group[{}]", dataId, group);return Collections.emptyList();}if (log.isDebugEnabled()) {log.debug(String.format("Loading nacos data, dataId: '%s', group: '%s', data: %s", dataId, group, data));}return NacosDataParserHandler.getInstance().parseNacosData(dataId, data, fileExtension);} catch (NacosException var6) {log.error("get data from Nacos error,dataId:{} ", dataId, var6);} catch (Exception var7) {log.error("parse data from Nacos error,dataId:{},data:{}", new Object[]{dataId, data, var7});}return Collections.emptyList();
}
到這里,我們從外部讀取到了配置信息放入到PropertySource,那么它是在什么地方合并到Spring中的環(huán)境對象Environment中去的呢?答案是PropertySourceBootstrapConfiguration類中的initialize方法。
public void initialize(ConfigurableApplicationContext applicationContext) {List<PropertySource<?>> composite = new ArrayList();AnnotationAwareOrderComparator.sort(this.propertySourceLocators);boolean empty = true;ConfigurableEnvironment environment = applicationContext.getEnvironment();Iterator var5 = this.propertySourceLocators.iterator();while(true) {Collection source;do {do {if (!var5.hasNext()) {if (!empty) {MutablePropertySources propertySources = environment.getPropertySources();String logConfig = environment.resolvePlaceholders("${logging.config:}");LogFile logFile = LogFile.get(environment);Iterator var15 = environment.getPropertySources().iterator();while(var15.hasNext()) {PropertySource<?> p = (PropertySource)var15.next();if (p.getName().startsWith("bootstrapProperties")) {propertySources.remove(p.getName());}}this.insertPropertySources(propertySources, composite);this.reinitializeLoggingSystem(environment, logConfig, logFile);this.setLogLevels(applicationContext, environment);this.handleIncludedProfiles(environment);}return;}PropertySourceLocator locator = (PropertySourceLocator)var5.next();source = locator.locateCollection(environment);} while(source == null);} while(source.size() == 0);List<PropertySource<?>> sourceList = new ArrayList();Iterator var9 = source.iterator();while(var9.hasNext()) {PropertySource<?> p = (PropertySource)var9.next();if (p instanceof EnumerablePropertySource) {EnumerablePropertySource<?> enumerable = (EnumerablePropertySource)p;sourceList.add(new BootstrapPropertySource(enumerable));} else {sourceList.add(new SimpleBootstrapPropertySource(p));}}logger.info("Located property source: " + sourceList);composite.addAll(sourceList);empty = false;}
}
分析到這里,我們應(yīng)該就能明白我們?yōu)楹文茉诔绦蛑型ㄟ^@Value注解,獲取對應(yīng)的配置屬性的值了。
- 配置監(jiān)聽
現(xiàn)在我們獲取到了遠程配置中心的配置數(shù)據(jù),如果配置發(fā)生了變更,那我們怎么感知到呢?通過對相應(yīng)的配置添加監(jiān)聽來實現(xiàn)配置變更的動態(tài)感知,當(dāng)服務(wù)端配置發(fā)生變更,通過長輪詢機制,推送變化的配置項key到客戶端,然后客戶端重新從服務(wù)端去獲取最新的配置數(shù)據(jù)。
那么配置監(jiān)聽具體是怎么實現(xiàn)的呢?通過相關(guān)的自動裝配類可以找到NacosContextRefresher類,它監(jiān)聽ApplicationReadyEvent事件,該事件在上下文準備完畢之后發(fā)布,這里會完成Nacos事件監(jiān)聽的注冊。
package com.alibaba.cloud.nacos.refresh;
public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {private static final Logger log = LoggerFactory.getLogger(NacosContextRefresher.class);private static final AtomicLong REFRESH_COUNT = new AtomicLong(0L);private NacosConfigProperties nacosConfigProperties;private final boolean isRefreshEnabled;private final NacosRefreshHistory nacosRefreshHistory;private final ConfigService configService;private ApplicationContext applicationContext;private AtomicBoolean ready = new AtomicBoolean(false);private Map<String, Listener> listenerMap = new ConcurrentHashMap(16);public NacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory refreshHistory) {this.nacosConfigProperties = nacosConfigManager.getNacosConfigProperties();this.nacosRefreshHistory = refreshHistory;this.configService = nacosConfigManager.getConfigService();this.isRefreshEnabled = this.nacosConfigProperties.isRefreshEnabled();}//1.監(jiān)聽ApplicationReadyEvent事件,該事件在上下文準備完畢之后發(fā)布public void onApplicationEvent(ApplicationReadyEvent event) {if (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}}private void registerNacosListenersForApplications() {if (this.isRefreshEnabled()) {Iterator var1 = NacosPropertySourceRepository.getAll().iterator();while(var1.hasNext()) {NacosPropertySource propertySource = (NacosPropertySource)var1.next();if (propertySource.isRefreshable()) {String dataId = propertySource.getDataId();this.registerNacosListener(propertySource.getGroup(), dataId);}}}}//2.當(dāng)監(jiān)聽到事件之后,會調(diào)用registerNacosListenersForApplications方法來實現(xiàn)Nacos事件監(jiān)聽的注冊private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {return new AbstractSharedListener() {public void innerReceive(String dataId, String group, String configInfo) {NacosContextRefresher.refreshCountIncrement();NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);//通過applicationContext.publishEvent發(fā)布一個RefreshEvent事件,而這個事件的監(jiān)聽實現(xiàn)在RefreshEventListener類中。NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));if (NacosContextRefresher.log.isDebugEnabled()) {NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));}}};});try {this.configService.addListener(dataKey, groupKey, listener);} catch (NacosException var6) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), var6);}}
}
可以看到如果監(jiān)聽到了相應(yīng)的事件,會發(fā)布RefreshEvent事件,它的監(jiān)聽實現(xiàn)在RefreshEventListener類中。
package org.springframework.cloud.endpoint.event;
public class RefreshEventListener implements SmartApplicationListener {private static Log log = LogFactory.getLog(RefreshEventListener.class);private ContextRefresher refresh;private AtomicBoolean ready = new AtomicBoolean(false);public void onApplicationEvent(ApplicationEvent event) {if (event instanceof ApplicationReadyEvent) {this.handle((ApplicationReadyEvent)event);} else if (event instanceof RefreshEvent) {this.handle((RefreshEvent)event);}}public void handle(ApplicationReadyEvent event) {this.ready.compareAndSet(false, true);}public void handle(RefreshEvent event) {if (this.ready.get()) {log.debug("Event received " + event.getEventDesc());Set<String> keys = this.refresh.refresh();log.info("Refresh keys changed: " + keys);}}
}
?完成配置的監(jiān)聽之后,也就啟動了客戶端長輪詢定時任務(wù)。具體是在什么地方呢?客戶端在構(gòu)建ConfigService的時候,最終調(diào)用的代碼如下所示:
package com.alibaba.nacos.api.config;import com.alibaba.nacos.api.exception.NacosException;
import java.lang.reflect.Constructor;
import java.util.Properties;public class ConfigFactory {public ConfigFactory() {}public static ConfigService createConfigService(Properties properties) throws NacosException {try {//1.通過Class.forName來加載NacosConfigService類。Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");Constructor constructor = driverImplClass.getConstructor(Properties.class);//2.使用反射來完成ConfigService類的實例化。ConfigService vendorImpl = (ConfigService)constructor.newInstance(properties);return vendorImpl;} catch (Throwable var4) {throw new NacosException(-400, var4);}}public static ConfigService createConfigService(String serverAddr) throws NacosException {Properties properties = new Properties();properties.put("serverAddr", serverAddr);return createConfigService(properties);}
}
?ClientWorker是Nacos客戶端的一個工作類,它的構(gòu)造方法如下:
public ClientWorker(final HttpAgent agent, ConfigFilterChainManager configFilterChainManager, Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;this.init(properties);//第一個線程池executor只擁有一個核心線程,每隔10s就會執(zhí)行一次checkConfigInfo()方法,從方法名上可以知道每10s檢查一次配置信息。this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});//第二個線程池executorService只完成了初始化,主要用于實現(xiàn)客戶端的定時長輪詢功能。this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});//每隔10s執(zhí)行一次,檢查配置是否發(fā)生了變化this.executor.scheduleWithFixedDelay(new Runnable() {public void run() {try {ClientWorker.this.checkConfigInfo();} catch (Throwable var2) {ClientWorker.LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", var2);}}}, 1L, 10L, TimeUnit.MILLISECONDS);
}
?長輪詢機制細節(jié)本篇文章不做過多分析,后續(xù)補充。
總的來說,長輪詢機制主要目的是實現(xiàn)配置的拉取(默認30s),核心內(nèi)容是ClientWorker類中的LongPollingRunnable線程,完成了如下內(nèi)容:檢查配置是否有變更;同服務(wù)端建立長輪詢機制(/v1/cs/configs/listener);獲取遠程配置等。
核心源碼分析-服務(wù)端
服務(wù)端長輪詢處理入口如下:
/*** The client listens for configuration changes.*/
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);String probeModify = request.getParameter("Listening-Configs");if (StringUtils.isBlank(probeModify)) {throw new IllegalArgumentException("invalid probeModify");}probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);Map<String, String> clientMd5Map;try {clientMd5Map = MD5Util.getClientMd5Map(probeModify);} catch (Throwable e) {throw new IllegalArgumentException("invalid probeModify");}// do long-pollinginner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
/*** 輪詢接口.*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {// Long polling.if (LongPollingService.isSupportLongPolling(request)) {longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}// Compatible with short polling logic.List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);// Compatible with short polling result.String oldResult = MD5Util.compareMd5OldResult(changedGroups);String newResult = MD5Util.compareMd5ResultString(changedGroups);String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);if (version == null) {version = "2.0.0";}int versionNum = Protocol.getVersionNumber(version);// Before 2.0.4 version, return value is put into header.if (versionNum < START_LONG_POLLING_VERSION_NUM) {response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);} else {request.setAttribute("content", newResult);}Loggers.AUTH.info("new content:" + newResult);// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);return HttpServletResponse.SC_OK + "";
}
上述代碼中,首先會判斷當(dāng)前請求是否為長輪詢(請求頭包含Long-Pulling-Timeout),如果是,則調(diào)用addLongPollingClient。
- 獲取客戶端請求的超時時間,減去500ms后賦值給timeout變量。
- 判斷isFixedPolling,如果為true,定時任務(wù)將會在30s后開始執(zhí)行;否則,在29.5s后開始執(zhí)行。
- 和服務(wù)端的數(shù)據(jù)進行MD5對比,如果發(fā)生過變化則直接返回。
- scheduler.execute執(zhí)行ClientLongPolling線程。
這里的核心是ClientLongPolling,它封裝了客戶端的長輪詢請求。
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// Do nothing but set fix polling timeout.} else {long start = System.currentTimeMillis();List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {return;}}String ip = RequestUtil.getRemoteIp(req);// Must be called by http thread, or send response.final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout() is incorrect, Control by oneselfasyncContext.setTimeout(0L);ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
class ClientLongPolling implements Runnable {@Overridepublic void run() {asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());boolean removeFlag = allSubs.remove(ClientLongPolling.this);if (removeFlag) {if (isFixedPolling()) {List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest) asyncContext.getRequest(),(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {sendResponse(null);}} else {}} catch (Throwable t) {}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);}void sendResponse(List<String> changedGroups) {if (null != asyncTimeoutFuture) {asyncTimeoutFuture.cancel(false);}generateResponse(changedGroups);}void generateResponse(List<String> changedGroups) {if (null == changedGroups) {asyncContext.complete();return;}HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();try {final String respString = MD5Util.compareMd5ResultString(changedGroups);// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);asyncContext.complete();} catch (Exception ex) {PULL_LOG.error(ex.toString(), ex);asyncContext.complete();}}ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {this.asyncContext = ac;this.clientMd5Map = clientMd5Map;this.probeRequestSize = probeRequestSize;this.createTime = System.currentTimeMillis();this.ip = ip;this.timeoutTime = timeoutTime;this.appName = appName;this.tag = tag;}final AsyncContext asyncContext;final Map<String, String> clientMd5Map;final long createTime;final String ip;final String appName;final String tag;final int probeRequestSize;final long timeoutTime;Future<?> asyncTimeoutFuture;@Overridepublic String toString() {return "ClientLongPolling{" + "clientMd5Map=" + clientMd5Map + ", createTime=" + createTime + ", ip='" + ip+ '\'' + ", appName='" + appName + '\'' + ", tag='" + tag + '\'' + ", probeRequestSize="+ probeRequestSize + ", timeoutTime=" + timeoutTime + '}';}
}
從上面的分析我們可以看出,所謂的長輪詢就是服務(wù)端收到請求之后,不立即返回,而是在延后(30-0.5)s才把請求結(jié)果返回給客戶端,這就使得客戶端和服務(wù)端之間在30s之內(nèi)數(shù)據(jù)沒有發(fā)生變化的情況下一直處于連接狀態(tài)。
這里我們可能會有一個疑問,定時任務(wù)是延時29.5s之后執(zhí)行的,并沒有達到我們說的實時通知的目的,那我們修改配置之后是如何做到實時通知的呢?
配置修改的實時通知
核心是通過發(fā)布訂閱機制以及DataChangeTask來實現(xiàn)的。
public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);// Register LocalDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);// Register A Subscriber to subscribe LocalDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// Ignore.} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent) event;ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}@Overridepublic Class<? extends Event> subscribeType() {return LocalDataChangeEvent.class;}});}
class DataChangeTask implements Runnable {@Overridepublic void run() {try {ConfigCacheService.getContentBetaMd5(groupKey);for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();if (clientSub.clientMd5Map.containsKey(groupKey)) {// If published tag is not in the beta list, then it skipped.if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {continue;}// If published tag is not in the tag list, then it skipped.if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {continue;}getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // Delete subscribers' relationships.clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {}}DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {this(groupKey, isBeta, betaIps, null);}DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {this.groupKey = groupKey;this.isBeta = isBeta;this.betaIps = betaIps;this.tag = tag;}final String groupKey;final long changeTime = System.currentTimeMillis();final boolean isBeta;final List<String> betaIps;final String tag;
}
遍歷隊列allSubs中的所有客戶端長輪詢請求,比較每一個客戶端長輪詢請求攜帶的groupKey,如果服務(wù)端變更的配置和客戶端請求關(guān)注的配置一致,則直接返回。
主流配置中心對比
這里我們對比幾個常用的配置中心,Nacos、Apollo和Spring Cloud Config。下面是網(wǎng)上找到的一個比較全面的比較內(nèi)容,供大家參考:
對于Spring Cloud Config、Apollo和Nacos這三個開源的配置中心中間件,Spring Cloud自帶的Config配置中心依賴git,性能較差;Apollo相比較而已,功能更加完善,相比其他配置中心,它內(nèi)置支持CAT,性能也算OK;Nacos畢竟是阿里開源,經(jīng)過線上的各種考驗,性能最優(yōu)??偟膩碚f,對于配置中心的選型,能滿足我們的需求就行,再就是考慮和現(xiàn)有團隊技術(shù)棧的一個吻合度。
小小收獲
通過對Nacos由表及里的學(xué)習(xí),我們能從中學(xué)習(xí)到的知識點包括以下這些方面,如果有大家感興趣的點,建議大家去深入了解和學(xué)習(xí)一下,作為對自己的一個技術(shù)能力提升。
- SpringBoot啟動原理(梳理啟動原理找到配置獲取的入口)
- 【重要】SpringBoot自動裝配機制SPI(這個比較重要,Dubbo的核心思想之一,實現(xiàn)方式不同但是思想一致)
- Spring事件發(fā)布與訂閱(閱讀源碼過程中,很多場景有涉及)
- Spring應(yīng)用程序環(huán)境配置封裝類Environment(配置項的載體)
- 反射機制(Java基礎(chǔ)內(nèi)容)
- 長輪詢機制(配置中心核心內(nèi)容)
- 線程池(定時線程池,延遲特定時間之后執(zhí)行)
- 隊列(長輪詢機制實現(xiàn),客戶端請求封裝之后全部放入隊列,然后慢慢處理)