国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當前位置: 首頁 > news >正文

溫州建設網(wǎng)站百度上海總部

溫州建設網(wǎng)站,百度上海總部,同城信息商家的網(wǎng)站開發(fā),wordpress極簡風dubbo在服務消費時調用的方法棧比較深,所以得一邊看一邊記,還是比較費力的。在dubbo服務發(fā)現(xiàn)中,我們看到通過ReferenceConfig#get()返回的是要調用接口的代理對象,因此通過接口的代理對象調用方法時是調用InvocationHandler(Invok…

dubbo在服務消費時調用的方法棧比較深,所以得一邊看一邊記,還是比較費力的。在dubbo服務發(fā)現(xiàn)中,我們看到通過ReferenceConfig#get()返回的是要調用接口的代理對象,因此通過接口的代理對象調用方法時是調用InvocationHandler(InvokerInvocationHandler)#invoke()方法,此時會調用注入的Invoker#invoke()方法,而InvokerInvocationHandler對象的invoker默認是DubboInvoker實現(xiàn)的,因此DubboInvoker#invoke()方法被調用,最終調用子類DubboInvoker#doInvoke()方法。

// DubboInvoker=>AbstractInvoker.javapublic Result invoke(Invocation inv) throws RpcException {//設置一系列參數(shù)......try {//調用子類方法return doInvoke(invocation);} catch (InvocationTargetException e) { // biz exception......}}

這里是調用遠程服務有三種,異步無返回值、異步有返回值、同步阻塞獲取返回,以同步方式往下分析。

// DubboInvokerprotected Result doInvoke(final Invocation invocation) throws Throwable {RpcInvocation inv = (RpcInvocation) invocation;final String methodName = RpcUtils.getMethodName(invocation);//接口路徑inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());inv.setAttachment(Constants.VERSION_KEY, version);//獲取已經(jīng)建立連接的客戶端ExchangeClient currentClient;if (clients.length == 1) {currentClient = clients[0];} else {currentClient = clients[index.getAndIncrement() % clients.length];}try {//先從invocation中獲取,再從URL中獲取boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);//單向調用,直接調用不獲取返回值if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {//異步調用有返回值需要手動獲取ResponseFuture future = currentClient.request(inv, timeout);RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));return new RpcResult();} else {//同步調用,get()會阻塞地獲取返回值RpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();}} catch (TimeoutException e) {throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);} catch (RemotingException e) {throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);}}

DubboProtocol#getSharedClient()生成netty客戶端時,會使用ReferenceCountExchangeClient裝飾客戶端ExchangeClient,然后被保存到數(shù)組中,因此這里獲取的currentClient為ReferenceCountExchangeClient,調用注入的ExchangeClient(HeaderExchangeClient)對象client的request()方法。

// ReferenceCountExchangeClientpublic ResponseFuture request(Object request, int timeout) throws RemotingException {return client.request(request, timeout);}

這里會繼續(xù)調用內(nèi)部的ExchangeChannel對象channel的request()方法,那這里的channel怎么來得呢。構造HeaderExchangeClient對象時new出來的,同時構造方法中的client為NettyClient實例對象。

// HeaderExchangeClient.javapublic ResponseFuture request(Object request, int timeout) throws RemotingException {return channel.request(request, timeout);}public HeaderExchangeClient(Client client, boolean needHeartbeat) {....this.client = client;this.channel = new HeaderExchangeChannel(client);....}

那繼續(xù)往下看HeaderExchangeChannel#request()方法,好像離發(fā)送請求數(shù)據(jù)越來越近了,這里會先構造Request和DefaultFuture,再調用內(nèi)部注入的channel(NettyClient)的send()方法。

// HeaderExchangeChannelpublic ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion("2.0.0");req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try {channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;}

NettyClient的繼承關系是NettyClient=>AbstractClient=>AbstractPeer,此時內(nèi)部AbstractClient#send()方法被調用。這里就是先getChannel()獲取通道再調用send(),再分析這個channel怎么來的,在doConnect()時如果連接服務端成功則能獲取Channel,默認為NettyChannel。

// AbstractClientpublic void send(Object message, boolean sent) throws RemotingException {if (send_reconnect && !isConnected()) {connect();}//getChannel()由子類NettyClientChannel channel = getChannel();//TODO Can the value returned by getChannel() be null? need improvement.if (channel == null || !channel.isConnected()) {throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());}channel.send(message, sent);}

NettyChannel#send()中就會把請求發(fā)送給服務端了。

// NettyChannelpublic void send(Object message, boolean sent) throws RemotingException {super.send(message, sent);boolean success = true;int timeout = 0;try {ChannelFuture future = channel.write(message);if (sent) {timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);success = future.await(timeout);}Throwable cause = future.getCause();if (cause != null) {throw cause;}} catch (Throwable e) {throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);}if (!success) {throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()+ "in timeout(" + timeout + "ms) limit");}}

至此服務消費者向服務提供者發(fā)送調用服務的請求完成,最終返回的在HeaderExchangeChannel#request()構建的DefaultFuture。還記得DubboInvoker#doInvoke()嗎,這里的get()獲取服務端響應的代碼如下,就是不斷輪詢判斷服務端是否響應,超時則拋出異常。

// DefaultFuturepublic Object get(int timeout) throws RemotingException {if (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}if (!isDone()) {long start = System.currentTimeMillis();lock.lock();try {while (!isDone()) {done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}

總結下發(fā)送請求的過程:首先獲取已經(jīng)建立連接的netty客戶端,然后構建Request和DefaultFuture,通過netty通道將請求發(fā)送給netty服務端之后,DefaultFuture#get()會超時等待,默認超時時間1S。

接下來看服務端怎么處理調用請求的。服務提供者收到服務消費者的調用請求后,首先在DubboCodec#decodeBody()對編碼后的請求字節(jié)流數(shù)據(jù)進行解碼,得到調用遠程服務的Request對象。

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);// get request id.long id = Bytes.bytes2long(header, 4);if ((flag & FLAG_REQUEST) == 0) {......} else {// decode request.Request req = new Request(id);req.setVersion("2.0.0");req.setTwoWay((flag & FLAG_TWOWAY) != 0);if ((flag & FLAG_EVENT) != 0) {req.setEvent(Request.HEARTBEAT_EVENT);}try {Object data;if (req.isHeartbeat()) {data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));} else if (req.isEvent()) {data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));} else {DecodeableRpcInvocation inv;if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY,Constants.DEFAULT_DECODE_IN_IO_THREAD)) {inv = new DecodeableRpcInvocation(channel, req, is, proto);inv.decode();} else {inv = new DecodeableRpcInvocation(channel, req,new UnsafeByteArrayInputStream(readMessageData(is)), proto);}data = inv;}req.setData(data);} catch (Throwable t) {if (log.isWarnEnabled()) {log.warn("Decode request failed: " + t.getMessage(), t);}// bad requestreq.setBroken(true);req.setData(t);}return req;}}

在創(chuàng)建netty服務器的時候會調用NettyServer#doOpen()創(chuàng)建NettyHandler,當前服務端收到請求數(shù)據(jù)時鉤子函數(shù)messageReceived會被調用。

//NettyHandler public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);try {handler.received(channel, e.getMessage());} finally {NettyChannel.removeChannelIfDisconnected(ctx.getChannel());}}

服務端處理請求是分層的,消費者調用提供者的請求在AllChannelHandler#received()中被處理的,此時請求會通過線程池調度線程執(zhí)行。

// AllChannelHandlerpublic void received(Channel channel, Object message) throws RemotingException {ExecutorService cexecutor = getExecutorService();try {cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {......}}

這里會用DecodeHandler對請求解碼后到HeaderExchangeHandler#received()中繼續(xù)處理。

// ChannelEventRunnablepublic void run() {switch (state) {......case RECEIVED:try {handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;......}}

發(fā)現(xiàn)收到的消息是請求Request時,調用handleRequest()處理。

// HeaderExchangeHandlerpublic void received(Channel channel, Object message) throws RemotingException {channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);try {if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {handlerEvent(channel, request);} else {if (request.isTwoWay()) {Response response = handleRequest(exchangeChannel, request);channel.send(response);} else {handler.received(exchangeChannel, request.getData());}}} else if (message instanceof Response) {handleResponse(channel, (Response) message);} else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (echo != null && echo.length() > 0) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}} finally {HeaderExchangeChannel.removeChannelIfDisconnected(channel);}}

在handleRequest()中會調用DubboProtocol中的屬性requestHandler引用的匿名內(nèi)部類對象中的reply()方法。

// HeaderExchangeHandlerResponse handleRequest(ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) msg = null;else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);else msg = data.toString();res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);return res;}// find handler by message class.//獲取請求數(shù)據(jù),即客戶端創(chuàng)建的RpcInvocation對象Object msg = req.getData();try {// handle data.//調用接口方法Object result = handler.reply(channel, msg);res.setStatus(Response.OK);res.setResult(result);} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));}return res;}

這里其實就是獲取Invoker,再調用Invoker#invoke()。

// DubboProtocol.javaprivate ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {@Overridepublic Object reply(ExchangeChannel channel, Object message) throws RemotingException {if (message instanceof Invocation) {Invocation inv = (Invocation) message;Invoker<?> invoker = getInvoker(channel, inv);......return invoker.invoke(inv);}......}};

在DubboProtocol#export()暴露接口服務時會將serviceKey對應的DubboExporter添加到exporterMap(ConcurrentHashMap<String, Exporter<?>>)中,此時就可以通過serviceKey獲取到對應的DubboExporter,再通過DubboExporter獲取Invoker,Invoker是在ServiceConfig中的doExportUrlsFor1Protocol()中生成的。

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {......String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);......return exporter.getInvoker();}

找到Invoker以后就會調用invoke(),最終調用子類的doInvoke()方法,wrapper是動態(tài)生成的,但是邏輯就是調用接口實現(xiàn)類對象中的方法。

    public Result invoke(Invocation invocation) throws RpcException {try {//調用子類的doInvoke()方法return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));} catch (InvocationTargetException e) {return new RpcResult(e.getTargetException());} catch (Throwable e) {throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);}}public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};}

調用方法獲取到返回值以后就會在HeaderExchangeHandler#received()中將響應發(fā)給服務消費者,服務消費者再以類似的過程對響應數(shù)據(jù)進行解碼,返回到應用層。
總結下服務提供者處理調用請求的過程:首先對請求字節(jié)流數(shù)據(jù)進行解碼,得到請求Request,然后到保存導出模塊的map根據(jù)serviceKey獲取到代理對象,最終通過代理對象調用接口實現(xiàn)類對象的方法,將返回值發(fā)送給服務提供者

http://m.aloenet.com.cn/news/1211.html

相關文章:

  • 天津企商網(wǎng)站建設公司關鍵詞優(yōu)化按天計費
  • 網(wǎng)站如何做銀聯(lián)在線支付大連中小企業(yè)網(wǎng)絡營銷
  • 一般做外貿(mào)上什么網(wǎng)站熱狗網(wǎng)站排名優(yōu)化外包
  • 建設網(wǎng)站com艾滋病阻斷藥
  • 網(wǎng)站robots.txt檢測網(wǎng)站關鍵詞在線優(yōu)化
  • 用html5做的個人網(wǎng)站網(wǎng)絡營銷試卷及答案
  • python合適做網(wǎng)站嗎海外網(wǎng)絡推廣方案
  • 做網(wǎng)站圖片百度競價排名系統(tǒng)
  • 網(wǎng)站 默認首頁濟南seo的排名優(yōu)化
  • 商城開發(fā)價格服務排名優(yōu)化百度
  • 和先鋒影音和做的網(wǎng)站百度關鍵詞排名推廣
  • 騎行網(wǎng)站模板網(wǎng)站搭建平臺
  • wordpress 黃藍 現(xiàn)代企業(yè)教程seo推廣排名網(wǎng)站
  • 建立網(wǎng)站需要注冊公司嗎seo引擎優(yōu)化公司
  • 網(wǎng)站做哪些主題比較容易做幽默廣告軟文案例
  • 專做外貿(mào)衣服鞋網(wǎng)站有哪些網(wǎng)址搜索引擎入口
  • 還有什么網(wǎng)站可以做面包車拉貨做一個網(wǎng)站需要多少錢大概
  • 福建網(wǎng)站建設公司交換友情鏈接的意義是什么
  • 常州建設工程監(jiān)理員掛證網(wǎng)站百度軟件開放平臺
  • 做網(wǎng)站的時候賣過假貨而出過事搜索引擎優(yōu)化是免費的嗎
  • 重點項目建設網(wǎng)站商業(yè)策劃公司十大公司
  • 營銷型網(wǎng)站系統(tǒng)網(wǎng)絡營銷策劃方案
  • 國內(nèi)做新聞比較好的網(wǎng)站有哪些企業(yè)網(wǎng)站制作公司
  • wordpress漢語公益搜索網(wǎng)站排名優(yōu)化
  • 網(wǎng)站被降權會發(fā)生什么長春網(wǎng)站公司哪家好
  • 廊坊網(wǎng)站快速排名優(yōu)化杭州seo營銷
  • 旅游網(wǎng)站開發(fā)功能網(wǎng)絡廣告投放網(wǎng)站
  • 公安部門網(wǎng)站備案網(wǎng)站產(chǎn)品推廣
  • 政府網(wǎng)站建設工作匯報網(wǎng)頁設計和網(wǎng)站制作
  • 寧波網(wǎng)站建設免費咨詢漯河網(wǎng)絡推廣哪家好