網(wǎng)站開發(fā)服務(wù)費(fèi)記賬個(gè)人網(wǎng)站制作源代碼
我的技術(shù)網(wǎng)站?java-broke.site,有大廠完整面經(jīng),工作技術(shù),架構(gòu)師成長(zhǎng)之路,等經(jīng)驗(yàn)分享
Linkis-RPC的設(shè)計(jì)目標(biāo)是提供一種靈活、可擴(kuò)展的微服務(wù)間通信機(jī)制,支持以下功能:
- 異步請(qǐng)求與響應(yīng):支持請(qǐng)求方發(fā)送異步請(qǐng)求,接收方處理完后再響應(yīng)給請(qǐng)求方。
- 廣播請(qǐng)求:支持將請(qǐng)求廣播給所有目標(biāo)服務(wù)實(shí)例。
- 自定義攔截器:允許用戶定義攔截器來(lái)實(shí)現(xiàn)特定功能,如緩存、重試等。
- 服務(wù)選擇:基于Eureka實(shí)現(xiàn)的服務(wù)發(fā)現(xiàn),支持通過(guò)服務(wù)名和實(shí)例信息選擇特定服務(wù)實(shí)例。
- 解耦通信機(jī)制:請(qǐng)求方和接收方的代碼實(shí)現(xiàn)解耦,便于獨(dú)立開發(fā)和維護(hù)。
Linkis-RPC的架構(gòu)概述
Linkis-RPC主要由以下幾個(gè)核心組件構(gòu)成:
- Sender:請(qǐng)求發(fā)送器,用于發(fā)送請(qǐng)求到目標(biāo)服務(wù)。
- Receiver:請(qǐng)求接收器,負(fù)責(zé)接收和處理請(qǐng)求。
- Interceptor:攔截器鏈,用于在請(qǐng)求發(fā)送前進(jìn)行處理,如廣播、重試、緩存等。
- Decoder/Encoder:用于請(qǐng)求和響應(yīng)的序列化和反序列化。
- Eureka:服務(wù)注冊(cè)與發(fā)現(xiàn)中心。
Linkis-RPC的代碼結(jié)構(gòu)
Linkis-RPC的源碼結(jié)構(gòu)清晰,主要代碼模塊包括:
- Sender接口和實(shí)現(xiàn):負(fù)責(zé)發(fā)送請(qǐng)求并處理響應(yīng)。
- Receiver接口和實(shí)現(xiàn):負(fù)責(zé)接收請(qǐng)求并執(zhí)行業(yè)務(wù)邏輯。
- Interceptor接口和實(shí)現(xiàn):攔截器實(shí)現(xiàn)類,用于增強(qiáng)請(qǐng)求功能。
- RPCReceiveRestful:內(nèi)嵌的HTTP服務(wù),用于接收和解碼請(qǐng)求。
- 請(qǐng)求和響應(yīng)編碼器/解碼器:實(shí)現(xiàn)請(qǐng)求和響應(yīng)的序列化和反序列化。
接下來(lái),我們將逐步分析每個(gè)模塊的代碼實(shí)現(xiàn)。
Sender的實(shí)現(xiàn)
Sender接口
Sender
接口提供了多種發(fā)送請(qǐng)求的方法,包括同步和異步請(qǐng)求。
public abstract class Sender {/*** 同步請(qǐng)求方法,等待接收方返回響應(yīng)。** @param message 請(qǐng)求消息對(duì)象* @return 響應(yīng)對(duì)象*/public abstract Object ask(Object message);/*** 帶超時(shí)設(shè)置的同步請(qǐng)求方法。** @param message 請(qǐng)求消息對(duì)象* @param timeout 超時(shí)時(shí)間* @return 響應(yīng)對(duì)象*/public abstract Object ask(Object message, Duration timeout);/*** 僅發(fā)送請(qǐng)求,不關(guān)心響應(yīng)。** @param message 請(qǐng)求消息對(duì)象*/public abstract void send(Object message);/*** 異步請(qǐng)求方法,在稍后通過(guò)其他線程發(fā)送請(qǐng)求。** @param message 請(qǐng)求消息對(duì)象*/public abstract void deliver(Object message);
}
Sender的實(shí)現(xiàn)類
Sender
的實(shí)現(xiàn)類主要負(fù)責(zé)構(gòu)建請(qǐng)求并通過(guò)Feign客戶端發(fā)送請(qǐng)求。
public class DefaultSender extends Sender {private final FeignClient feignClient;private final String serviceName;public DefaultSender(String serviceName, FeignClient feignClient) {this.serviceName = serviceName;this.feignClient = feignClient;}@Overridepublic Object ask(Object message) {return feignClient.post(message);}@Overridepublic Object ask(Object message, Duration timeout) {// 設(shè)置請(qǐng)求超時(shí)邏輯return feignClient.postWithTimeout(message, timeout);}@Overridepublic void send(Object message) {feignClient.send(message);}@Overridepublic void deliver(Object message) {// 異步發(fā)送邏輯CompletableFuture.runAsync(() -> feignClient.post(message));}
}
Receiver的實(shí)現(xiàn)
Receiver接口
Receiver
接口定義了接收請(qǐng)求和響應(yīng)的基本方法。
public interface Receiver {/*** 處理異步請(qǐng)求的方法。** @param message 請(qǐng)求消息對(duì)象* @param sender 請(qǐng)求發(fā)送方的Sender實(shí)例*/void receive(Object message, Sender sender);/*** 處理同步請(qǐng)求的方法,返回響應(yīng)對(duì)象。** @param message 請(qǐng)求消息對(duì)象* @param sender 請(qǐng)求發(fā)送方的Sender實(shí)例* @return 響應(yīng)對(duì)象*/Object receiveAndReply(Object message, Sender sender);/*** 帶超時(shí)設(shè)置的同步請(qǐng)求處理方法。** @param message 請(qǐng)求消息對(duì)象* @param duration 超時(shí)時(shí)間* @param sender 請(qǐng)求發(fā)送方的Sender實(shí)例* @return 響應(yīng)對(duì)象*/Object receiveAndReply(Object message, Duration duration, Sender sender);
}
Receiver的實(shí)現(xiàn)類
public class DefaultReceiver implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 異步處理邏輯System.out.println("Received async message: " + message);// 根據(jù)業(yè)務(wù)需求決定是否需要響應(yīng)發(fā)送方}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 處理請(qǐng)求并返回響應(yīng)System.out.println("Received sync message: " + message);return processRequest(message);}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 處理請(qǐng)求并返回響應(yīng),考慮超時(shí)System.out.println("Received sync message with timeout: " + message);return processRequestWithTimeout(message, duration);}private Object processRequest(Object message) {// 業(yè)務(wù)邏輯處理return "Processed: " + message;}private Object processRequestWithTimeout(Object message, Duration duration) {// 業(yè)務(wù)邏輯處理,支持超時(shí)try {Thread.sleep(duration.toMillis());} catch (InterruptedException e) {Thread.currentThread().interrupt();return "Processing interrupted";}return "Processed with timeout: " + message;}
}
Interceptor的實(shí)現(xiàn)
攔截器接口
Interceptor
接口定義了在請(qǐng)求發(fā)送前后的處理邏輯。
public interface RPCInterceptor {/*** 請(qǐng)求發(fā)送前的處理邏輯。** @param message 請(qǐng)求消息對(duì)象*/void preHandle(Object message);/*** 請(qǐng)求發(fā)送后的處理邏輯。** @param message 請(qǐng)求消息對(duì)象* @param response 響應(yīng)對(duì)象*/void postHandle(Object message, Object response);
}
廣播攔截器實(shí)現(xiàn)
廣播攔截器用于將請(qǐng)求廣播給所有服務(wù)實(shí)例。
@Componentpublic class BroadcastRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {if (message instanceof BroadcastMessage) {// 廣播邏輯System.out.println("Broadcasting message: " + message);}}@Overridepublic void postHandle(Object message, Object response) {// 后處理邏輯}
}
重試攔截器實(shí)現(xiàn)
重試攔截器用于在請(qǐng)求失敗時(shí)進(jìn)行重試。
@Componentpublic class RetryableRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 重試邏輯處理}@Overridepublic void postHandle(Object message, Object response) {if (response instanceof Throwable) {System.out.println("Request failed, retrying...");// 重試邏輯}}
}
緩存攔截器實(shí)現(xiàn)
緩存攔截器用于對(duì)不頻繁變動(dòng)的響應(yīng)進(jìn)行緩存。
@Componentpublic class CacheableRPCInterceptor implements RPCInterceptor {private final Map<Object, Object> cache = new ConcurrentHashMap<>();@Overridepublic void preHandle(Object message) {if (message instanceof CacheableMessage) {// 檢查緩存Object cachedResponse = cache.get(message);if (cachedResponse != null) {System.out.println("Cache hit: " + message);// 返回緩存響應(yīng)}}}@Overridepublic void postHandle(Object message, Object response) {if (message instanceof CacheableMessage) {// 緩存響應(yīng)cache.put(message, response);}}
}
自定義攔截器實(shí)現(xiàn)
用戶可以根據(jù)需要實(shí)現(xiàn)自定義攔截器,實(shí)現(xiàn)特定功能。
@Componentpublic class CustomRPCInterceptor implements RPCInterceptor {@Overridepublic void preHandle(Object message) {// 自定義處理邏輯}@Overridepublic void postHandle(Object message, Object response) {// 自定義處理邏輯}}
請(qǐng)求和響應(yīng)編碼器/解碼器
請(qǐng)求編碼器
將請(qǐng)求對(duì)象序列化為JSON字符串。
public class RPCEncoder {public String encode(Object message) {// 使用Jackson或Gson進(jìn)行序列化return new ObjectMapper().writeValueAsString(message);}
}
請(qǐng)求解碼器
將JSON字符串反序列化為請(qǐng)求對(duì)象。
public class RPCDecoder {public <T> T decode(String json, Class<T> clazz) {// 使用Jackson或Gson進(jìn)行反序列化return new ObjectMapper().readValue(json, clazz);}
}
RPCReceiveRestful的實(shí)現(xiàn)
RPCReceiveRestful
是一個(gè)內(nèi)嵌的HTTP服務(wù),用于接收請(qǐng)求并調(diào)用解碼器進(jìn)行解碼。
@RestController@RequestMapping("/rpc")
public class RPCReceiveRestful {private final RPCDecoder decoder;private final ReceiverManager receiverManager;public RPCReceiveRestful(RPCDecoder decoder, ReceiverManager receiverManager) {this.decoder = decoder;this.receiverManager = receiverManager;}@PostMapping("/receive")public ResponseEntity<Object> receiveRequest(@RequestBody String requestJson) {try {// 解碼請(qǐng)求RPCRequest request = decoder.decode(requestJson, RPCRequest.class);// 獲取對(duì)應(yīng)的ReceiverReceiver receiver = receiverManager.getReceiver(request.getServiceName());// 調(diào)用Receiver處理請(qǐng)求Object response = receiver.receiveAndReply(request.getMessage(), request.getSender());return ResponseEntity.ok(response);} catch (Exception e) {// 請(qǐng)求解碼或處理失敗return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Request processing failed");}}
}
示例代碼:使用Linkis-RPC進(jìn)行通信
創(chuàng)建Sender
public class RpcClient {private final Sender sender;public RpcClient(Sender sender) {this.sender = sender;}public void sendMessage(String message) {// 異步發(fā)送消息sender.deliver(message);}public Object requestResponse(String message) {// 同步請(qǐng)求響應(yīng)return sender.ask(message);}
}
創(chuàng)建Receiver
public class RpcServer implements Receiver {@Overridepublic void receive(Object message, Sender sender) {// 處理異步請(qǐng)求System.out.println("Server received async message: " + message);}@Overridepublic Object receiveAndReply(Object message, Sender sender) {// 處理同步請(qǐng)求并返回響應(yīng)System.out.println("Server received sync message: " + message);return "Server response to: " + message;}@Overridepublic Object receiveAndReply(Object message, Duration duration, Sender sender) {// 處理帶超時(shí)的同步請(qǐng)求并返回響應(yīng)System.out.println("Server received sync message with timeout: " + message);return "Server response with timeout to: " + message;}
}
結(jié)論
Linkis-RPC提供了一種靈活且強(qiáng)大的微服務(wù)間通信機(jī)制,解決了傳統(tǒng)RPC框架在復(fù)雜場(chǎng)景中的不足。通過(guò)自定義攔截器、異步請(qǐng)求處理和廣播機(jī)制,Linkis-RPC能夠滿足現(xiàn)代微服務(wù)架構(gòu)的通信需求。本文詳細(xì)分析了Linkis-RPC的設(shè)計(jì)思想、代碼結(jié)構(gòu),并提供了完整的代碼示例,希望能為開發(fā)者提供有價(jià)值的參考。
如需進(jìn)一步的討論和問(wèn)題解答,歡迎留言,共同探討Linkis-RPC的更多應(yīng)用場(chǎng)景和實(shí)現(xiàn)細(xì)節(jié)。
參考鏈接:公共模塊 - RPC 模塊 - 《Apache Linkis v1.3.0 中文文檔》 - 書棧網(wǎng) · BookStack