Netty源码解析 - 服务端启动流程

mac2025-12-14  6

1. 概述

Netty是一个稳定、高性能NIO通信框架,它对JDK NIO的使用做了很好的封装,对使用者屏蔽了NIO通信的底层细节,对使用NIO降低业务开发工作量,降低开发难度

2. Netty IO Reactor模型

3. Netty服务端启动流程

4. Netty组件分析

4.1 EventLoopGroup

Netty处理IO请求线程池,管理一组线程处理IO请求。启动一个Netty服务,会初始化两个线程组,主线程组和工作线程组。主线程组会启动一个线程,接收客户端请求,将接收到的连接注册到工作线程组一个线程,交给工作组线程处理具体IO读写操作;工作线程组会启动一组线程,处理具体IO读写工作。

线程组线程初始化工作是在构造函数中完成,通过指定线程组大小,初始化线程组管理线程数。

protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }

初始化的线程会委托给EventExecutorChooser管理,对于IO请求,它会按照一定的策略将IO请求转交给一个具体线程来处理。

EventLoopGroup实际是管理一组EventLoop线程,具体IO处理是交给EventLoop类来处理的。

4.2 NioEventLoop

NioEventLoop是实际处理IO读写的线程,每个NioEventLoop会启动一个线程,循环遍历,处理IO读写任务。NioEventLoop有一个Selector,IO channel会被注册到Selector上。NioEventLoop会维护一个taskQueue队列,分配到的Runnable会放入taskQueue队列中,在循环遍历线程中,会取出Runnable进行处理。

对于每个EventLoop都有一个线程运行,这个线程通过startThread()启动。这个线程的启动有点绕,每个EventLoop持有一个ThreadPerTaskExecutor,ThreadPerTaskExecutor.execute()执行 DefaultThreadFactory.newThread(command).start()启动线程。这个线程会启动一个循环遍历,监听IO读写和顺序执行队列任务。

4.3 NioServerSocketChannel

Channel组件是IO读写通道,NioServerSocketChannel接收客户端连接请求,NioSocketChannel处理连接读写请求。Channel处理IO读写任务委托给Unsafe和ChannelPipeline组件,Unsafe处理Channel读写任务,ChannelPipeline是读写请求处理链,可以在处理链中添加处理逻辑。

protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }

Unsafe是Channel的内部类,可以方便使用Channel持有的资源。对于NioServerSocketChannel,NioMessageUnsafe.read()接收客户端连接请求,然后调用pipeline.fireChannelRead()触发调用链执行,ServerBootstrapAcceptor处理客户端连接请求,将客户端请求连接注册到工作线程组。对于NioSocketChannel,NioByteUnsafe.read()处理IO读写。

4.4 ServerBootstrap

组合使用Nettty各个组件,启动Netty服务。它定义了一个Netty使用的门面,屏蔽了内部复杂的逻辑,方便的启动一个Netty服务。

public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

ServerBootstrap定义两个EventLoopGroup线程组,一个处理客户端连接请求,一个处理接收连接IO读写请求。调用bind()方法,完成服务端的启动。bind()依次会调用initAndRegister()和doBind0()。

final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); 【1】 init(channel); 【2】 } catch (Throwable t) { } ChannelFuture regFuture = config().group().register(channel); 【3】 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } 使用反射生成.channel(NioServerSocketChannel.class)方法指定的NioServerSocketChannel对象

设置channel参数,在channel pipeline中添加ServerBootstrapAcceptor处理逻辑,用于接收客户端请求处理

void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

将channel添加到EventLoopGroup线程组的NioEventLoop线程上,NioEventLoop线程启动循环遍历,处理channel连接请求。当接收到客户端连接请求,会执行ServerBootstrapAcceptor逻辑,将接收到的请求添加到worker线程组一个NioEventLoop线程上,NioEventLoop线程启动循环遍历处理客户端IO读写请求。每个NioEventLoop线程都有一个Selector,channel注册到Selector上。

4.5 ServerBootstrapAcceptor

作为NioServerSocketChannel处理链中的一个Handler,处理客户端连接请求。NioServerSocketChannel收到客户端连接请求,pipeline.fireChannelRead()调用处理链。 接收到的客户端请求channel会添加到工作线程组,处理IO读写请求。

public void channelRead(ChannelHandlerContext ctx, Object msg) { // 客户端连接channel final Channel child = (Channel) msg; // channel pipeline child.pipeline().addLast(childHandler); // 设置channel参数 setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { // 将channel添加到工作线程组 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }

相关文档

Netty源码:服务端启动过程
最新回复(0)