分布式网络通信框架Netty

mac2026-02-14  11

                                   分布式网络通信框架Netty

                            基于Netty+Zookeeper纯手写RPC远程调用框架

                                              1 Dubbo服务器-服务注册实现

                                                                                                                                                                                                田超凡

                                                                                                                                                                                  2019年11月2日

转载请注明原作者

1 Dubbo实现原理 

什么是Dubbo

是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现,基于Zookeeper作为注册中心实现服务治理等。

Dubbo官网:http://dubbo.apache.org/zh-cn/

 

 

Dubbo底层实现原理

服务提供者(Provider):暴露服务的服务提供方,服务提供者在启动时,向注册中心注册自己提供的服务。

服务消费者(Consumer): 调用远程服务的服务消费方,服务消费者在启动时,向注册中心订阅自己所需的服务,服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。

注册中心(Registry):注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者

监控中心(Monitor):服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心

 

完全手写Dubbo需要用到的相关技术知识

RPC远程调用的概念和实现原理,主流RPC远程调用框架:

dubbo、dubbox、HttpClient、grpc、Feign和REST、AlibabaCloud(Naxio)

Zookeeper注册中心实现原理

Netty实现dubbo生产者和消费者(服务器和客户端),基于TCP长连接实现RPC远程调用

多线程、网络编程基础、Socket网络通信、IO处理模型(BIO/NIO/AIO/IOM/ROM)

观察者模式、策略模式优化代码实现细节,提高复用性和扩展性

TCP粘包拆包、网络传输对象数据序列化和反序列化(JSON/MessagePack/Netty编码解码器/Google ProtoBuf)

Java反射和类加载器

 

快速搭建Dubbo+Zookeeper基本环境的步骤

创建dubbo生产者Provider

创建dubbo消费者Consumber

启动zookeeper注册中心,连接dubbo服务

实现服务注册和发现,实现RPC远程调用

 

Dubbo整体架构实现原理分析

Provider生产者,发布注册服务到zookeeper注册中心

Consumer消费者,通过观察者模式订阅zookeeper注册中心,获取zookeeper注册中心的服务,实现服务调用

Registry注册中心,主要使用zookeeper注册中心,实现服务注册与服务治理

Monitor监控中心,监控服务调用次数和失败错误信息

 

基于Netty+Zookeeper手写RPC远程调用框架,实现服务注册

搭建Dubbo Provider - Netty服务器端

搭建Dubbo Consumer - Netty客户端

搭建Registration - 基于反射实现服务注册和访问地址拼接,注册到zookeeper

 

Dubbo支持那些协议

Dubbo支持dubbo、rmi、hessian、http、webservice、thrift、redis等多种协议,但是Dubbo官网是推荐我们使用Dubbo协议的。

 

2 Dubbo环境搭建

模块划分:

member_provider_api --- 会员服务提供公共api

member_provider_impl--- 会员服务提供公共实现

member_impl_consumer---订单服务消费者

API接口

public interface UserService {    String getUserName(Long userId);}

生产者

public class UserServiceImpl implements UserService {    public String getUserName(Long userId) {        return "mayikt";    }}

 

生产者配置文件 provider.xml

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo       http://code.alibabatech.com/schema/dubbo/dubbo.xsd">    <!--定义了提供方应用信息,用于计算依赖关系;在 dubbo-admin 或 dubbo-monitor 会显示这个名字,方便辨识 -->    <dubbo:application name="mayiktp"/>    <!--使用 zookeeper 注册中心暴露服务,注意要先开启 zookeeper -->    <dubbo:registry address="zookeeper://localhost:2181"/>    <!-- 用dubbo协议在20880端口暴露服务 -->    <dubbo:protocol name="dubbo" port="20880"/>    <!--使用 dubbo 协议实现定义好的 api.PermissionService 接口 -->    <dubbo:service interface="com.mayikt.api.service.UserService"                   ref="userService" protocol="dubbo"/>    <!--具体实现该接口的 bean -->    <bean id="userService" class="com.mayikt.api.service.impl.UserServiceImpl"/></beans>

 

 

启动生产者

public class Provider {    public static void main(String[] args) throws Exception {        System.setProperty("java.net.preferIPv4Stack", "true");        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"provider.xml"});        context.start();        System.out.println("Provider started.");        System.in.read(); // press any key to exit    }}

 

消费者

public class Consumer {    public static void main(String[] args) throws Exception {        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"consumer.xml"});        context.start();        //获取远程服务代理        UserService userService = (UserService) context.getBean("userService");        // 执行远程方法        String userName = userService.getUserName(1L);        // 现实调用结果        System.out.println(userName);    }}

 

 

消费者配置文件:consumer.xml

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd       http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">    <dubbo:application name="order-consumer"/>    <!--向 zookeeper 订阅 provider 的地址,由 zookeeper 定时推送 -->    <dubbo:registry address="zookeeper://localhost:2181"/>    <!--使用 dubbo 协议调用定义好的 api.PermissionService 接口 -->    <dubbo:reference id="userService"                     interface="com.mayikt.api.service.UserService"/></beans>

 

 

3 基于Netty+Zookeeper手写Dubbo服务器

3.1 手写Dubbo整体步骤流程

将dubbo-admin放入到tomcat webapps目录下 运行  修改dubbo.properties zk连接地址即可。

 

3.2 手写Dubbo整体步骤流程

创建Netty工程

netty-dubbo-common--- Netty服务器端

  ----自定义@RpcAnnotation

  --- 服务注册与发现ServiceRegistration

  ---  rpc MayiktRpcServer

netty-dubbo-client ---Netty客户端

 

member-service-provider-api  ----生产者

member-provider-impl  ----消费者

 

netty-dubbo-server

RpcAnnotation

@Documented@Inherited@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface RpcAnnotation {    Class value();}

 

 

ServiceRegistration

 

public interface ServiceRegistration {     /**     * 服务注册     *     * @param serverName     * @param serviceClassAddres     */    void regist(String serverName, String serviceClassAddres);}

 

 

 

public class ServiceRegistrationImpl implements ServiceRegistration {     /**     * zk连接地址     */    private final String zkServers = "127.0.0.1";     /**     * 会话时间     */    private final int connectionTimeout = 5000;     /***     * zkClient     */    private ZkClient zkClient;     /**     * dubbo服务注册根路径名称     */    private String serviceParentName = "/mayikt_dubbo";     /**     * 注册我们的providers     */    private String providers = serviceParentName + "/providers";     public ServiceRegistrationImpl() {         // 初始化zk连接        zkClient = new ZkClient(zkServers, connectionTimeout);    }     public void regist(String serverName, String serviceClassAddres) {         //在zk上创建dubbo节点  /dubbo        if (!zkClient.exists(serviceParentName)) {             // 创建持久根节点            zkClient.createPersistent(serviceParentName);        }         // 注册我们的服务接口 /dubbo/com.mayikt.service.api.UserService        String zkserviceClassAddresPath = serviceParentName + "/" + serverName;         if (!zkClient.exists(zkserviceClassAddresPath)) {             zkClient.createPersistent(zkserviceClassAddresPath);        }         // 注册我们的服务地址  /dubbo/com.mayikt.service.api.UserService/providers        String providersNodePath = zkserviceClassAddresPath + "/providers";         if (!zkClient.exists(providersNodePath)) {             zkClient.createPersistent(providersNodePath);        }         // 在providersNodePath下创建我们的服务发布的地址   需要将url实现转移        String serviceNodePath = providersNodePath + "/" + URLEncoder.encode(serviceClassAddres);         if (zkClient.exists(serviceNodePath)) {             // 将该节点删除            zkClient.delete(serviceNodePath);        }         // 创建我们的临时节点        zkClient.createEphemeral(serviceNodePath);    }}

 

 

RpcServer

 

public class RpcServer{     /**     * 注册服务     */    private ServiceRegistration serviceRegistration;     /**     * 服务注册端口号     */    private int port;     /**     * 服务地址     */    private String host;     public MayiktRpcServer(String host, int port) {         serviceRegistration = new ServiceRegistrationImpl();         this.host = host;         this.port = port;    }     public void start(Object services) {        System.out.println("services:" + services);         // 绑定服务        bindService(services);         //发布服务        releaseService();    }     /**     * 发布服务     */    private void releaseService() {         // 用于接受客户端连接的请求 (并没有处理请求)        NioEventLoopGroup bossGroup = new NioEventLoopGroup();         // 用于处理客户端连接的读写操作        NioEventLoopGroup workGroup = new NioEventLoopGroup();         // 用于创建我们的ServerBootstrap        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)                .childHandler(new ChannelInitializer<SocketChannel>() {                     @Override                     protected void initChannel(SocketChannel socketChannel) throws Exception {                    }                });         //  绑定我们的端口号码        try {             // 绑定端口号,同步等待成功            ChannelFuture future = serverBootstrap.bind(port).sync();            System.out.println("服务注册成功" + port);             // 等待服务器监听端口            future.channel().closeFuture().sync();        } catch (Exception e) {            e.printStackTrace();        } finally {             // 优雅的关闭连接            bossGroup.shutdownGracefully();            workGroup.shutdownGracefully();        }    }     /**     * 绑定服务     *     * @param service     */    private void bindService(Object service) {        UserService userService = new UserService();         RpcAnnotation declaredAnnotation = userService.getClass().getDeclaredAnnotation(RpcAnnotation.class);         if (declaredAnnotation != null) {            String value = declaredAnnotation.value().toString().replace("class ", "");            String serviceClassAddres = "mayikt://" + host + ":" + port + "//" + value;             serviceRegistration.regist(value, serviceClassAddres);        }    }}

 

member-service-provider

public interface UserService {    String getUserName();}

 

 

@RpcAnnotation(UserService.class)public class UserServiceImpl implements UserService {     public String getUserName() {         return "memberInfo";    }}

 

 

补充说明:

1.Dubbo整体架构实现原理步骤可以分为两步:

(1).生产者启动的时候能够实现被其他服务调用,Dubbo服务器使用Netty作为服务器端,当前的地址注册到Zookeeper上,底层实际上还是dubbo协议:

Dubbo:// ip地址+端口号+com.api.service.UserService.getUser(Long)

Netty服务器的端口号设置为20880,这是因为此处Dubbo服务器就是使用Netty进行网络通信的,所以Dubbo服务器就是自定义的Netty服务器,Netty服务器端口即为手写实现的Dubbo服务器端口,实现服务注册。

.消费者从Zookeeper上获取注册地址即可实现服务调用。

2.Zookeeper注册中心实现了对已经注册的服务进行服务治理,起到了类似服务容器的作用,既可以注册和调用服务,也可以进行服务治理。一般情况下Zookeeper和Dubbo服务器都需要搭建集群的,这样做的目的是提高系统容灾性和弹性,不会因为某一个或一些服务出现问题导致其他所有服务都无法使用,采用主从机制,从机定时同步主机信息,通过zkid作为标志用来标识节点数据同步的优先级,zkid值越大表示当前Zookeeper节点的数据状态是最新的。

Zookeeper基于节点和事件处理机制,实现了服务治理。

转载请注明原作者

最新回复(0)