简单了解 RPC 实现原理

时下很多企业应用更新换代到分布式,一篇文章了解什么是 RPC。
原作者梁飞,在此记录下他非常简洁的 rpc 实现思路。

核心框架类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Copyright 2011 Alibaba.com All right reserved. This software is the
* confidential and proprietary information of Alibaba.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Alibaba.com.
*/
package com.alibaba.study.rpc.framework;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Minimal RPC framework built on blocking sockets, Java serialization and JDK dynamic proxies.
 *
 * <p>Wire protocol (one request per connection): the client writes the method name with
 * {@code writeUTF}, then the parameter types ({@code Class<?>[]}) and the arguments
 * ({@code Object[]}) with {@code writeObject}. The server answers with a single object, which is
 * either the return value or the {@link Throwable} raised while serving the call.
 *
 * <p>NOTE(review): both peers run Java native deserialization on bytes read from the network.
 * That is unsafe when the peer is untrusted; keep this to trusted networks or install an
 * object input filter before using it for anything beyond a demo.
 *
 * @author william.liangf
 */
public class RpcFramework {

    /**
     * Exports a service: listens on the given port and serves one call per accepted connection,
     * each on its own thread. This method blocks in the accept loop and does not return normally.
     *
     * @param service the service implementation whose public methods are invoked reflectively
     * @param port the TCP port to listen on, in the range 1..65535
     * @throws IllegalArgumentException if {@code service} is null or {@code port} is out of range
     * @throws Exception if the server socket cannot be opened
     */
    public static void export(final Object service, int port) throws Exception {
        if (service == null) {
            throw new IllegalArgumentException("service instance == null");
        }
        checkPort(port);
        System.out.println("Export service " + service.getClass().getName() + " on port " + port);
        ServerSocket server = new ServerSocket(port);
        try {
            for (;;) {
                try {
                    final Socket socket = server.accept();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                handleRequest(service, socket);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    // A failed accept must not stop the server; report it and keep listening.
                    e.printStackTrace();
                }
            }
        } finally {
            // Only reached when the loop is left by an Error; release the listening port.
            server.close();
        }
    }

    /**
     * Creates a client-side proxy for a remote service. No connection is opened here; every
     * method call on the returned proxy opens a new socket, sends the call and waits for the reply.
     *
     * <p>A reply that is a {@link Throwable} is rethrown to the caller, so a remote method that
     * returns a {@code Throwable} as its normal result cannot be told apart from a failure.
     *
     * @param <T> the service interface type
     * @param interfaceClass the service interface to proxy
     * @param host the server host name
     * @param port the server port, in the range 1..65535
     * @return a proxy implementing {@code interfaceClass} that forwards calls to the server
     * @throws IllegalArgumentException if {@code interfaceClass} is null or not an interface,
     *         {@code host} is null or empty, or {@code port} is out of range
     * @throws Exception declared for compatibility with existing callers
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
        if (interfaceClass == null) {
            throw new IllegalArgumentException("Interface class == null");
        }
        if (!interfaceClass.isInterface()) {
            throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("Host == null!");
        }
        checkPort(port);
        System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                Socket socket = new Socket(host, port);
                try {
                    ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                    try {
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(arguments);
                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                        try {
                            Object result = input.readObject();
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        } finally {
                            input.close();
                        }
                    } finally {
                        output.close();
                    }
                } finally {
                    socket.close();
                }
            }
        });
    }

    /** Rejects port numbers outside 1..65535 with the message shared by export and refer. */
    private static void checkPort(int port) {
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid port " + port);
        }
    }

    /** Reads one call from the socket, invokes it on the service, writes the reply, closes the socket. */
    private static void handleRequest(Object service, Socket socket) throws Exception {
        try {
            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
            try {
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                try {
                    Method method = service.getClass().getMethod(methodName, parameterTypes);
                    Object result = method.invoke(service, arguments);
                    output.writeObject(result);
                } catch (InvocationTargetException e) {
                    // Send the exception the service actually threw, not the reflection wrapper,
                    // so the client proxy can rethrow the declared exception type.
                    Throwable cause = e.getCause();
                    output.writeObject(cause != null ? cause : e);
                } catch (Throwable t) {
                    output.writeObject(t);
                } finally {
                    output.close();
                }
            } finally {
                input.close();
            }
        } finally {
            socket.close();
        }
    }

}

定义服务接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* Copyright 2011 Alibaba.com All right reserved. This software is the
* confidential and proprietary information of Alibaba.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Alibaba.com.
*/
package com.alibaba.study.rpc.test;

/**
 * Service contract used by the RPC demo.
 *
 * @author william.liangf
 */
public interface HelloService {

    /**
     * Produces a greeting for the given name.
     *
     * @param name the name to greet
     * @return the greeting text
     */
    String hello(String name);

}

实现服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* Copyright 2011 Alibaba.com All right reserved. This software is the
* confidential and proprietary information of Alibaba.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Alibaba.com.
*/
package com.alibaba.study.rpc.test;

/**
* HelloServiceImpl
*
* @author william.liangf
*/
public class HelloServiceImpl implements HelloService {

public String hello(String name) {
return "Hello" + name;
}

}

暴露服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* Copyright 2011 Alibaba.com All right reserved. This software is the
* confidential and proprietary information of Alibaba.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Alibaba.com.
*/
package com.alibaba.study.rpc.test;

import com.alibaba.study.rpc.framework.RpcFramework;

/**
 * Demo entry point for the provider side: publishes a {@code HelloServiceImpl} over RPC.
 *
 * @author william.liangf
 */
public class RpcProvider {

    /** Port on which the demo service is exported. */
    private static final int PORT = 1234;

    /**
     * Creates the service implementation and hands it to {@code RpcFramework.export}.
     *
     * @param args unused
     * @throws Exception if exporting the service fails
     */
    public static void main(String[] args) throws Exception {
        RpcFramework.export(new HelloServiceImpl(), PORT);
    }

}

引用服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
* Copyright 2011 Alibaba.com All right reserved. This software is the
* confidential and proprietary information of Alibaba.com ("Confidential
* Information"). You shall not disclose such Confidential Information and shall
* use it only in accordance with the terms of the license agreement you entered
* into with Alibaba.com.
*/
package com.alibaba.study.rpc.test;

import com.alibaba.study.rpc.framework.RpcFramework;

/**
 * Demo entry point for the consumer side: calls the remote {@code HelloService} once per second.
 *
 * @author william.liangf
 */
public class RpcConsumer {

    /**
     * Obtains a proxy for the service at 127.0.0.1:1234 and prints the reply of each call,
     * pausing one second between calls.
     *
     * @param args unused
     * @throws Exception if creating the proxy, a remote call, or the sleep fails
     */
    public static void main(String[] args) throws Exception {
        final HelloService remote = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
        int counter = 0;
        while (counter < Integer.MAX_VALUE) {
            System.out.println(remote.hello("World" + counter));
            Thread.sleep(1000);
            counter++;
        }
    }

}

总结

这个简单的例子的实现思路是使用阻塞的 socket IO 流来进行 server 和 client 的通信,也就是 rpc 应用中服务提供方和服务消费方。并且是端对端的,用端口号来直接进行通信。方法的远程调用使用的是 jdk 的动态代理,参数的序列化也是使用的最简单的 objectStream。

真实的 rpc 框架会对上面的实现方式进行替换,采用更快更稳定、更高可用易扩展、更适宜分布式场景的中间件和技术。例如使用 netty 的 nio 特性达到非阻塞的通信,使用 zookeeper 统一管理服务注册与发现,解决了端对端不灵活的劣势。代理方式有 cglib 字节码技术。序列化方式有 hessian2,fastjson 等等。不过梁飞大大的博客使用原生的 jdk api 就展现给各位读者一个生动形象的 rpc demo,实在是强。rpc 框架解决的不仅仅是技术层面的实现,还考虑到了 rpc 调用中的诸多问题,重试机制,超时配置… 这些就需要去了解成熟的 rpc 框架是如何考虑这些问题的了。

推荐一个轻量级的 rpc 框架:motan。weibo 团队在 github 开源的一个 rpc 框架,有相应的文档,用起来感觉比 dubbo 要轻量级,易上手。

分享到