wordpress國(guó)內(nèi)支付上海搜索優(yōu)化推廣
通過(guò)如下demo,希望大家可以快速理解RPC的簡(jiǎn)單案例。如果對(duì)socket不熟悉的話可以先看下我的上篇文章?Java Scoket實(shí)現(xiàn)簡(jiǎn)單的時(shí)間服務(wù)器-CSDN博客?對(duì)socket現(xiàn)有基礎(chǔ)了解。
RPC簡(jiǎn)介
RPC(Remote Procedure Call,遠(yuǎn)程過(guò)程調(diào)用)是一種計(jì)算機(jī)通信協(xié)議,它允許一個(gè)程序(客戶端)通過(guò)網(wǎng)絡(luò)向另一個(gè)程序(服務(wù)器)請(qǐng)求服務(wù),而無(wú)需了解底層網(wǎng)絡(luò)技術(shù)的細(xì)節(jié)。RPC 使得開(kāi)發(fā)分布式應(yīng)用程序更加容易,因?yàn)樗[藏了網(wǎng)絡(luò)通信的復(fù)雜性,讓開(kāi)發(fā)者能夠像調(diào)用本地函數(shù)一樣調(diào)用遠(yuǎn)程函數(shù)。
RPC 的主要特點(diǎn):
- 透明性:調(diào)用遠(yuǎn)程過(guò)程就像調(diào)用本地過(guò)程一樣,無(wú)需關(guān)心遠(yuǎn)程調(diào)用的細(xì)節(jié)。
- 跨平臺(tái):可以在不同的操作系統(tǒng)和編程語(yǔ)言之間進(jìn)行通信。
- 網(wǎng)絡(luò)中立:RPC 協(xié)議可以運(yùn)行在各種網(wǎng)絡(luò)協(xié)議之上,如 TCP/IP、HTTP 等。
- 語(yǔ)言中立:支持多種編程語(yǔ)言,如 C、C++、Java、Python 等。
RPC 的工作流程:
- 客戶端調(diào)用:客戶端調(diào)用本地的 RPC 庫(kù),傳遞需要遠(yuǎn)程執(zhí)行的函數(shù)名和參數(shù)。
- 序列化:RPC 庫(kù)將參數(shù)序列化成網(wǎng)絡(luò)傳輸?shù)母袷?#xff08;如 JSON、XML 或二進(jìn)制格式)。
- 發(fā)送請(qǐng)求:客戶端通過(guò)網(wǎng)絡(luò)發(fā)送序列化后的請(qǐng)求到服務(wù)器。
- 服務(wù)器接收:服務(wù)器接收到請(qǐng)求后,反序列化參數(shù)。
- 執(zhí)行函數(shù):服務(wù)器調(diào)用相應(yīng)的函數(shù)執(zhí)行操作。
- 返回結(jié)果:服務(wù)器將執(zhí)行結(jié)果序列化后發(fā)送回客戶端。
- 客戶端接收:客戶端接收到結(jié)果,反序列化并處理。
RPC 的應(yīng)用場(chǎng)景:
- 分布式系統(tǒng):在分布式系統(tǒng)中,不同的服務(wù)可能部署在不同的服務(wù)器上,RPC 可以方便地進(jìn)行服務(wù)間的通信。
- 微服務(wù)架構(gòu):在微服務(wù)架構(gòu)中,各個(gè)微服務(wù)之間通常通過(guò) RPC 進(jìn)行通信。
- 云服務(wù):云服務(wù)提供商可能使用 RPC 來(lái)允許用戶遠(yuǎn)程調(diào)用云服務(wù)。
常見(jiàn)的 RPC 框架:
- gRPC:由 Google 開(kāi)發(fā)的高性能、開(kāi)源和通用的 RPC 框架,支持多種語(yǔ)言。
- Apache Thrift:由 Facebook 開(kāi)發(fā),后來(lái)捐贈(zèng)給 Apache 基金會(huì),支持多種編程語(yǔ)言。
- Dubbo:阿里巴巴開(kāi)源的一個(gè)高性能的 Java RPC 框架。
- XML-RPC?和?JSON-RPC:基于 XML 和 JSON 數(shù)據(jù)格式的簡(jiǎn)單 RPC 協(xié)議。
RPC 是構(gòu)建現(xiàn)代分布式系統(tǒng)和微服務(wù)架構(gòu)的關(guān)鍵技術(shù)之一。如下是個(gè)人對(duì)RPC服務(wù)的理解示圖。
代碼實(shí)現(xiàn)
1.創(chuàng)建Service以及實(shí)現(xiàn)類,由于演示使用的是jdk的動(dòng)態(tài)代理,所以必須得通過(guò)接口方式聲明資源。代碼如下:
HelloService.java
public interface HelloService {String sayHello();}
HelloServiceImpl.java 遠(yuǎn)程訪問(wèn)的接口實(shí)現(xiàn)
public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello() {return "hello world !!!";}
}
參數(shù)傳輸DTO定義,為了簡(jiǎn)化get、set,請(qǐng)引入lombok工具包:
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;import java.io.Serializable;@Data
@Accessors(chain = true)
@ToString
public class RequestDTO implements Serializable {/*** 請(qǐng)求函數(shù)名稱*/private String methodName;/*** 請(qǐng)求參數(shù)類型*/private Class<?>[] argsType;/*** 請(qǐng)求的接口*/private String clazz;/*** 請(qǐng)求參數(shù)*/private Object[] params;/*** 響應(yīng)結(jié)果*/private Object response;}
服務(wù)提供者設(shè)計(jì):
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class Provide {/*** 多線程處理請(qǐng)求*/private static final ExecutorService POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private static final Map<String,Object> CACHE_MAP = new HashMap<>();public static void main(String[] args) throws Exception {//注冊(cè)對(duì)象registerObject();//啟動(dòng)服務(wù)startServer();}/*** 啟動(dòng)服務(wù)端*/private static void startServer() throws Exception {ServerSocket serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(9999));System.out.println("服務(wù)器啟動(dòng)成功!!!");while (true){Socket socket = serverSocket.accept();POOL.execute(()->{try {ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());RequestDTO request = (RequestDTO)ois.readObject();System.out.println("獲取到客戶端的請(qǐng)求數(shù)據(jù)是:"+request);//目標(biāo)對(duì)象Object target = CACHE_MAP.get(request.getClazz());if(null!=target){//反射執(zhí)行并返回響應(yīng)結(jié)果Method method = target.getClass().getMethod(request.getMethodName(),request.getArgsType());method.setAccessible(true);request.setResponse(method.invoke(target,request.getParams()));}ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(request);socket.getOutputStream().write(bos.toByteArray());socket.getOutputStream().flush();} catch (Exception e) {throw new RuntimeException(e);}finally {try {socket.close();} catch (IOException e) {throw new RuntimeException(e);}}});}}/*** 注冊(cè)對(duì)象,因?yàn)檠菔?#xff0c;簡(jiǎn)化自動(dòng)化掃描注冊(cè)過(guò)程*/private static void registerObject() {CACHE_MAP.put(HelloService.class.getSimpleName(),new HelloServiceImpl());}}
服務(wù)調(diào)用方實(shí)現(xiàn):
1.動(dòng)態(tài)代理
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class DynamicProxy<T> {private final String host = "127.0.0.1";private final int port = 9999;/*** 被代理的對(duì)象*/private Class<T> clazz;public DynamicProxy(Class<T> clazz){this.clazz = clazz;}/*** 獲取代理對(duì)象*** @return T*/public T getService(){return (T) Proxy.newProxyInstance(DynamicProxy.class.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {RequestDTO requestDTO = new RequestDTO().setMethodName(method.getName()).setParams(args).setClazz(clazz.getSimpleName()).setArgsType(method.getParameterTypes());try (Socket socket = new Socket()){//連接客戶端socket.connect(new InetSocketAddress(host,port));//進(jìn)行參數(shù)傳輸ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos);oos.writeObject(requestDTO);oos.flush();socket.getOutputStream().write(baos.toByteArray());socket.getOutputStream().flush();ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());Object o = ois.readObject();if(null==o){return null;}return ((RequestDTO)o).getResponse();} catch (Exception e) {throw new RuntimeException(e);}}});}}
調(diào)用過(guò)程:
public class Consumer {public static void main(String[] args) {DynamicProxy<HelloService> helloServiceDynamicProxy = new DynamicProxy<>(HelloService.class);HelloService service = helloServiceDynamicProxy.getService();System.out.println(service.sayHello());}}
總結(jié)
RPC(遠(yuǎn)程過(guò)程調(diào)用)是一種允許一個(gè)程序(客戶端)通過(guò)網(wǎng)絡(luò)向另一個(gè)程序(服務(wù)器)請(qǐng)求服務(wù)的協(xié)議。在RPC框架中,客戶端可以像調(diào)用本地方法一樣調(diào)用服務(wù)器端的方法,而無(wú)需關(guān)心底層的網(wǎng)絡(luò)通信細(xì)節(jié)。
以下是一段基于Java Socket實(shí)現(xiàn)的RPC原理的demo總結(jié):
-
服務(wù)器端(Server):
- 創(chuàng)建一個(gè)服務(wù)器端Socket,監(jiān)聽(tīng)一個(gè)端口等待客戶端的連接。
- 接受客戶端的連接請(qǐng)求,創(chuàng)建一個(gè)新的線程或者使用線程池來(lái)處理客戶端請(qǐng)求。
- 接收客戶端發(fā)送的請(qǐng)求信息,這通常包括方法名、參數(shù)類型和參數(shù)值等。
- 根據(jù)請(qǐng)求信息,動(dòng)態(tài)調(diào)用對(duì)應(yīng)的本地方法,并獲取執(zhí)行結(jié)果。
- 將方法執(zhí)行結(jié)果返回給客戶端。
-
客戶端(Client):
- 創(chuàng)建一個(gè)客戶端Socket,連接到服務(wù)器端的IP地址和端口。
- 封裝需要遠(yuǎn)程調(diào)用的方法信息,包括方法名、參數(shù)等,并將這些信息發(fā)送給服務(wù)器。
- 等待服務(wù)器處理請(qǐng)求并返回結(jié)果。
- 接收并解析服務(wù)器返回的結(jié)果,這可能涉及到對(duì)象的反序列化。
-
序列化與反序列化:
- 為了通過(guò)網(wǎng)絡(luò)傳輸方法的參數(shù)和返回值,需要將對(duì)象序列化成字節(jié)流。
- Java提供了
Serializable
接口來(lái)支持對(duì)象的序列化。 - 客戶端發(fā)送序列化后的對(duì)象,服務(wù)器端接收后需要反序列化以恢復(fù)對(duì)象。
-
通信協(xié)議:
- 定義了客戶端和服務(wù)器之間的通信協(xié)議,包括請(qǐng)求格式和響應(yīng)格式。
- 協(xié)議需要規(guī)定如何表示方法名、參數(shù)列表、返回值等信息。
-
異常處理:
- 在網(wǎng)絡(luò)通信中,可能會(huì)遇到各種異常情況,如連接失敗、數(shù)據(jù)傳輸錯(cuò)誤等。
- 需要在客戶端和服務(wù)器端都實(shí)現(xiàn)異常處理機(jī)制,確保系統(tǒng)的健壯性。
-
多線程處理:
- 服務(wù)器端通常需要處理多個(gè)客戶端的并發(fā)請(qǐng)求。
- 使用線程池可以有效地管理線程資源,提高系統(tǒng)性能。
-
安全性:
- 在實(shí)際應(yīng)用中,需要考慮數(shù)據(jù)的安全性,比如使用SSL/TLS加密Socket通信。
通過(guò)這個(gè)Demo,我們可以了解到RPC的核心原理和實(shí)現(xiàn)方式,雖然這是一個(gè)簡(jiǎn)化的示例,但它展示了RPC框架的基本工作流程。在實(shí)際應(yīng)用中,RPC框架會(huì)提供更復(fù)雜的功能,如服務(wù)注冊(cè)與發(fā)現(xiàn)、負(fù)載均衡、超時(shí)重試等。