服务消费方(client)以本地调用方式调用服务
client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
client stub 将消息进行编码并发送到服务端
server stub 收到消息后进行解码
server stub 根据解码结果调用本地的服务
本地服务执行并将结果返回给 server stub
server stub 将返回导入结果进行编码并发送至消费方
client stub 接收到消息并进行解码
服务消费方(client)得到结果
小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可 完成远程服务调用
public interface HelloService {
String hello(String str);
}
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String str) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(str != null){
return str + " 111";
}
return null;
}
}
public class ClientBootstrap {
public static final String prividerName = "HelloService#hello#";
public static void main(String[] args) {
NettyClient customer = new NettyClient();
//获取实体对象 .hello真正被调用的地方在于: getBean里面的 ->{xxx}这里:
// 只要通过代理对象去调用,那么就可以进入
HelloService helloService = (HelloService) customer.getBeans(HelloService.class, prividerName);
String str = helloService.hello("ni hao ");
System.out.println("服务端返回结果:" + str);
}
}
public class NettyClient {
private static NettyClientHandler client;
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//编写方法使用代理模式,获取一个代理对象
public Object getBeans(final Class<?> serviceClass, final String providerName) {
//使用jdk的静态代理方法 获取ClassLoader,类的字节码,初始化的handler
//Thread.currentThread().getContextClassLoader():获取类的加载器,用当前线程去获取;
//new Class<?>[]{serivceClass}:获取类的一个实例
// 那么也就是声明了代理类实现了这些接口,代理类就可以调用接口中声明的所有方法。
//h:一个InvocationHandler对象,表示的是当动态代理对象调用方法的时候会关联到哪一个InvocationHandler对象上,并最终由其调用。
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
@Override
//proxy: 代理对象本身
// method: 方法
// args: 将来传递的参数-->‘协议头’ 传递进来的信息,如:
// providerName: 协议头
// args[0]: 就是客户端调用api时, 如: hello(xxx) 里面的xxx参数
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//单利模式保证client有值
if (client == null) {
initClient();
}
//providerName 协议头 args[0] 就是客户端调用 api hello(???), 参数
client.setPara(providerName + args[0]);
//线程组取出线程开始调用client的call方法
return executorService.submit(client).get();
}
});
}
//初始化客户端
private static void initClient() {
client = new NettyClientHandler();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(client);
}
});
try {
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
System.out.println("客户端启动....");
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable{
private ChannelHandlerContext context; //上下文
private String result;
private String para; //参数
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.context = ctx;
//context.writeAndFlush("aaaa");
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead method invoke");
result = msg.toString();
System.out.println("server message:"+result);
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void setPara(String para) {
this.para = para;
}
//NettyClientHandler可能在每个workerGroup中使用,而且如果是业务线程池去执行任务,那么
// 加上synchronized在方法前,就只能有一个线程去访问到这个synchronized;
@Override
public synchronized Object call() throws Exception {
System.out.println("call method invoke");
context.writeAndFlush(para); //发送消息给服务端
System.out.println("send message :"+ para);
wait(); //等待 channelRead 获取到数据
return result;
}
}
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1",6666);
}
}
public class NettyServer {
public static void startServer(String hostName, int port) {
startServer0(hostName, port);
}
private static void startServer0(String hostName, int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
try {
ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
System.out.println("服务提供方启动....");
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送的消息,并调用服务
System.out.println("client msg:" + msg);
//客户端在调用服务器的 api 时,我们需要定义一个协议 //比如我们要求 每次发消息是都必须以某个字符串开头 "HelloService#hello#你好"
if (msg.toString().startsWith(ClientBootstrap.prividerName)) {
String hello = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(hello);
System.out.println("server send message:" + hello);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容