哈爾濱網(wǎng)站建設(shè)開發(fā)外包品牌網(wǎng)站建設(shè)方案
Nacos服務(wù)發(fā)現(xiàn)的方式
1.客戶端獲取
1.1:先是故障轉(zhuǎn)移機(jī)制判斷是否去本地文件中讀取信息,讀到則返回
1.2:再去本地服務(wù)列表讀取信息(本地緩存),沒讀到則創(chuàng)建一個(gè)空的服務(wù),然后立刻去nacos中讀取更新
1.3:讀到了就返回,同時(shí)開啟定時(shí)更新,定時(shí)向服務(wù)端同步信息 (正常1s,異常最多60s一次)
2.服務(wù)端通過GRPC推送
建立長連接、當(dāng)服務(wù)發(fā)現(xiàn)變更的時(shí)候往訂閱了服務(wù)的客戶端推送事件
SpringBoot自動(dòng)注入
項(xiàng)目啟動(dòng)的時(shí)候會(huì)通過自動(dòng)注入的機(jī)制將
NacosDiscoveryClientConfiguration注入
當(dāng)注入NacosDiscoveryClientConfiguration的時(shí)候會(huì)將DiscoveryClient一起注入Bean
DiscoveryClient實(shí)現(xiàn)了SpringCloud的DiscoveryClient接口,重點(diǎn)是getInstances和getServices方法,而且都是由NacosServiceDiscovery實(shí)現(xiàn)
獲取實(shí)例信息
NacosDiscoveryClient
private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);}}
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {String group = discoveryProperties.getGroup();List<Instance> instances = namingService().selectInstances(serviceId, group,true);return hostToServiceInstanceList(instances, serviceId);}
NacosServiceDiscovery
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {//獲取分組String group = discoveryProperties.getGroup();//查詢服務(wù)下的實(shí)例List<Instance> instances = namingService().selectInstances(serviceId, group,true);//填充返回的實(shí)例信息數(shù)據(jù)return hostToServiceInstanceList(instances, serviceId);}
@Overridepublic List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {return selectInstances(serviceName, groupName, healthy, true);}
@Overridepublic List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe)throws NacosException {return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);}
@Overridepublic List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;String clusterString = StringUtils.join(clusters, ",");// 默認(rèn)是訂閱的if (subscribe) {//從緩存中獲取實(shí)例信息serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);//如果獲取不到則從服務(wù)端拉取if (null == serviceInfo) {serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 如果未訂閱服務(wù)信息,則直接從服務(wù)器進(jìn)行查詢serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}//獲取服務(wù)中的實(shí)例信息return selectInstances(serviceInfo, healthy);}
從緩存中拿數(shù)據(jù)
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}return serviceInfoMap.get(key);}
獲取服務(wù)的實(shí)例信息
private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<Instance>();}Iterator<Instance> iterator = list.iterator();while (iterator.hasNext()) {Instance instance = iterator.next();// 保留 健康、啟用、權(quán)重大于0 的實(shí)例if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {iterator.remove();}}return list;}
GRPC請求拉取服務(wù)實(shí)例信息
@Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);//定時(shí)同步服務(wù)端serviceInfoserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);//獲取ServiceInfo 信息ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);//如果沒有則從服務(wù)端拿if (null == result || !isSubscribed(serviceName, groupName, clusters)) {//GRPC請求result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}//填充進(jìn)Map中 這里可以看服務(wù)注冊最后那部分代碼最后也是調(diào)用serviceInfoHolder保存的serviceInfoHolder.processServiceInfo(result);return result;}
定時(shí)同步服務(wù)端ServiceInfo
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}//構(gòu)建任務(wù)放到ScheduledFuture執(zhí)行ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}}
緩存訂閱信息
public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {//拿服務(wù)當(dāng)keyString key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);//構(gòu)建需要緩存的訂閱信息SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);//緩存訂閱信息synchronized (subscribes) {subscribes.put(key, redoData);}}
執(zhí)行訂閱
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {//構(gòu)建訂閱請求SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);//執(zhí)行訂閱SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);//設(shè)置已訂閱redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();}
設(shè)置訂閱信息已訂閱
public void subscriberRegistered(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);synchronized (subscribes) {SubscriberRedoData redoData = subscribes.get(key);// 標(biāo)記訂閱數(shù)據(jù)已訂閱if (null != redoData) {redoData.setRegistered(true);}}}
Nacos訂閱機(jī)制
Nacos的訂閱機(jī)制,如果用一句話來描述就是:Nacos客戶端通過一個(gè)定時(shí)任務(wù),每6秒從注冊中心獲取實(shí)例列表,當(dāng)發(fā)現(xiàn)實(shí)例發(fā)生變化時(shí),發(fā)布變更事件,訂閱者進(jìn)行業(yè)務(wù)處理。該更新實(shí)例的更新實(shí)例,該更新本地緩存的更新本地緩存。
UpdateTask
public class UpdateTask implements Runnable {public void run() {long delayTime = DEFAULT_DELAY;try {//校驗(yàn)訂閱任務(wù)是否不對 如果不對就不處理 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}//從緩存中拿 Service信息ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);//如果拿不到則去服務(wù)端拉取if (serviceObj == null) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //然后再填充進(jìn)緩存 serviceInfoHolder.processServiceInfo(serviceObj);//更新下事件lastRefTime = serviceObj.getLastRefTime();return;}// 過期服務(wù)(服務(wù)的最新更新時(shí)間小于等于緩存刷新時(shí)間),從注冊中心重新查詢if (serviceObj.getLastRefTime() <= lastRefTime) {//服務(wù)過期了重新查serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false); //在緩存進(jìn)去serviceInfoHolder.processServiceInfo(serviceObj);}// 刷新更新時(shí)間lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}// 下次更新緩存時(shí)間設(shè)置,默認(rèn)為6秒// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失敗數(shù)量為0resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);} finally {// 下次調(diào)度刷新時(shí)間,下次執(zhí)行的時(shí)間與failCount有關(guān)// failCount=0,則下次調(diào)度時(shí)間為6秒,最長為1分鐘// 即當(dāng)無異常情況下緩存實(shí)例的刷新時(shí)間是6秒if (!isCancel) {executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}}
實(shí)例變更事件處理
監(jiān)聽事件的注冊
在NacosNamingService的subscribe方法中,通過如下方式進(jìn)行了監(jiān)聽事件的注冊:
@Overridepublic void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");changeNotifier.registerListener(groupName, serviceName, clusterString, listener);clientProxy.subscribe(serviceName, groupName, clusterString);}
這里的changeNotifier.registerListener便是進(jìn)行具體的事件注冊邏輯
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);//這里用到了雙重檢查鎖機(jī)制if (eventListeners == null) {synchronized (lock) {eventListeners = listenerMap.get(key);if (eventListeners == null) {eventListeners = new ConcurrentHashSet<EventListener>();listenerMap.put(key, eventListeners);}}}eventListeners.add(listener);}
可以看出,事件的注冊便是將EventListener存儲(chǔ)在InstancesChangeNotifier的listenerMap屬性當(dāng)中了。
這里的數(shù)據(jù)結(jié)構(gòu)為Map,key為服務(wù)實(shí)例信息的拼接,value為監(jiān)聽事件的集合。
監(jiān)聽服務(wù)變更事件
因?yàn)閁pdateTask 中假如沒有從緩存中拿到服務(wù)信息則會(huì)通過grpc協(xié)議從服務(wù)端拉取然后會(huì)執(zhí)行serviceInfoHolder.processServiceInfo方法緩存服務(wù)信息,當(dāng)實(shí)例發(fā)生變化的話這個(gè)方法最終會(huì)發(fā)送一個(gè)InstancesChangeEvent 事件 所以這里會(huì)監(jiān)聽InstancesChangeEvent 事件進(jìn)行處理
InstancesChangeNotifier
public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {private final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap<String, ConcurrentHashSet<EventListener>>();@Overridepublic void onEvent(InstancesChangeEvent event) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);if (CollectionUtils.isEmpty(eventListeners)) {return;}for (final EventListener listener : eventListeners) {//[] final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);final com.alibaba.nacos.api.naming.listener.Event namingEvent = new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(),instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());// 最終調(diào)度執(zhí)行l(wèi)istener.onEvent(namingEvent),只在NacosWatch#start找到了有效的EventListener,見下文if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));} else {listener.onEvent(namingEvent);}}}
}
}
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {private Map<String, EventListener> listenerMap = new ConcurrentHashMap(16);private final AtomicBoolean running = new AtomicBoolean(false);public void start() {if (this.running.compareAndSet(false, true)) {EventListener eventListener = (EventListener)this.listenerMap.computeIfAbsent(this.buildKey(), (event) -> {return new EventListener() {public void onEvent(Event event) {if (event instanceof NamingEvent) {List instances = ((NamingEvent)event).getInstances();//[] Optional instanceOptional = NacosWatch.this.selectCurrentInstance(instances);// 按IP和端口選擇第一個(gè)instance作為當(dāng)前的instanceOptional instanceOptional = instances.stream().filter((instance) -> {return this.properties.getIp().equals(instance.getIp()) && this.properties.getPort() == instance.getPort();}).findFirst()instanceOptional.ifPresent((currentInstance) -> {//[] NacosWatch.this.resetIfNeeded(currentInstance);// 重新設(shè)置properties的metadataif (!this.properties.getMetadata().equals(instance.getMetadata())) {this.properties.setMetadata(instance.getMetadata());}});}}};});}}
獲取服務(wù)信息
NacosDiscoveryClient
@Overridepublic List<String> getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error("get service name from nacos server fail,", e);return Collections.emptyList();}}
public List<String> getServices() throws NacosException {//獲取分組String group = discoveryProperties.getGroup();//獲取服務(wù)信息ListView<String> services = namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);return services.getData();}
@Overridepublic ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName) throws NacosException {return getServicesOfServer(pageNo, pageSize, groupName, null);}
@Overridepublic ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {return clientProxy.getServiceList(pageNo, pageSize, groupName, selector);}
GRPC拉取信息
@Overridepublic ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {//構(gòu)建請求ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector != null) {if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}//采用GRPC協(xié)議拉取信息ServiceListResponse response = requestToServer(request, ServiceListResponse.class);ListView<String> result = new ListView<String>();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;}
服務(wù)端處理GRPC請求
接收獲取服務(wù)的請求
ServiceListRequestHandler
@Override@Secured(action = ActionTypes.READ)public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {//根據(jù)命名空間獲取這個(gè)命名空間下的所有服務(wù)信息 erviceManager.getInstance().getSingletons這個(gè)方法服務(wù)注冊的時(shí)候里有Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());//構(gòu)建返回信息ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());//服務(wù)信息不等于空填充返回信息if (!serviceSet.isEmpty()) {Collection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());// TODO select service by selectorList<String> serviceNameList = ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;}
訂閱服務(wù)請求
SubscribeServiceRequestHandler
@Override@Secured(action = ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {//命名空間String namespaceId = request.getNamespace();//服務(wù)名稱String serviceName = request.getServiceName();//分組名稱String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//構(gòu)建服務(wù)信息Service service = Service.newService(namespaceId, groupName, serviceName, true);//組裝訂閱請求Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());//獲取健康的實(shí)例 ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), //服務(wù)元數(shù)據(jù)信息 metadataManager.getServiceMetadata(service).orElse(null), subscriber);//是否訂閱 if (request.isSubscribe()) {clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());} else {clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());}//構(gòu)建返回?cái)?shù)據(jù)return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);}
發(fā)送訂閱事件 后續(xù)事件監(jiān)聽可參考服務(wù)事件處理的那篇文章
@Overridepublic void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));}
發(fā)送取消訂閱事件 后續(xù)事件監(jiān)聽可參考服務(wù)事件處理的那篇文章
@Overridepublic void unsubscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}client.removeServiceSubscriber(singleton);client.setLastUpdatedTime();NotifyCenter.publishEvent(new ClientOperationEvent.ClientUnsubscribeServiceEvent(singleton, clientId));}
服務(wù)查詢請求
ServiceQueryRequestHandler
@Override@Secured(action = ActionTypes.READ)public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {//獲取命名空間String namespaceId = request.getNamespace();//分組明String groupName = request.getGroupName();//服務(wù)名String serviceName = request.getServiceName();//創(chuàng)建服務(wù)信息Service service = Service.newService(namespaceId, groupName, serviceName);//集群String cluster = null == request.getCluster() ? "" : request.getCluster();boolean healthyOnly = request.isHealthyOnly();//獲取服務(wù)信息ServiceInfo result = serviceStorage.getData(service);//獲取服務(wù)元數(shù)據(jù)信息ServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);// 獲取有保護(hù)機(jī)制的健康實(shí)例result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());//構(gòu)建返回信息return QueryServiceResponse.buildSuccessResponse(result);}
public ServiceInfo getData(Service service) {return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);}
public Optional<ServiceMetadata> getServiceMetadata(Service service) {return Optional.ofNullable(serviceMetadataMap.get(service));}