简介
简介
本文介绍分布式中的RPC。
开源的优秀RPC框架
- openfeign:GitHub – spring-cloud/spring-cloud-openfeign: Support for using OpenFeign in Spring Cloud apps
- 阿里巴巴 Dubbo:https://github.com/alibaba/dubbo
- 新浪微博 Motan:https://github.com/weibocom/motan
- gRPC:GitHub – grpc/grpc: The C based gRPC (C++, Python, Ruby, Objective-C, PHP, C#)
- rpcx :https://github.com/smallnest/rpcx
- Apache Thrift :Apache Thrift – Home
RPC与HTTP
应用远程调用另一个应用有两种解决方案:HTTP、RPC,如下图所示(左侧为HTTP,右侧为RPC)。
可以发现,RPC底层也可以使用HTTP来实现通信。
项 | HTTP | RPC |
传输效率 | 慢。 如果是基于HTTP1.1的协议,请求中会包含很多无用的内容,如果是基于HTTP2.0,那么简单的封装一下是可以作为一个RPC来使用的,这时标准RPC框架更多的是服务治理。 | 快。 使用自定义的TCP协议,可以让请求报文体积更小,或者使用HTTP2协议,也可以很好的减少报文的体积,提高传输效率 |
性能 | 低。 大部分是通过json来实现的,字节大小和序列化耗时都比thrift要更消耗性能。 | 高。 可以基于thrift实现高效的二进制传输。 |
难度 | 简单 | 复杂 |
灵活性 | 大。 它不关心实现细节,跨平台、跨语言。 | 小 |
RPC原理
什么是RPC
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP/IP或UDP,为通信程序之间携带信息数据。RPC将原来的本地调用转变为调用远端的服务器上的方法,给系统的处理能力和吞吐量带来了近似于无限制提升的可能。在OSI网络通信模型中,RPC跨域了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及Server Stub,这个Stub可以理解为存根。
- 客户端(Client),服务的调用方。
- 客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
- 服务端(Server),真正的服务提供者。
- 服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。
RPC调用过程
- 客户端(client)以本地调用方式(即以接口的方式)调用服务;
- 客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
- 客户端通过sockets将消息发送到服务端;
- 服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
- 服务端存根( server stub)根据解码结果调用本地的服务;
- 本地服务执行并将结果返回给服务端存根( server stub);
- 服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
- 服务端(server)通过sockets将消息发送到客户端;
- 客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
- 客户端(client)得到最终结果。
RPC的目标是要把2、3、4、7、8、9这些步骤都封装起来。
注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。
使用到的技术
1、动态代理
生成 client stub和server stub需要用到 Java 动态代理技术 ,我们可以使用JDK原生的动态代理机制,可以使用一些开源字节码工具框架 如:CgLib、Javassist等。
2、序列化
为了能在网络上传输和接收 Java对象,我们需要对它进行 序列化和反序列化操作。
* 序列化:将Java对象转换成byte[]的过程,也就是编码的过程;
* 反序列化:将byte[]转换成Java对象的过程;
可以使用Java原生的序列化机制,但是效率非常低,推荐使用一些开源的、成熟的序列化技术,例如:protobuf、Thrift、hessian、Kryo、Msgpack
关于序列化工具性能比较可以参考:jvm-serializers
3、NIO
当前很多RPC框架都直接基于netty这一IO通信框架,比如阿里巴巴的HSF、dubbo,Hadoop Avro,推荐使用Netty 作为底层通信框架。
4、服务注册中心
可选技术:
* Redis
* Zookeeper
* Consul
* Etcd
实例
本实例使用socket建立连接,JDK的API做动态代理,主要有服务提供方暴露服务、服务消费方获取代理对象、代理对象与服务提供方建立远程连接并调用三个方面。忽略所有的参数校验、异常。
这个简易实例中,整个RPC原理很清晰,本实例中最核心的一点,就是当proxy.sayHello执行的时候,实际是在执行RpcHandler的invoke方法,也就是远程调用。
如果要真正实现一个企业级RPC框架,仅仅有这个原理还是不够的。还需要考虑很多东西,例如建立连接的时候,使用NIO从而使得IO效率更高;或者在集群中,暴露服务的ip和端口都是动态的,而消费者此时也不能将要调用的服务提供方的ip和端口写死,于是需要一个注册中心的角色,产生注册服务、订阅服务等事件。
暴露服务
暴露服务
已知的某个服务的实例对象service,建立ServerSocket服务,并监听指定端口,当有远程连接建立时,创建一个线程,在线程中从输入流中依次读取方法名、参数类型、参数值等信息,并根据方法名和参数执行实例对象service中对应的方法,获得返回结果。
public class RpcExport { private static int port = 1234; /** * 暴露服务 * * @param service * 服务的实现 * @param port * 服务的端口 * @throws Exception */ public static void export(final Object service) throws Exception { System.out.println("Export service " + service.getClass().getName() + " on port " + port); // 创建socket,开始监听 ServerSocket server = new ServerSocket(port); while (true) { final Socket socket = server.accept(); new Thread(new Runnable() { @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { // 从监听的socket中获得输入流 input = new ObjectInputStream(socket.getInputStream()); String methodName = input.readUTF(); Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); // 从监听的socket中获得输出流 output = new ObjectOutputStream(socket.getOutputStream()); Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Exception e) { } finally { try { if (output != null) { output.close(); } if (input != null) { input.close(); } socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }).start(); } } }
代理工厂
消费方获取代理对象,使用JDK的动态代理API,传入接口类名。生成代理对象的时候,需要传入一个实现了InvocationHandler的对象,也就是下面的RpcHandler
import java.lang.reflect.Proxy; public class RpcProxyFactory { public static <T> T getService(Class<T> interfaceClass) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[] { interfaceClass }, new RpcHandler()); } }
代理对象建立远程连接
这个RpcHandler中重写了invoke方法,就是代理对象的方法执行的时候真正与服务提供方建立连接并获得返回结果的地方。
public class RpcHandler implements InvocationHandler { private String host = "127.0.0.1"; private int port = 1234; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //与服务提供方建立连接 Socket socket = new Socket(host, port); try { //获取输出流,并写出方法名、参数名、参数值 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(args); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //获得返回结果 Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }
使用
服务接口
public interface HelloService { public String sayHello(String name); }
服务实现类
public class HelloServiceImpl implements HelloService{ @Override public String sayHello(String name) { return "hello "+name; } }
暴露服务
public class RpcProvider { public static void main(String[] args) throws Exception { HelloService service = new HelloServiceImpl(); RpcExport.export(service); } }
消费者获取代理对象并调用
public class RpcConsumer { public static void main(String[] args) throws Exception { HelloService proxy = RpcProxyFactory.getService(HelloService.class); String result = proxy.sayHello("world"); System.out.println(result); } }
测试
先运行RpcProvider的main,再运行RpcConsumer的main
运行结果
输出:hello world
请先
!