【手写 RPC】使用netty手写一个RPC框架 结合新特性 虚拟线程

【手写RPC框架】如何使用netty手写一个RPC框架 结合新特性 虚拟线程

什么是RPC框架

RPC(Remote Procedure Call)远程过程调用,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC框架是一种远程调用的框架,它可以让你像调用本地方法一样调用远程方法。

避免了开发人员自己去封装网络请求、连接管理、序列化、反序列化等操作,提高了开发效率。

Netty是什么?为什么使用Netty

Netty是一个基于NIO的客户、服务器端编程框架,使用Netty可以快速开发网络应用,例如服务器和客户端。Netty是一个高性能、异步事件驱动的网络应用框架,它简化了网络编程,提供了一种新的方式来处理网络通信。

大白话粗略理解:因为Java的NIO的API使用起来比较复杂,Netty是对NIO的封装,使用起来更加简单。

所以这也是为什么我们使用Netty来实现RPC框架的原因,netty也被很多框架证明了它的稳定性和性能。

Java虚拟线程

Java虚拟线程是一个轻量级的线程,它不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程。Java虚拟线程是一个用户态的线程,它不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程。

虚拟线程实际上是通过传统的线程来管理多个虚拟线程,在Java的平台上去调度这些虚拟线程,从而实现了轻量级的线程称为虚拟线程,想要了解更加细节的可以去看下我的另一篇文章:【虚拟线程】Java虚拟线程 VirtualThread 是什么黑科技

虚拟线程的优势:

  1. 轻量级:虚拟线程是轻量级的线程,可以在一个线程中运行多个虚拟线程。
  2. 高效:虚拟线程是用户态的线程,不需要操作系统的线程支持,可以在一个线程中运行多个虚拟线程,线程的切换不涉及内核态和用户态的切换,效率更高。

适合的场景:

  1. 高并发:虚拟线程适合高并发的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
  2. IO密集型:虚拟线程适合IO密集型的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。
  3. 任务短暂:虚拟线程适合任务短暂的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。

写一个RPC框架需要哪些步骤

既然我们要写一个RPC框架,那么我们需要明确一下我们需要做哪些事情。
我们是从A服务调用B服务,那么就代表我们的服务A是客户端,服务B是服务端。但是我们的系统正常来说要调用别的服务,也会被别的服务调用,
所以我们的服务A也是服务端,服务B也是客户端。所以我们的系统要同时具备客户端和服务端的功能。

  • 客户端的功能:发现服务、请求(负载均衡、发起连接、发送请求)、接收响应、关闭连接。
  • 服务端的功能:注册服务、接收请求(接收连接、接收请求)、发送响应、关闭连接。

其实根据上面可以发现,服务端和客户端所做的事情是对应的,是一个镜像的关系。所以我们就是对应放在一起讲。

注意注意注意⚠️:

  1. 示例中的代码为了方便理解,我只摘取了主要逻辑,且做了简略,具体的实现可以看我放在最后的项目源码。
  2. 这里我们只是简单的实现一个RPC框架,所以我们只是实现了最基本的功能,实际的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现扩展。

1 发现服务、注册服务

注册服务:服务端想告诉别人我提供了哪些服务(接口的方法),我的地址是什么。
发现服务:客户端需要知道我调用的一些服务(接口的方法)有哪些地址(ip + 端口)可以调用。

服务发现和注册的方式有很多种,比如:zookeeper、nacos、consul、etcd等等。本次我们以zookeeper为例。

注册服务代码示例:

```java
private static CuratorFramework client;

    // 这里使用Curator框架来操作zookeeper
    public ZookeeperRegistryCenter() {
       final var zookeeper = PROPERTIES_THREAD_LOCAL.get().getRegistry().getZookeeper();

       RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
       final var builder = CuratorFrameworkFactory.builder()
               .connectString(zookeeper.getAddress())
               .namespace(zookeeper.getRootPath());
       client = builder.build();
    }
    // 创建一个zk客户端
    private static void create(String path, CreateMode mode) throws Exception {
        client.create()
                .creatingParentsIfNeeded()
                .withMode(mode)
                .forPath(path);
    }
```

发现服务代码示例:

```java
// 发现服务,只要监听注册中心的变化
    public void watch() {

        // 观察者模式,监听注册中心的变化
        registryCenter.watch((change, providerInfo) -> {
            switch (change) {
                case Change.ADD -> addServiceAddress(providerInfo);
                case Change.UPDATE -> updateServiceAddress(providerInfo);
                case Change.REMOVE -> deleteServiceAddress(providerInfo);
            }
        });
    }

    private void addOrUpdateServiceAddress(String methodStr, Pair address) {
        // 这里使用SERVICE_ADDRESS_MAP(ConcurrentHashMap)本地缓存服务地址,key是接口名+方法名,value是服务地址
        SERVICE_ADDRESS_MAP.computeIfAbsent(methodStr, _ -> new CopyOnWriteArraySet<>())
                .add(address);
    }
```

2 请求、接收

请求代码示例:

```java
// 请求
    public Object send(RpcRequestMessage msg, Method method, Set> addressSet) throws LRPCTimeOutException {
        // 负载均衡选择服务地址
        final var address = clazzToAddress(method, addressSet);
        // 获取连接池
        final var channelPool = getChannelPool(address);
        // 在连接池中执行请求
        return channelManager.executeWithChannelPool(channelPool, channelExeFunction, msg);
    }
```

2.1 负载均衡

负载均衡:客户端在发现了服务的地址之后,可能有多个服务的地址,这时候需要做负载均衡,选择一个服务的地址来调用。

```java
// 选择服务地址,负载均衡
    private Pair clazzToAddress(Method method, Set> addressSet) {
        if (addressSet != null && !addressSet.isEmpty()) {
            // 若指定了服务地址,则在指定的服务地址中选择
            return loadBalancer.selectServiceAddress(method, addressSet);
        }
        addressSet = serviceManager.getServiceAddress(method);
        // 若未指定服务地址,则在注册中心的服务地址中选择
        return loadBalancer.selectServiceAddress(method, addressSet);
    }
```

2.2 发起连接、接收连接

因为我们的rpc的调用会比较频繁,所以我们需要保持长连接,避免频繁的创建连接和断开,这里我们使用连接池来管理连接。

发起连接:客户端在知道了服务的地址之后,需要和服务端建立连接,建立连接后,再发送请求。

接收连接:服务端需要接收客户端的连接,接收到连接后,再接收请求。

```java
private FixedChannelPool getChannelPool(Pair address) {
        final var host = address.left;
        final var port = address.right;
        return serviceManager.getChannelPool(address,
                // 创建连接池
                _ -> LrpcChannelPoolFactory.createFixedChannelPool(host, port, lrpcProperties.getClient().getAddressMaxConnection()));
    }
    public FixedChannelPool getChannelPool(Pair address, Function mappingFunction) {
        final var host = address.left;
        final var port = address.right;
        return ADDRESS_POOL_MAP.computeIfAbsent(host + ":" + port, mappingFunction);
    }
```

接收连接其实就是bossGroup的处理逻辑,这里就不贴代码了,可以看最后我贴的项目源码。

2.3 发送请求、接收请求

发送请求 :客户端在建立连接后,在调用服务的方法时,需要发送报文体,发送本地需要保存请求ID和Promise(用于接收调用结果,netty包装一层的future)的映射关系,用来接收响应时,根据请求ID找到对应的请求。

接收请求 :服务端在接收到客户端的连接后,需要接收到客户端的请求,解析请求,调用对应的方法。

我们本次使用自定义协议,所以需要约定好报文体的格式

```
报文体:16字节协议约定内容 + 请求体;

16字节协议约定内容:

  (1):4个字节的长度来表示协议的魔数:就是一个固定的值,用来标识这是我们自定义的协议,这里使用'L'、'R'、'P'、'C'。

  (2):1个字节的版本号:标识这个协议的版本号,这里因为是第一个版本,所以使用1。

  (3):1个字节的序列化算法:标识这个协议使用的序列化算法,对应了序列化算法在枚举中的数组下标,这里使用的是0,表示使用JSON序列化。

  (4):4个字节的请求ID:标识这个请求的ID,用来标识这个请求的唯一性,这里使用UUID生成,可以在客户端和服务端都保存一个Map,用来保存请求ID和请求的映射关系。

  (5):1个字节的消息类型:标识这个消息的类型,是请求还是响应,这里使用1表示请求消息,2表示响应消息。

  (6):4个字节的请求体的长度:使用Integer类型,表示请求体的长度,在接收请求时,根据这个长度来解析请求体。

  (7):1个字节的补充位;无实际意义,只是为了对齐16字节。

请求体:序列化后转成字节数组,内容有:接口名 + 方法名 + 返回参数类型 + 请求参数类型数组 + 请求参数值数组。
```

按刚刚上面约定好的协议格式解析,然后将请求体的内容反序列化,得到消息类型,使用LengthFieldBasedFrameDecoder解码器,解决粘包和拆包问题,得到请求体的字节数组,然后反序列化,

得到消息后,获取到接口名、方法名、返回参数类型、请求参数类型数组、请求参数值数组,使用动态代理调用对应的方法,得到返回值。

```java
public  T getProxy(Class clazz, Set> serviceAddress) {
        // 使用代理的方式,调用方法
        final var proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, (proxy, method, args) -> {
            RpcRequestMessage msg = buildRpcRequestMessage(clazz, method, args);
            return consumerManager.send(msg, method, serviceAddress);
        });
        return clazz.cast(proxyInstance);
    }

    public Object executeWithChannelPool(ChannelPool channelPool,
                                                  BiFunction> function,
                                                  RpcRequestMessage msg) throws LRPCTimeOutException {
        // 1. 从连接池中获取连接,等待超市时间,未获取连接则抛出异常
        final Future future = channelPool.acquire();
        Channel channel = future.get();
        final var promise = function.apply(channel, msg);
        try {
            return getResult(promise, msg.getMessageId());
        } finally {
            // 这里的释放需要放在拿到结果之后,否则会导臃连接被释放
            channelPool.release(channel);
        }
    }

    private static BiFunction> channelExeFunction() {
        // 发送请求,且处理写失败
        return (channel, msg) -> {
            final var promise = new DefaultPromise<>(channel.eventLoop());
            RpcRespHandler.addPromise(msg.getMessageId(), promise);
            // 发送请求,且处理写失败
            final var channelFuture = channel.writeAndFlush(msg);
            channelFuture.addListener(processAftermath(promise, msg));
            return promise;
        };
    }
```

接收处理请求

```java
@Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) {

        log.info("接收到消息 {}", JSON.toJSON(msg));
        final var interfaceName = msg.getInterfaceName();
        final var methodName = msg.getMethodName();

        // 根据接口名获取服务的本地实例
        final var service = serviceManager.getService(interfaceName);

        final var response = new RpcResponseMessage();
        response.setMessageId(msg.getMessageId());
        try {
            // 使用反射调用方法
            final Class aClass = service.getClass();
            final var method = aClass.getMethod(methodName, msg.getParameterTypes());
            final var result = method.invoke(service, msg.getParameterValues());
            response.setReturnValue(result);
        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            log.error("e : ", e);
            response.setExceptionValue(new Error(e.getCause().getMessage()));
        }

        // 以下属于发送响应的逻辑
        ctx.writeAndFlush(response).addListener(future -> {
            if (future.isSuccess()) {
                log.info("消息响应成功 {}", JSON.toJSON(msg));
                return;
            }
            log.error("发送消息时有错误发生: ", future.cause());
        });
    }
```

3 发送响应、接收响应

得到第6步的返回值后,需要将返回值封装成响应报文体,发送给客户端。

这里发送响应的方式其实是和发送请求的方式是一样的,只是消息类型不一样,这里是响应消息。
客户端接收到响应后,根据请求ID找到对应的请求,将响应的内容返回给调用方。

发送响应

```java
// 在刚刚接收请求处理的channelRead0函数中,处理发送响应的逻辑
        ctx.writeAndFlush(response).addListener(future -> {
            if (future.isSuccess()) {
                log.info("消息响应成功 {}", JSON.toJSON(msg));
                return;
            }
            log.error("发送消息时有错误发生: ", future.cause());
        });
```

接收响应

```java
private static Object getResult(Promise promise, Integer messageId) throws LRPCTimeOutException {
        try {
            // 超时等待
            if (promise.await(5, TimeUnit.SECONDS)) {
                if (promise.isSuccess()) {
                    return promise.getNow();
                } else {
                    throw new RuntimeException(promise.cause());
                }
            } else {
                throw new LRPCTimeOutException("请求超时");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("操作被中断", e);
        } finally {
            // 确保 promise 被移除
            RpcRespHandler.removePromise(messageId);
        }
    }
```

4 关闭连接

关闭连接:客户端和服务端在完成请求和响应后,会把连接放回连接池,等待下一次的调用,等连接池关闭时,会关闭连接,服务端感应到连接关闭,会关闭连接。

怎么将虚拟线程和Netty结合起来

分析

前面我们说过,虚拟线程适合高并发、IO密集型的场景,可以在一个线程中运行多个虚拟线程,减少线程的创建和销毁,提高性能。

看一下netty的服务端网络通信的架构简图:

【手写 RPC】使用netty手写一个RPC框架 结合新特性 虚拟线程

在netty中,一个NioEventLoop中有一个Selector,一个Selector可以注册多个Channel,一个Channel对应一个连接,一个线程可以处理多个连接,这就是netty的高性能的原因。
在每次循环中,Selector就会阻塞监听Channel的事件,当有事件发生时,就会处理这个事件。
所以在这过程中,线程的数量,影响着Selector的数量,影响着Channel的数量,但是在传统的线程中,线程的数量是有限的,所以这就限制了Selector的数量,影响着Channel的数量,影响着性能,
所以我们可以使用虚拟线程来解决这个问题,虚拟线程可以在一个线程中运行多个虚拟线程,且虚拟线程会在其中一个虚拟线程阻塞时,会切换到其他虚拟线程,且没有系统级别的上下文切换,所以可以带来更高的性能。
所以我们这里主要是改变workerGroup的线程模型,使用虚拟线程来代替workerGroup里的传统的线程。

实现

根据netty的NioEventGroup的源码,线程来自三个地方:

  1. 构造函数的入参的线程工厂;
  2. 构造参数的入参的executor;
  3. 父类io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory()方法返回的线程工厂;
    这里我们以重写父类的newDefaultThreadFactory()方法为例,来实现虚拟线程。

    ```java
    private NioEventLoopGroup getWorker() {
    final var workerMax = lrpcProperties.getServer().getWorkerMax();
    // 创建workerGroup
    return new NioEventLoopGroup(workerMax) {
    // 直接在创建的时候重写newDefaultThreadFactory()方法
    @Override
    protected ThreadFactory newDefaultThreadFactory() {
    return new VirtualThreadFactory(NioEventLoopGroup.class, Thread.MAX_PRIORITY);
    }
    };
    }

    // 这里是重写的ThreadFactory
    public class VirtualThreadFactory extends DefaultThreadFactory {
    public VirtualThreadFactory(Class poolType, int priority) {
    super(poolType, priority);
    }

    @Override
    protected Thread newThread(Runnable r, String name) {
    // 这里使用FastThreadLocalThread,是因为FastThreadLocalThread是netty提供的一个线程,里面的方法有些功能,所以我们这里直接继承它,然后重写start()方法
    return new FastThreadLocalThread(threadGroup, r, name){
    // 这里的Thread.ofVirtual().unstarted(this)是创建一个虚拟线程
    @Override
    public void start() {
    final var unstarted = Thread.ofVirtual().unstarted(this);
    unstarted.setName(this.getName());
    unstarted.start();
    }
    };
    }
    }
    ```

总结

本次我们实现了一个简单的RPC框架,使用了netty作为底层通信框架,使用了zookeeper作为服务发现和注册中心,使用了虚拟线程代替服务端的workerGroup的线程模型,扩展了可管控的Selector的数量,且在线程的切换上,没有系统级别的上下文切换,提高了性能。

这里只是一个简单的实现,实际的RPC框架还有很多功能,比如:熔断、限流、监控等等,这些功能可以根据实陫的需求来实现,而且在实际的实现过程中,还会遇到很多问题,比如:序列化和反序列化扩展、线程安全问题等等,都值得我们去深入研究。
这里分享一下我的实现的代码,麻烦老哥们帮忙点个star 😭 ,谢谢!有问题可以留言,我会在第一有空闲的时间回复。
项目地址:JGZHAN/lrpc 戳这里去点star
【手写 RPC】使用netty手写一个RPC框架 结合新特性 虚拟线程

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/5887.html

(0)
LomuLomu
上一篇 2025 年 1 月 11 日
下一篇 2025 年 1 月 11 日

相关推荐

  • Discord技术架构调研(IM即时通讯技术架构分析)

    一、目标 调研 discord 的整体架构,发掘可为所用的设计思想 二、调研背景 Discord作为目前比较火的一个在线聊天和语音通信平台且具有丰富的功能。另外其 “超级”群 概念号称可支持百万级群聊 以及 永久保留用户聊天记录。探究其相关技术架构与技术实现 三、产品介绍 目前广泛使用的在线聊天和语音通信平台。最初于2015年发布,旨在为游戏社区提供一个交流…

    2025 年 1 月 14 日
    29400
  • 全网最适合入门的面向对象编程教程:60 Python面向对象综合实例-传感器数据实时绘图器

    全网最适合入门的面向对象编程教程:60 Python 面向对象综合实例-传感器数据实时绘图器 摘要: 本文将结合之前内容实现模拟一个传感器系统软件,包括三个线程:传感器线程生成数据并通过串口发送给主机进程;主机进程通过串口接收指令,进行数据滤波和处理后,将处理结果发送给绘图线程;绘图线程负责接收数据并绘制更新数据曲线。 原文链接: FreakStudio的博…

    2024 年 12 月 24 日
    23900
  • Java刷题常见的集合类,各种函数的使用以及常见的类型转化等等

    目录 前言 集合类 ArrayList 1. 创建和初始化 ArrayList 2.添加元素 add 3.获取元素 get 4.删除元素 remove 5.检查元素 6.遍历 ArrayList LinkedList Stack 1. 创建Stack对象 2. 压入元素 (push) 3. 弹出元素 (pop) 4. 查看栈顶元素 (peek) 5. 检查栈…

    2025 年 1 月 1 日
    49500
  • ORM框架与数据库交互

    — title: ORM框架与数据库交互 date: 2024/12/22 updated: 2024/12/22 author: cmdragon excerpt: 对象关系映射(ORM)框架是连接数据库与编程语言的桥梁,它极大地简化了两者之间的交互。通过ORM,开发者能够以面向对象的方式处理数据库操作,避免了直接编写SQL语句的繁琐,从而提升开发效率…

    未分类 2024 年 12 月 27 日
    27500
  • JetBrains 官方正版账号,全家桶全版本全平台都可激活

    官方授权,激活你自己的JetBrains账号,1年只要66元! 平均一个月不到6块钱!(共享单车一个月都要9.9元了!) 有一个账号有什么优势? 激活您自己的专属账号(官网可查) 不限制任何版本,所有版本登录账号即可使用 不限制任何产品,无论是IDEA、DataGrip、PyCharm、WebStorm等17个工具都直接使用 不需要任何配置,登录你的账号就可…

    2024 年 6 月 22 日
    1.9K00

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信