国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

深圳網(wǎng)站建設網(wǎng)站制作網(wǎng)站推廣濰坊seo網(wǎng)絡推廣

深圳網(wǎng)站建設網(wǎng)站制作網(wǎng)站推廣,濰坊seo網(wǎng)絡推廣,WordPress 升級 php,wordpress網(wǎng)站合并基于Dubbo 3.1,詳細介紹了Dubbo服務的發(fā)布與引用的源碼。 此前我們學習了接口級的服務引入訂閱的refreshInterfaceInvoker方法,當時還有最為關鍵的notify服務通知更新的部分源碼沒有學習,本次我們來學習notify通知本地服務更新的源碼。 Dubb…

基于Dubbo 3.1,詳細介紹了Dubbo服務的發(fā)布與引用的源碼。

此前我們學習了接口級的服務引入訂閱的refreshInterfaceInvoker方法,當時還有最為關鍵的notify服務通知更新的部分源碼沒有學習,本次我們來學習notify通知本地服務更新的源碼。

Dubbo 3.x服務引用源碼:

  1. Dubbo 3.x源碼(11)—Dubbo服務的發(fā)布與引用的入口
  2. Dubbo 3.x源碼(18)—Dubbo服務引用源碼(1)
  3. Dubbo 3.x源碼(19)—Dubbo服務引用源碼(2)
  4. Dubbo 3.x源碼(20)—Dubbo服務引用源碼(3)
  5. Dubbo 3.x源碼(21)—Dubbo服務引用源碼(4)
  6. Dubbo 3.x源碼(22)—Dubbo服務引用源碼(5)服務引用bean的獲取以及懶加載原理
  7. Dubbo 3.x源碼(23)—Dubbo服務引用源碼(6)MigrationRuleListener遷移規(guī)則監(jiān)聽器
  8. Dubbo 3.x源碼(24)—Dubbo服務引用源碼(7)接口級服務發(fā)現(xiàn)訂閱refreshInterfaceInvoker
  9. Dubbo 3.x源碼(25)—Dubbo服務引用源碼(8)notify訂閱服務通知更新

Dubbo 3.x服務發(fā)布源碼:

  1. Dubbo 3.x源碼(11)—Dubbo服務的發(fā)布與引用的入口
  2. Dubbo 3.x源碼(12)—Dubbo服務發(fā)布導出源碼(1)
  3. Dubbo 3.x源碼(13)—Dubbo服務發(fā)布導出源碼(2)
  4. Dubbo 3.x源碼(14)—Dubbo服務發(fā)布導出源碼(3)
  5. Dubbo 3.x源碼(15)—Dubbo服務發(fā)布導出源碼(4)
  6. Dubbo 3.x源碼(16)—Dubbo服務發(fā)布導出源碼(5)
  7. Dubbo 3.x源碼(17)—Dubbo服務發(fā)布導出源碼(6)

1 notify服務通知更新

當?shù)谝淮斡嗛喎展?jié)點,或者服務節(jié)點目錄的子節(jié)點更新時,例如新的producer上下線,將會調(diào)用notify服務通知更新的方法,會更新本地緩存的數(shù)據(jù)。

notify方法的入口是FailbackRegistry的notify方法。

/*** FailbackRegistry的方法* <p>* 服務通知** @param url      consumer side url* @param listener listener* @param urls     provider latest urls*/
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}try {/** 調(diào)用doNotify方法更新*/doNotify(url, listener, urls);} catch (Exception t) {// Record a failed registration request to a failed listlogger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);}
}
/*** FailbackRegistry的方法* <p>* 服務通知*/
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {//調(diào)用父類AbstractRegistry的方法super.notify(url, listener, urls);
}

2 AbstractRegistry#notify通知更新

該方法涉及兩個重要知識點:

  1. 一是對于拉取到的服務節(jié)點url按照類別providers、configurators 、routers進行分類,然后遍歷每個類別,依次調(diào)用RegistryDirectory#notify方法觸發(fā)監(jiān)聽回調(diào),進行服務數(shù)據(jù)的更新。
  2. 二是RegistryDirectory#notify方法通知執(zhí)行完畢之后,調(diào)用saveProperties方法更新緩存文件。當注冊中心由于網(wǎng)絡抖動而訂閱失敗時,至少可以返回現(xiàn)有的緩存的URL。
/*** AbstractRegistry的方法* <p>* 通知更新** @param url      consumer side url* @param listener listener* @param urls     provider latest urls*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {if (url == null) {throw new IllegalArgumentException("notify url == null");}if (listener == null) {throw new IllegalArgumentException("notify listener == null");}if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) {// 1-4 Empty address.logger.warn("1-4", "", "", "Ignore empty notify urls for subscribe url " + url);return;}if (logger.isInfoEnabled()) {logger.info("Notify urls for subscribe url " + url + ", url size: " + urls.size());}//根據(jù)節(jié)點類別對url進行分類Map<String, List<URL>> result = new HashMap<>();//遍歷url,進行分類for (URL u : urls) {//服務消費者和服務提供者的服務接口名匹配if (UrlUtils.isMatch(url, u)) {//獲取url的category類別,默認providers,同時服務提供者urlServiceAddressURL固定返回providersString category = u.getCategory(DEFAULT_CATEGORY);//將url加入到對應類別的categoryList中List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());categoryList.add(u);}}//result,一般有三個元素,即三個類別,providers、configurators 、routersif (result.size() == 0) {return;}Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());//遍歷每一個類別for (Map.Entry<String, List<URL>> entry : result.entrySet()) {//獲取類別String category = entry.getKey();List<URL> categoryList = entry.getValue();//存入categoryNotifiedcategoryNotified.put(category, categoryList);//執(zhí)行l(wèi)eitener的notify方法進行通知,listener可以是RegistryDirectory/** RegistryDirectory#notify通知*/listener.notify(categoryList);/** 本地緩存*/// We will update our cache file after each notification.// When our Registry has a subscribed failure due to network jitter, we can return at least the existing cache URL.//將在每次通知后更新緩存文件。當注冊中心由于網(wǎng)絡抖動而訂閱失敗時,至少可以返回現(xiàn)有的緩存的URL。//本地緩存,默認支持if (localCacheEnabled) {saveProperties(url);}}
}

3 RegistryDirectory#notify更新本地內(nèi)存信息

該方法根據(jù)url更新RegistryDirectory對象的內(nèi)存信息,將可能會更新RegistryDirectory 內(nèi)部的configurators配置信息集合,routerChain路由鏈以及urlInvokerMap緩存。

在最后,會專門調(diào)用refreshOverrideAndInvoker方法,將服務提供者url轉換為invoker,進行服務提供者的更新。

/*** RegistryDirectory的方法* * 服務變更通知* @param urls 服務提供者注冊信息列表*/
@Override
public synchronized void notify(List<URL> urls) {if (isDestroyed()) {return;}Map<String, List<URL>> categoryUrls = urls.stream().filter(Objects::nonNull)//類別合法性過濾.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x)//根據(jù)類別分組.collect(Collectors.groupingBy(this::judgeCategory));//獲取配置信息url集合,可以為空List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());//將配置信息url轉換為Configurator集合,并賦值給configurators屬性,可以為空this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);//獲取路由信息url集合,可以為空List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());//將配置信息url轉換為Router集合,并加入routerChain路由鏈,可以為空toRouters(routerURLs).ifPresent(this::addRouters);// providers//獲取服務提供者url集合,可以為空List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());// 3.x added for extend URL address//添加擴展URL地址 3.x的特性ExtensionLoader<AddressListener> addressListenerExtensionLoader = getUrl().getOrDefaultModuleModel().getExtensionLoader(AddressListener.class);//獲取AddressListener,默認空集合List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);if (supportedListeners != null && !supportedListeners.isEmpty()) {for (AddressListener addressListener : supportedListeners) {providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);}}/** 將服務提供者url轉換為invoker,進行服務提供者的更新*/refreshOverrideAndInvoker(providerURLs);
}

3.1 refreshOverrideAndInvoker刷新invoker

該方法將服務提供者url轉換為invoker,進行服務提供者的更新,這在consumer對producer的信息更新部分是非常重要的一個方法。

url轉換規(guī)則為:

  1. 如果URL已轉換為invoker,則不再重新引用它并直接從緩存獲取它,請注意,URL中的任何參數(shù)更改都將被重新引用。
  2. 如果傳入invoker列表不為空,則表示它是最新的invoker列表。
  3. 如果傳入invokerUrl的列表為空,則意味著該規(guī)則只是一個覆蓋規(guī)則或路由規(guī)則,需要重新對比以決定是否重新引用。
/*** RegistryDirectory的方法* <p>* 將服務提供者url轉換為invoker,進行服務提供者的更新** @param urls 服務提供者url*/
private synchronized void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return nullrefreshInvoker(urls);
}/*** 將invokerURL列表轉換為Invoker Map** @param invokerUrls this parameter can't be null*/
private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");//如果只有一個協(xié)議為empty的url,表示最新注冊中心沒有任何該服務提供者url信息if (invokerUrls.size() == 1&& invokerUrls.get(0) != null&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {//設置為禁止訪問this.forbidden = true; // Forbid to access//設置routerChain的服務提供者invoker集合為一個空集合routerChain.setInvokers(BitList.emptyList());//關閉urlInvokerMap中的所有服務提供者invokerdestroyAllInvokers(); // Close all invokers}//表明可能存在服務提供者urlelse {//允許訪問this.forbidden = false; // Allow to accessif (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// use local reference to avoid NPE as this.cachedInvokerUrls will be set null by destroyAllInvokers().//使用本地引用來避免NPE。cachedInvokerUrls將被destroyAllInvokers()方法設置為空。Set<URL> localCachedInvokerUrls = this.cachedInvokerUrls;//空的服務提供者url集合if (invokerUrls.isEmpty() && localCachedInvokerUrls != null) {// 1-4 Empty address.logger.warn("1-4", "configuration ", "","Service" + serviceKey + " received empty address list with no EMPTY protocol set, trigger empty protection.");invokerUrls.addAll(localCachedInvokerUrls);} else {//緩存的invoker url,便于比較localCachedInvokerUrls = new HashSet<>();localCachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparisonthis.cachedInvokerUrls = localCachedInvokerUrls;}if (invokerUrls.isEmpty()) {return;}// use local reference to avoid NPE as this.urlInvokerMap will be set null concurrently at destroyAllInvokers().//使用本地引用來避免NPE。urlInvokerMap將在destroyAllInvokers()方法設置為空。Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;// can't use local reference as oldUrlInvokerMap's mappings might be removed directly at toInvokers().//不能使用本地引用,因為oldUrlInvokerMap的映射可能會直接在toInvokers()中刪除。Map<URL, Invoker<T>> oldUrlInvokerMap = null;if (localUrlInvokerMap != null) {// the initial capacity should be set greater than the maximum number of entries divided by the load factor to avoid resizing.oldUrlInvokerMap = new LinkedHashMap<>(Math.round(1 + localUrlInvokerMap.size() / DEFAULT_HASHMAP_LOAD_FACTOR));localUrlInvokerMap.forEach(oldUrlInvokerMap::put);}/** 將URL轉換為Invoker*/Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(oldUrlInvokerMap, invokerUrls);// Translate url list to Invoker map/** If the calculation is wrong, it is not processed.** 1. The protocol configured by the client is inconsistent with the protocol of the server.*    eg: consumer protocol = dubbo, provider only has other protocol services(rest).* 2. The registration center is not robust and pushes illegal specification data.**/if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {// 3-1 - Failed to convert the URL address into Invokers.logger.error("3-1", "inconsistency between the client protocol and the protocol of the server","", "urls to invokers error",new IllegalStateException("urls to invokers error. invokerUrls.size :" +invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));this.setInvokers(multiGroup ? new BitList<>(toMergeInvokerList(newInvokers)) : new BitList<>(newInvokers));// pre-route and build cache//invoker集合存入routerChain的invokers屬性routerChain.setInvokers(this.getInvokers());//設置urlInvokerMap為新的urlInvokerMapthis.urlInvokerMap = newUrlInvokerMap;try {//銷毀無用 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}// 通知invoker刷新this.invokersChanged();}
}

3.2 toInvokers將URL轉換為Invoker

將url轉換為Invoker,如果url已被引用,將不會重新引用。將放入newUrlInvokeMap的項將從oldUrlInvokerMap中刪除。

該方法的大概邏輯為:

  1. 獲取獲取消費者需要查詢過濾的協(xié)議,遍歷全部最新服務提供者url,依次進行如下操作:
  2. 調(diào)用checkProtocolValid方法,校驗當前提供者url協(xié)議是否支持當前服務消費者調(diào)用,如果不支持則跳過該提供者。服務消費者可以手動指定消費某些協(xié)議的服務提供者,其他的服務提供者將被丟棄。
  3. 調(diào)用mergeUrl方法,合并服務提供者url的配置,合并覆蓋順序是:override > -D參數(shù) >Consumer配置 > Provider配置,從這里可以知道消費者的配置優(yōu)先級大于提供者的配置。
  4. 從原來的緩存中獲取該url對應的invoker:
    1. 如果已經(jīng)存在該緩存,那么直接將緩存的invoker加入到新的invoker map緩存中,不再從新引用。
    2. 如果緩存沒有該url對應的invoker,那么將會重新引用該invoker,并將新引入的invoker加入到新的invoker map緩存中。
  5. 返回最新的url到invoker的緩存map。
/*** RegistryDirectory的的方法** 將url轉換為Invoker,如果url已被引用,將不會重新引用。將放入newUrlInvokeMap的項將從oldUrlInvokerMap中刪除。** @param oldUrlInvokerMap 此前的url到invoker的映射* @param urls 最新服務提供者url集合* @return invokers 最新的url到invoker的映射*/
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {//新的映射mapMap<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}//獲取消費者需要查詢過濾的協(xié)議String queryProtocols = this.queryMap.get(PROTOCOL_KEY);//遍歷最新服務提供者url集合for (URL providerUrl : urls) {//校驗當前提供者url協(xié)議是否支持當前服務消費者調(diào)用,如果不支持則跳過該提供者//服務消費者可以手動指定消費某些協(xié)議的服務提供者,其他的服務提供者將被丟棄if (!checkProtocolValid(queryProtocols, providerUrl)) {continue;}//合并服務提供者url的配置,合并覆蓋順序是:override > -D參數(shù) >Consumer配置 > Provider配置//從這里可以知道消費者的配置優(yōu)先級大于提供者的配置URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters,// regardless of how the consumer combines parameters,// if the server url changes, then refer again//從原來的緩存中獲取該url對應的invokerInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);//如果緩存沒有該url對應的invoker,那么將會重新引用該invokerif (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}//如果啟用服務if (enabled) {//再次通過Protocol$Adaptive的refer方法引用該服務提供者//在最開始我們就是通過refer方法引用服務的,在再次見到這個方法,只不過這里的url已經(jīng)變成了某個服務提供者的url了invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {// Thrown by AbstractProtocol.optimizeSerialization()if (t instanceof RpcException && t.getMessage().contains("serialization optimizer")) {// 4-2 - serialization optimizer class initialization failed.logger.error("4-2", "typo in optimizer class", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);} else {// 4-3 - Failed to refer invoker by other reason.logger.error("4-3", "", "","Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}}//加入到新的invoker map緩存中if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {//如果已經(jīng)存在該緩存,那么直接將緩存的invoker加入到新的invoker map緩存中,不再從新引用newUrlInvokerMap.put(url, invoker);}}//返回新的invoker mapreturn newUrlInvokerMap;
}

在上面的步驟中,如果是首次啟動消費者,將會統(tǒng)一走Protocol$Adaptive的refer方法引用該服務提供者的邏輯。還記得在最開始講consumer服務引入的時候嗎,那時候我們就是通過這個refer方法引用服務的,現(xiàn)在再次見到這個方法,只不過此前的url則是注冊中心協(xié)議url,對應著RegistryProtocol,而這里的url已經(jīng)變成了某個服務提供者的url了,對應著具體的協(xié)議實現(xiàn),例如DubboProtocol、RestProtocol。

我們此前就講過了Protocol$Adaptive的refer方法實際上返回的是被wrapper包裝的Protocol,這里我們直接看最底層的Protocol的refer方法,以默認協(xié)議dubbo協(xié)議的Protocol實現(xiàn)DubboProtocol為例子!

4 DubboProtocol#refer dubbo協(xié)議服務引入

該方法執(zhí)行基于dubbo序列化協(xié)議的服務引入,最終會創(chuàng)建一個DubboInvoker,內(nèi)部包含一個nettyClient,已經(jīng)與對應的服務提供者的nettyServer建立了連接,可用于發(fā)起rpc遠程調(diào)用請求。

/*** DubboProtocol的方法** @param type 服務類型* @param url  遠程服務提供者url* @return* @param <T>* @throws RpcException*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {//銷毀檢測checkDestroyed();//協(xié)議綁定引用return protocolBindingRefer(type, url);
}@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {//銷毀檢測checkDestroyed();//序列化優(yōu)化optimizeSerialization(url);// create rpc invoker.//創(chuàng)建一個DubboInvoker,可用于發(fā)起rpc遠程調(diào)用DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);//加入?yún)f(xié)議緩存invokersinvokers.add(invoker);return invoker;
}

4.1 getClients獲取服務客戶端

該方法獲取服務提供者網(wǎng)絡調(diào)用客戶端。這里會判斷是否使用共享連接,因為一個服務提供者根提供了很多的服務接口,這個的是否共享連接,實際上就是指的消費者引入時候,是這些服務接口是否共用一些客戶端連接(默認一個),或者說不同的服務接口使用獨立的客戶端連接(默認一個服務一個連接)。默認是共享連接

/*** DubboProtocol的方法* 獲取服務客戶端** @param url 服務提供者url* @return ExchangeClient數(shù)組*/
private ExchangeClient[] getClients(URL url) {//獲取配置的連接數(shù),默認為0int connections = url.getParameter(CONNECTIONS_KEY, 0);// whether to share connection// if not configured, connection is shared, otherwise, one connection for one service//是否共享連接,如果沒有配置connections,那么連接是共享的,否則,一個服務連接一個服務if (connections == 0) {/** The xml configuration should have a higher priority than properties.* 共享連接配置,xml配置的優(yōu)先級應該高于屬性*/String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS): url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);connections = Integer.parseInt(shareConnectionsStr);//獲取共享客戶端List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);//設置到ExchangeClient數(shù)組中ExchangeClient[] clients = new ExchangeClient[connections];Arrays.setAll(clients, shareClients::get);return clients;}//非共享連接,表示當前服務接口使用單獨的連接ExchangeClient[] clients = new ExchangeClient[connections];for (int i = 0; i < clients.length; i++) {//初始化新的客戶端clients[i] = initClient(url);}return clients;
}

4.2 getSharedClient獲取共享客戶端連接

如果是共享連接配置,那么調(diào)用getSharedClient方法獲取共享客戶端連接,默認連接數(shù)為1。該方法的大概步驟為:

  1. 首先獲取服務提供者ip:port 作為共享連接的key,即共享連接情況下,同一個服務提供者實例下的所有服務接口共享某些連接。
  2. 從緩存referenceClientMap獲取key對應的共享客戶端連接。
  3. 如果存在緩存,并且客戶端連接全部可用,那么增加連接技術,然后返回即可。否則,只要有一個客戶端不可用,就需要用可用的客戶端替換不可用的客戶端。
  4. 如果此前沒有該key的客戶端連接緩存或者連接不是全部可用,都要走下面的步驟,嘗試新創(chuàng)建連接。
  5. synchronized鎖,在鎖代碼中再次雙重檢測,注意這里還有線程等待喚醒機制。
  6. 最后判斷如果客戶端連接為空,那么調(diào)用buildReferenceCountExchangeClientList方法構建指定數(shù)量的客戶端連接。如果連接不為空,那么遍歷連接,判斷如果該連接不可用,那么新創(chuàng)建一個連接補充進來。
  7. 最后的處理仍需要加synchronized鎖,判斷如果最終沒建立連接,那么移除無效緩存,否則將最終的客戶端連接存入緩存,最后喚醒其他等待的線程。

該方法的核心知識點有兩個,一個是buildReferenceCountExchangeClientList方法構建指定數(shù)量的客戶端連接,另一個就是方法中的synchronized鎖以及等待喚醒機制。

為什么需要等待喚醒呢?因為這是共享客戶端,那么可能有多個線程都在初始化同一個ip:port的多個客戶端,為了避免沖突,需要加鎖。

/*** DubboProtocol的方法* <p>* 獲取共享客戶端連接** @param url        服務提供者url* @param connectNum 共享連接數(shù)量,默認1*/
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {//獲取 服務提供者ip:port 作為共享連接的keyString key = url.getAddress();//從緩存獲取key對應的共享客戶端連接Object clients = referenceClientMap.get(key);if (clients instanceof List) {//轉換為ReferenceCountExchangeClient集合,帶有引用計數(shù)的功能List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;//檢測客戶端連接是否全部可用//只要有一個客戶端不可用,就需要用可用的客戶端替換不可用的客戶端。if (checkClientCanUse(typedClients)) {//如果可用//增加連接的引用計數(shù),如果我們創(chuàng)建新的調(diào)用者共享相同的連接,連接將關閉,沒有任何引用batchClientRefIncr(typedClients);return typedClients;}}//如果此前沒有該key的連接緩存,那么新創(chuàng)建List<ReferenceCountExchangeClient> typedClients = null;synchronized (referenceClientMap) {//死循環(huán)for (; ; ) {// guarantee just one thread in loading condition. And Other is waiting It had finished.//雙重檢測鎖clients = referenceClientMap.get(key);if (clients instanceof List) {typedClients = (List<ReferenceCountExchangeClient>) clients;if (checkClientCanUse(typedClients)) {batchClientRefIncr(typedClients);return typedClients;} else {//如果共享連接不是全部可用,那么緩存值先設置為為一個object對象,跳出循環(huán)referenceClientMap.put(key, PENDING_OBJECT);break;}}//如果客戶端連接PENDING_OBJECT,那么表示有其他線程正在初始化當前客戶端連接,那么當前線程等待直到被通知else if (clients == PENDING_OBJECT) {try {referenceClientMap.wait();} catch (InterruptedException ignored) {}}//如果沒有共享連接,那么緩存值先設置為為一個object對象,跳出循環(huán)else {referenceClientMap.put(key, PENDING_OBJECT);break;}}}try {//連接數(shù)量必須大于等于1connectNum = Math.max(connectNum, 1);// If the clients is empty, then the first initialization is//如果客戶端連接為空if (CollectionUtils.isEmpty(typedClients)) {/** 構建客戶端連接*/typedClients = buildReferenceCountExchangeClientList(url, connectNum);}//如果連接不為空else {//遍歷連接for (int i = 0; i < typedClients.size(); i++) {//如果該連接不可用,那么新創(chuàng)建一個連接補充進來ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);// If there is a client in the list that is no longer available, create a new one to replace him.if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {typedClients.set(i, buildReferenceCountExchangeClient(url));continue;}referenceCountExchangeClient.incrementAndGetCount();}}} finally {synchronized (referenceClientMap) {//如果最終沒建立連接,那么移除無效緩存if (typedClients == null) {referenceClientMap.remove(key);} else {//將最終的客戶端連接存入緩存referenceClientMap.put(key, typedClients);}//喚醒其他線程referenceClientMap.notifyAll();}}return typedClients;
}

4.3 buildReferenceCountExchangeClientList構建客戶端連接

該方法構建指定數(shù)量的引用計數(shù)交換器客戶端,內(nèi)部循環(huán)調(diào)用buildReferenceCountExchangeClient方法構建耽單個客戶端連接,內(nèi)部調(diào)用initClient方法,初始化交換器客戶端,啟動一個nettyClient并與服務端建立了連接。

/*** DubboProtocol的方法* 構建指定數(shù)量的引用計數(shù)交換器客戶端** @param url 服務提供者url* @param connectNum 客戶端數(shù)量* @return*/
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {List<ReferenceCountExchangeClient> clients = new ArrayList<>();//循環(huán)調(diào)用buildReferenceCountExchangeClient方法for (int i = 0; i < connectNum; i++) {clients.add(buildReferenceCountExchangeClient(url));}return clients;
}/*** 構建一個引用計數(shù)交換器客戶端** @param url* @return*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {//初始化交換器客戶端,啟動一個nettyClient并與服務端建立了連接ExchangeClient exchangeClient = initClient(url);//創(chuàng)建ReferenceCountExchangeClientReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);// read configs//獲取服務器關閉等待超時時間,默認10000msint shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());client.setShutdownWaitTime(shutdownTimeout);return client;
}

4.4 initClient建立客戶端連接

該方法創(chuàng)建客戶端連接,大概步驟為:

  1. 首先獲取客戶端底層通信框架類型,應該和服務端的底層通信框統(tǒng)一,默認netty。
  2. 用ServiceConfigURL替換InstanceAddressURL,協(xié)議為dubbo協(xié)議。
  3. 獲取lazy參數(shù),判斷連接是否懶加載,默認false,即餓加載。如果懶加載,那么只有在第一次調(diào)用服務時才會創(chuàng)建與服務端的連接,否則立即調(diào)用Exchangers.connect(url, requestHandler)方法與服務端建立底層通信客戶端連接。

默認情況下,客戶端為餓加載,客戶端與服務端的連接,在消費者客戶端啟動引用服務的時候就已經(jīng)建立了,即服務提供者url轉換為invoker的時候,就已經(jīng)建立了連接。

/*** DubboProtocol的方法* 創(chuàng)建一個新的連接** @param url 服務提供者url*/
private ExchangeClient initClient(URL url) {/** Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,* which means params are shared among different services. Since client is shared among services this is currently not a problem.*///獲取客戶端底層通信框架類型,應該和服務端的底層通信框統(tǒng)一,默認nettyString str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));// BIO is not allowed since it has severe performance issue.//不允許使用BIO,因為它有嚴重的性能問題,目前都是使用netty4if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException("Unsupported client type: " + str + "," +" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));}try {// Replace InstanceAddressURL with ServiceConfigURL.//用ServiceConfigURL替換InstanceAddressURL,協(xié)議為dubbo協(xié)議url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());url = url.addParameter(CODEC_KEY, DubboCodec.NAME);// enable heartbeat by default//默認啟用心跳url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));//連接是否懶加載,默認false,即餓加載return url.getParameter(LAZY_CONNECT_KEY, false)//如果懶加載,那么只有在第一次調(diào)用服務時才會創(chuàng)建與服務端的連接? new LazyConnectExchangeClient(url, requestHandler)//餓加載,與服務端建立底層通信客戶端連接: Exchangers.connect(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);}
}

4.5 Exchangers#connect建立連接

該方法和我們此前學習的服務提供者的Exchangers#bind方法類型,只不過bind方法創(chuàng)建服務端,該方法創(chuàng)建客戶端。

該方法內(nèi)部基于Dubbo SPI獲取Exchanger,默認HeaderExchanger,然后調(diào)用HeaderExchanger#connect方法。

/*** Exchangers的方法** 客戶端建立與服務端的連接** @param url 服務提供者url* @param handler 請求處理器* @return 客戶端連接*/
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//基于Dubbo SPI獲取Exchanger,默認HeaderExchanger,然后調(diào)用HeaderExchanger#connect方法return getExchanger(url).connect(url, handler);
}

HeaderExchanger#connect方法中,首先對handler進行包裝:DecodeHandler -> HeaderExchangeHandler -> requestHandler。

  1. DecodeHandler用于負責內(nèi)部的dubbo協(xié)議的請求解碼。
  2. HeaderExchangeHandler用于完成請求響應的映射。
  3. requestHandler用于nettyHandler真正處理請求。

隨后調(diào)用Transporters#connect方法啟動底層遠程網(wǎng)絡通信客戶端,返回Client。Transporter是Dubbo對網(wǎng)絡傳輸層的抽象接口,Exchanger依賴于Transporter。

最后基于Client構建HeaderExchangeClient返回。


@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {//包裝handler:DecodeHandler -> HeaderExchangeHandler -> handler//調(diào)用Transporters#connect方法啟動底層遠程網(wǎng)絡通信客戶端,返回Client//基于Client構建HeaderExchangeClient返回return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect方法將會在handler的最外層繼續(xù)包裝一層ChannelHandlerDispatcher,它所有的 ChannelHandler 接口實現(xiàn)都會調(diào)用其中每個 ChannelHandler 元素的相應方法。隨后基于Dubbo SPI機制獲取Transporter的實現(xiàn),并調(diào)用connect方法完成綁定,目前僅NettyTransporter,基于netty4。

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}//繼續(xù)包裝一層ChannelHandlerDispatcherChannelHandler handler;if (handlers == null || handlers.length == 0) {handler = new ChannelHandlerAdapter();} else if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI機制獲取Transporter的實現(xiàn),并調(diào)用connect方法完成綁定return getTransporter(url).connect(url, handler);
}

4.6 NettyTransporter#connect創(chuàng)建NettyClient

該方法很簡單,就是根據(jù)url和handler創(chuàng)建一個NettyClient實例,在NettyClient的構造器中,會調(diào)用doOpen()開啟客戶端,創(chuàng)建Bootstrap,設置EventLoopGroup,編配ChannelHandlerPipeline,隨后調(diào)用connect方法連接服務提供者所在服務端。

@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler創(chuàng)建NettyClientreturn new NettyClient(url, handler);
}

NettyClient的構造器如下,將會調(diào)用父類構造器啟動客戶端。

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler//可通過CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定義客戶端線程池的名稱和類型//繼續(xù)包裝handler: MultiMessageHandler->HeartbeatHandler->handlersuper(url, wrapChannelHandler(url, handler));
}

AbstractClient的構造器如下,將會獲取綁定的ip和端口以及其他參數(shù),然后調(diào)用doOpen方法真正的開啟netty客戶端,最后調(diào)用connect方法連接服務提供者所在服務端。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);// set default needReconnect true when channel is not connected//當通道未連接時設置默認needReconnect為trueneedReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);//初始化執(zhí)行器,消費者的執(zhí)行程序是全局共享的,提供者ip不需要是線程名的一部分。initExecutor(url);try {/** 創(chuàng)建netty客戶端*/doOpen();} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}try {// connect./** 連接服務提供者所在服務端*/connect();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());}} catch (RemotingException t) {// If lazy connect client fails to establish a connection, the client instance will still be created,// and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exceptionif (url.getParameter(LAZY_CONNECT_KEY, false)) {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +" connect to the server " + getRemoteAddress() +" (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +t.getMessage(), t);return;}if (url.getParameter(Constants.CHECK_KEY, true)) {close();throw t;} else {logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);}} catch (Throwable t) {close();throw new RemotingException(url.toInetSocketAddress(), null,"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);}
}

4.7 doOpen初始化NettyClient

該方法用于初始化并啟動netty客戶端,是非常標準的netty客戶端啟動代碼,如果你們使用過Netty,看過Netty源碼,一定就會感到非常熟悉。

創(chuàng)建Bootstrap,設置eventGroup,編配ChannelHandler。至此成功初始化了Bootstrap,但是并沒有連接服務端。

/*** NettyClient的方法** 初始化 bootstrap*/
@Override
protected void doOpen() throws Throwable {//創(chuàng)建NettyClientHandlerfinal NettyClientHandler nettyClientHandler = createNettyClientHandler();//創(chuàng)建Bootstrap,說明這是一個netty客戶端bootstrap = new Bootstrap();//初始化NettyClientinitBootstrap(nettyClientHandler);
}protected NettyClientHandler createNettyClientHandler() {//創(chuàng)建NettyClientHandler,當前NettyClient對象本身也是一個ChannelHandler實例,其received方法委托給創(chuàng)建實例時傳遞的內(nèi)部的handler處理return new NettyClientHandler(getUrl(), this);
}protected void initBootstrap(NettyClientHandler nettyClientHandler) {//配置線程組bootstrap.group(EVENT_LOOP_GROUP.get())//設置Socket 參數(shù).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())//IO模型.channel(socketChannelClass());bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(DEFAULT_CONNECT_TIMEOUT, getConnectTimeout()));//設置處理器bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslClientTlsHandler(getUrl()));}NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);//自定義客戶端消息的業(yè)務處理邏輯Handlerch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug//解碼.addLast("decoder", adapter.getDecoder())//編碼.addLast("encoder", adapter.getEncoder())//心跳檢測.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))//最后是此前創(chuàng)建的nettyClientHandler.addLast("handler", nettyClientHandler);String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));ch.pipeline().addFirst(socks5ProxyHandler);}}});
}

4.8 connect連接服務端

在初始化Bootstrap之后,將調(diào)用connect方法真正的連接服務提供者所在的服務端,內(nèi)部調(diào)用doConnect方法執(zhí)行連接,該方法由子類實現(xiàn)。

 /*** AbstractClient的方法* <p>* 連接服務提供者所在服務端*/
protected void connect() throws RemotingException {//加鎖connectLock.lock();try {//如果已連接則返回if (isConnected()) {return;}//如果已關閉則返回if (isClosed() || isClosing()) {logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");return;}/** 執(zhí)行連接*/doConnect();if (!isConnected()) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");} else {if (logger.isInfoEnabled()) {logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", channel is " + this.getChannel());}}} catch (RemotingException e) {throw e;} catch (Throwable e) {throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()+ ", cause: " + e.getMessage(), e);} finally {connectLock.unlock();}
}

NettyClient的doConnect方法如下,主要邏輯就是調(diào)用bootstrap.connect方法連接服務端:

/*** NettyClient的方法* 連接服務端*/
@Override
protected void doConnect() throws Throwable {long start = System.currentTimeMillis();//通過bootstrap連接服務端ChannelFuture future = bootstrap.connect(getConnectAddress());try {//等待連接超時事件boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);//如果連接成功if (ret && future.isSuccess()) {//獲取通道Channel newChannel = future.channel();try {// Close old channel// copy reference//關閉舊的ChannelChannel oldChannel = NettyClient.this.channel;if (oldChannel != null) {try {if (logger.isInfoEnabled()) {logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);}oldChannel.close();} finally {NettyChannel.removeChannelIfDisconnected(oldChannel);}}} finally {if (NettyClient.this.isClosed()) {try {if (logger.isInfoEnabled()) {logger.info("Close new netty channel " + newChannel + ", because the client closed.");}newChannel.close();} finally {NettyClient.this.channel = null;NettyChannel.removeChannelIfDisconnected(newChannel);}} else {NettyClient.this.channel = newChannel;}}} else if (future.cause() != null) {Throwable cause = future.cause();// 6-1 Failed to connect to provider server by other reason.RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);logger.error("6-1", "network disconnected", "","Failed to connect to provider server by other reason.", cause);throw remotingException;} else {// 6-2 Client-side timeoutRemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + " client-side timeout "+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());logger.error("6-2", "provider crash", "","Client-side timeout.", remotingException);throw remotingException;}} finally {// just add new valid channel to NettyChannel's cacheif (!isConnected()) {//future.cancel(true);}}
}

5 saveProperties更新本地文件信息

在每次通知內(nèi)存數(shù)據(jù)更新之后,更新緩存文件。當注冊中心由于網(wǎng)絡抖動而訂閱失敗時,至少可以返回現(xiàn)有的緩存的URL。

/*** AbstractRegistry的方法** @param url 服務消費者url*/
private void saveProperties(URL url) {//服務緩存文件路徑為 {user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cacheif (file == null) {return;}try {//需要存儲的url字符串StringBuilder buf = new StringBuilder();//獲取該url的不同類別節(jié)點到對應url列表的mapMap<String, List<URL>> categoryNotified = notified.get(url);//遍歷所有的節(jié)點urlif (categoryNotified != null) {for (List<URL> us : categoryNotified.values()) {for (URL u : us) {if (buf.length() > 0) {//追加空格buf.append(URL_SEPARATOR);}//追加url字符串buf.append(u.toFullString());}}}//消費者url key以及對應的節(jié)點url字符串存入propertiesproperties.setProperty(url.getServiceKey(), buf.toString());//版本自增long version = lastCacheChanged.incrementAndGet();//保存properties到本地文件if (syncSaveFile) {doSaveProperties(version);} else {registryCacheExecutor.schedule(() -> doSaveProperties(version), DEFAULT_INTERVAL_SAVE_PROPERTIES, TimeUnit.MILLISECONDS);}} catch (Throwable t) {logger.warn(t.getMessage(), t);}
}

本地緩存文件路徑為:{user.home}/.dubbo/dubbo-registry-{dubbo.application.name}-{ip}-{post}.cache,里面緩存的內(nèi)容如下,每一個服務接口占據(jù)一行,它的所有url字符串都追加在后面,通過空格分隔。
在這里插入圖片描述

6 總結

本次我們學習了接口級別服務發(fā)現(xiàn)訂閱refreshInterfaceInvoker方法的具體實現(xiàn),大概步驟為:

  1. 第一次調(diào)用refreshInterfaceInvoker方法的時候,由于MigrationInvoker內(nèi)部的真實消費者Invoker為null,那么需要創(chuàng)建一個消費者Invoker。
  2. 首先創(chuàng)建動態(tài)注冊心中目錄DynamicDirectory,隨后調(diào)用doCreateInvoker方法創(chuàng)建服務消費者Invoker。
    1. 首先根據(jù)消費者信息轉換為消費者注冊信息url,內(nèi)部包括消費者ip、指定引用的protocol(默認consumer協(xié)議)、指定引用的服務接口、指定引用的方法以及其他消費者信息。
    2. 調(diào)用registry.register方法將消費者注冊信息url注冊到注冊中心。
    3. 調(diào)用directory.buildRouterChain方法構建服務調(diào)用路由鏈RouterChain,賦給directory的routerChain屬性。
    4. 調(diào)用directory.subscribe方法進行服務發(fā)現(xiàn)、引入并訂閱服務。
      1. directory本身是一個監(jiān)聽器,directory將會訂閱zookeeper對應的服務接口節(jié)點下的dubbo/[service name]/providers,服務提供者目錄,以及dubbo/[service name]/configurators,即配置目錄,以及dubbo/[service name]/routers,即服務路由目錄。
      2. 依靠著zookeeper的watch監(jiān)聽回調(diào)機制,當這些節(jié)點下的子節(jié)點發(fā)生變化時會觸發(fā)回調(diào)通知RegistryDirectory執(zhí)行notify方法,進而完成本地服務列表的動態(tài)更新功能。實際上服務提供者也會訂閱,只不過只會訂閱configurators節(jié)點。
      3. 在執(zhí)行訂閱的時候,將會進行一次providers,configurators,routers節(jié)點目錄下字節(jié)點的獲取,這樣就獲取到了當前的服務提供者url、配置信息url、服務路由url。
      4. 在subscribe方法的最后,也是最關鍵的一步,主動調(diào)用notify方法通知數(shù)據(jù)變更。這里實際上會動態(tài)更新本地內(nèi)存和文件中的服務提供者緩存,可能會更新RegistryDirectory 內(nèi)部的configurators配置信息集合,routerChain路由鏈以及urlInvokerMap緩存,這里面存放著服務提供者url到對應的Invoker的映射。
        1. 如果沒有在本地緩存中找到某個服務提供者url的緩存,那么會將url轉換為對應協(xié)議的Invoker,默認DubboInvoker,DubboInvoker的內(nèi)部還會創(chuàng)建NettyClient客戶端,并與服務提供者所在的服務端建立連接。
        2. 將url轉換為Invoker之前,將會進行配置的合并,合并覆蓋順序是:override > -D參數(shù) >Consumer配置 > Provider配置,從這里可以知道消費者的配置優(yōu)先級大于提供者的配置。
    5. 調(diào)用cluster.join方法傳入directory進行集群容錯能力包裝,最終返回一個ClusterInvoker作為消費者Invoker,即MockClusterInvoker,這是一個包裝類,內(nèi)部包含真正的集群容錯Invoker,默認是FailoverClusterInvoker。

到此我們可以知道上面的各種對象的關系(注意MockClusterInvoker上面還有一個MigrationInvoker沒畫出來):

在這里插入圖片描述

到此接口級服務引入學習完畢,實際上Dubbo2就是采用的接口級別服務注冊和引入。后面我們將繼續(xù)學習應用級服務引入,實際上這才是Dubbo3升級的一個重點,非常值得學習!

http://m.aloenet.com.cn/news/45716.html

相關文章:

  • 網(wǎng)站有幾種類型vi設計
  • 二 網(wǎng)站建設的重要性今日頭條荊州新聞
  • 鞍山seo寧波網(wǎng)站關鍵詞優(yōu)化代碼
  • 婚戀網(wǎng)站翻譯可以做嗎模板建站常規(guī)流程
  • 企業(yè)網(wǎng)站開發(fā)注意什么百度電話查詢
  • 便利的邯鄲網(wǎng)站建設網(wǎng)站統(tǒng)計
  • 廣州做網(wǎng)站海珠新科優(yōu)化疫情防控措施
  • 做網(wǎng)站設計的成都市seo網(wǎng)站公司
  • 做項目掙錢的網(wǎng)站競價托管如何托管
  • 企業(yè)移動網(wǎng)站建設軟文街官方網(wǎng)站
  • 網(wǎng)站開發(fā)需要多線程嗎視頻優(yōu)化是什么意思
  • 常德市政府門戶網(wǎng)站群建設方案河南網(wǎng)站顧問
  • 網(wǎng)站建設與網(wǎng)頁設計 視頻百度權重什么意思
  • 網(wǎng)站開發(fā) chrome gimp網(wǎng)絡平臺營銷
  • wordpress生成xml網(wǎng)站優(yōu)化排名方案
  • 如何用電腦記事本做網(wǎng)站網(wǎng)絡營銷企業(yè)有哪些公司
  • 北京公司招聘整站優(yōu)化系統(tǒng)廠家
  • 免費做網(wǎng)站安全嗎黑鋒網(wǎng)seo
  • 建設信用卡官網(wǎng)網(wǎng)站濟南計算機培訓機構哪個最好
  • 給網(wǎng)站做腳本算違法嗎淘寶seo什么意思
  • 做代理網(wǎng)站百度指數(shù)搜索榜
  • 李寧運動服網(wǎng)站建設規(guī)劃書b2b網(wǎng)站有哪些平臺
  • 杭州網(wǎng)站建設工作室網(wǎng)站關鍵詞優(yōu)化怎么弄
  • 怎么找網(wǎng)站app推廣方式有哪些
  • 怎樣自己做免費網(wǎng)站網(wǎng)絡推廣運營優(yōu)化
  • 湘潭做網(wǎng)站問下磐石網(wǎng)絡nba最新交易信息
  • 有什么網(wǎng)站可以做運動阿里指數(shù)官網(wǎng)最新版本
  • 提供邯鄲手機網(wǎng)站建設蘇州關鍵詞優(yōu)化seo
  • wordpress照片ppt網(wǎng)絡優(yōu)化師是什么工作
  • php做企業(yè)網(wǎng)站需要多久技術培訓