netty源码分析2-1-server启动

mac2024-05-27  36

本文包括以下内容

initAndRegister-createChannel分析initAndRegister-init分析initAndRegister-register分析doBind0分析

sever端启动主要处理都在bind()处理中,其中主要代码如下

AbstractBootstrap

private ChannelFuture doBind(final SocketAddress localAddress) {

final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();

if (regFuture.cause() != null) {

return regFuture;

}

 

final ChannelPromise promise;

if (regFuture.isDone()) {

promise = channel.newPromise();

doBind0(regFuture, channel, localAddress, promise);

} else {

// Registration future is almost always fulfilled already, but just in case it's not.

promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);

regFuture.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

doBind0(regFuture, channel, localAddress, promise);

}

});

}

 

return promise;

}

 

bind最主要的两个逻辑处理是 initAndRegister()和 doBind0(), 先来分析一下initAndRegister(),下篇文章将分析doBind0()。

 

分析initAndRegister方法

final ChannelFuture initAndRegister() {

Channel channel;

try {

//创建channel,并配置其主要属性

channel = createChannel();

} catch (Throwable t) {

return VoidChannel.INSTANCE.newFailedFuture(t);

}

 

try {

//初始化channel

init(channel);

} catch (Throwable t) {

channel.unsafe().closeForcibly();

return channel.newFailedFuture(t);

}

//执行 register

ChannelPromise regFuture = channel.newPromise();

channel.unsafe().register(regFuture);

if (regFuture.cause() != null) {

if (channel.isRegistered()) {

channel.close();

} else {

channel.unsafe().closeForcibly();

}

}

return regFuture;

}

initAndRegister主要分为以下部分

createChannelinitregister

下面逐个进行分析

1.createChannel分析

时序图如下

createChannel 就是 创建NioServerSocketChannel的过程,下面简单分析一下它创建过中调用的构造方法

,它详细的各个父类讲解 参照我的netty 系列 netty源码分析-NioServerSocketChannel,NioSocketChannel

 

ServerBootstrap:

Channel createChannel() {

EventLoop eventLoop = group().next();

return channelFactory().newChannel(eventLoop, childGroup);

}

 

ServerBootstrapChannelFactory

@Override

public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {

try {

//class=NioSocketChannel.class,调用它的构造方法。

Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);

return constructor.newInstance(eventLoop, childGroup);

} catch (Throwable t) {

throw new ChannelException("Unable to create Channel from class " + clazz, t);

}

}

//调用NioServerSocketChannel构造方法创建实例

public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {

//调用父类构造方法

super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);

//创建默认配置

config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());

}

 

protected AbstractNioMessageServerChannel(

Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {

//调用父类构造方法

//此时parent=null 。ch=ServerSocketChannelImp(java原生的)

super(parent, eventLoop, ch, readInterestOp);

//childGroup赋值

this.childGroup = childGroup;

}

protected AbstractNioMessageChannel(

Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {

//调用父类构造方法

super(parent, eventLoop, ch, readInterestOp);

}

 

protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {

//调用父类构造方法

super(parent, eventLoop);

// 赋值 java SocketChannelImpl

this.ch = ch;

this.readInterestOp = readInterestOp;

try {

//配置java.nio.ServerSocketChannel为非阻塞

ch.configureBlocking(false);

} catch (IOException e) {

//异常处理暂时不关注

try {

ch.close();

} catch (IOException e2) {

if (logger.isWarnEnabled()) {

logger.warn(

"Failed to close a partially initialized socket.", e2);

}

}

 

throw new ChannelException("Failed to enter non-blocking mode.", e);

}

}

最后调用

创建unsafe,pipeline,保存父Channel,保存parentGroup 中 的eventLoop

protected AbstractChannel(Channel parent, EventLoop eventLoop) {

this.parent = parent;// 保存父Channel, 当前 Channel==null

this.eventLoop = validate(eventLoop);// 非空和兼容校验,保存parentGroup 中 的eventLoop

unsafe = newUnsafe();//初始化unsafe××重要×××

pipeline = new DefaultChannelPipeline(this);//初始化pipeline××重要×××

}

 

AbstractNioChannel

//兼容校验

protected boolean isCompatible(EventLoop loop) {

return loop instanceof NioEventLoop;

}

小结:createChannel创建了 unsafe,pipeline,java.nio.ServerSocketChannel,然后为其属性eventLoop,childGroup,readInterestOp,config 赋值。

2.init分析

void init(Channel channel) throws Exception {

final Map<ChannelOption<?>, Object> options = options();

//options value= {SO_BACKLOG=128}

synchronized (options) {

channel.config().setOptions(options);

}

final Map<AttributeKey<?>, Object> attrs = attrs();

//attrs={}

synchronized (attrs) {

for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {

@SuppressWarnings("unchecked")

AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();

channel.attr(key).set(e.getValue());

}

}

ChannelPipeline p = channel.pipeline();

if (handler() != null) {

p.addLast(handler());

}

final ChannelHandler currentChildHandler = childHandler;

final Entry<ChannelOption<?>, Object>[] currentChildOptions;

final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

//childOptions={SO_KEEPALIVE=true}

//获取 ServerBootstrap的配置参数

synchronized (childOptions) {

currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));

}

//childAttrs={}

synchronized (childAttrs) {

currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));

}

//ChannelInitializer的作用:定义initChannel方法,便于添加handler。

//ServerBootstrapAcceptor的主要方法channelRead获取了ServerBootstrap的参数,貌似ServerBootstrapAcceptor作用只是获取ServerBootstrap的参数

p.addLast(new ChannelInitializer<Channel>() {

@Override

public void initChannel(Channel ch) throws Exception {

ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,

currentChildAttrs));

}

});

}

init方法 收集了ServerBootstrap中 配置的参数和childHander,传递给ServerBootstrapAcceptor。

 

ServerBootstrapAcceptor

@Override

@SuppressWarnings("unchecked")

public void channelRead(ChannelHandlerContext ctx, Object msg) {

Channel child = (Channel) msg;

//child是 NioSocketChannel

child.pipeline().addLast(childHandler);//在child的pipeline中加入handler

// 获取ServerBootstrap中配置的属性

for (Entry<ChannelOption<?>, Object> e: childOptions) {

try {

if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {

logger.warn("Unknown channel option: " + e);

}

} catch (Throwable t) {

logger.warn("Failed to set a channel option: " + child, t);

}

}

// 获取ServerBootstrap中配置的属性

for (Entry<AttributeKey<?>, Object> e: childAttrs) {

child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());

}

//初始化注册 ,然后 执行 更新网络标识位

child.unsafe().register(child.newPromise());

}

 

ServerBootstrapAcceptor的channelRead方法将在客户端channel初始过程 获取ServerBootstrap中配置的参数和自定义的childHandler,并执行客户端channel 的注册 。

currentChildHandler 就是自定义的childHandler ,在一个标准的 childHandler 对应下面部分

 

ChannelInitializer分析

public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter {

//子类实现 通常用于添加handler

protected abstract void initChannel(C ch) throws Exception;

//在chnnel执行register()时调用

@Override

@SuppressWarnings("unchecked")

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {

ChannelPipeline pipeline = ctx.pipeline();

boolean success = false;

try {

//调用子类实现方法, 通常用于添加handler

initChannel((C) ctx.channel());

//使用完毕 移除当前handler

pipeline.remove(this);

//传递ChannelRegistered事件,调用其他处理ChannelRegistered事件的handler

//实现事件传递关键的步骤,如果不写,则事件终止

ctx.fireChannelRegistered(); // 最终执行了TailHandler.channelRegistered(),什么也没做。

success = true;

} catch (Throwable t) {

logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);

} finally {

if (pipeline.context(this) != null) {

pipeline.remove(this);

}

if (!success) {

ctx.close();

}

}

}

}

 

ChannelInitializer在server启动-initAndRegister-init()中添加ServerBootstrapAcceptor和用户添加自定义handler时用到.

ChannelInitializer:用于添加handler,在channel执行register()时调用

小结:init()将ServerBootstrap传递给channel(),并添加了一个访问ServerBootstrap属性的handler

 

3.register分析

 

看一下这两行代码

//创建ChannelPromise,主要用于判断异步执行是否完成

ChannelPromise regFuture = channel.newPromise();

//注册channel到EventLoop上,需要重点分析。

channel.unsafe().register(regFuture);

调用 AbstractChannel.register()

@Override

public final void register(final ChannelPromise promise) {

if (eventLoop.inEventLoop()) {// 判断reactor线程是否启动

register0(promise);

} else {

try {//sever启动 在这里执行。

eventLoop.execute(new Runnable() {

@Override

public void run() {

register0(promise);

}

});

} catch (Throwable t) {

//...

}

}

}

//register处理逻辑 的真正 实现

private void register0(ChannelPromise promise) {

try {

//...

doRegister();//由子类实现

registered = true;

promise.setSuccess();//记录执行成功

pipeline.fireChannelRegistered();//触发注册事件,使用pipeline 传递

if (isActive()) {

pipeline.fireChannelActive();//触发active事件,使用pipeline传递

}

} catch (Throwable t) {

//...

}

}

分析得出 register方法的执行流程图

sever端启动第一次执行register时,reactor线程没有启动。注意Register任务在当前reactor线程中是与IO事件交替串行执行(会在NioEventLoop篇详细讲解)。

首次执行register方法,如果当前reactor线程是否启动,执行线程启动, 添加Register任务,首先调用原生NIO API执行register,初始化网络标识位值为0(如SelectionKey.OP_ACCEPT),接着 触发ChannelRegistered事件,这个事件最后将 init()中 的ServerBootstrapAcceptor 添加到了 server channel 的pipeline 中。 最后根据isActive()判断是否触发 channelActive事件。

isActive()在serve启动的时候 检查是否执行了bind(),在初始化客户端channel时检查 是否打开并连接。

在这里 channelActive事件不会触发,稍后bind()中会触发,再详讲。

ChannelRegistered事件的执行流程如下。

 

小结: register启动了reactor线程,提交了register任务,该任务 执行注册,初始化网络标识位为0,触发了ChannelRegistered,将 init()中 的ServerBootstrapAcceptor 添加到了 server channel 的pipeline 中,

 

OK initAndRegister 分析完毕

总结:initAndRegister 创建了NioServerSocketChannel,初始化了ServerBootstrapAcceptor,注册channel到parentGroup 的 EventLoop上 。

4.doBind()分析

来看AbstractBootstrap-doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {

final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();

if (regFuture.cause() != null) {

return regFuture;

}

final ChannelPromise promise;

//isDone()判断的是register是否执行完

//如果执行完成则同步执行doBind0,否则添加handler等待 触发。

//创建promise并返回,方法后续同步,异步,添加监听器等操作

if (regFuture.isDone()) {

promise = channel.newPromise();

doBind0(regFuture, channel, localAddress, promise);

} else {

// Registration future is almost always fulfilled already, but just in case it's not.

promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);

//这里添加了一个handler用于执行doBind0,具体调用以后分析。

regFuture.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

doBind0(regFuture, channel, localAddress, promise);

}

});

}

return promise;

}

 

 

//跟踪代码发现,调用了AbstractUnsafe的bind()方法

调用信息:AbstractChannel$AbstractUnsafe$2.<init>(AbstractChannel$AbstractUnsafe)

@Override

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

boolean wasActive = isActive();

try {

//由子类实现

doBind(localAddress);//代码A

} catch (Throwable t) {

promise.setFailure(t);

closeIfClosed();

return;

}

if (!wasActive && isActive()) {

invokeLater(new Runnable() {

@Override

public void run() {

//最后调用HeadHandler.read,更新了 interestOps=SelectionKey.OP_ACCEPT。

pipeline.fireChannelActive();//只有绑定后才会执行

}

});

}

promise.setSuccess();

}

代码A调用NioServerSocketChannel-doBind(SocketAddress)

protected void doBind(SocketAddress localAddress) throws Exception {

javaChannel().socket().bind(localAddress, config.getBacklog());

}

很明显 调用了原生NIO API的bind。

分析得出服务端channel bind 流程图,如下。

 

在NioServerSocketChannel.doBind中 调用原生NIO API执行了bind(),然后触发了ChannelActive事件。

ChannelActive事件的执行流程如下

ChannelActive事件 触发后最主要的处理就是把网络标志位从0修改为SelectionKey.OP_ACCEPT或SelectionKey.OP_READ。当完成这步后,就可以接受客户端连接了。

小结:doBind调用原生NIO API执行了bind(),触发ChannelActive事件,把网络标志位从0修改为SelectionKey.OP_ACCEPT。

OK到此netry server启动部分就分析完了。

 

整理出netry server启动流程如下

总结:server启动整个流程, 先创建了NioServerSocketChannel,启动主reactor 也就是NioEventLoop,执行注册,初始化网络标识位为0,触发ChannelRegistered事件,添加ServerBootstrapAcceptor handler,然后执行bind(),调用原生NIO API执行了bind(),触发ChannelActive事件,把网络标志位从0修改为SelectionKey.OP_ACCEPT。

 

 

 

 

最新回复(0)