本文包括以下内容
initAndRegister-createChannel分析initAndRegister-init分析initAndRegister-register分析doBind0分析sever端启动主要处理都在bind()处理中,其中主要代码如下
AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
bind最主要的两个逻辑处理是 initAndRegister()和 doBind0(), 先来分析一下initAndRegister(),下篇文章将分析doBind0()。
分析initAndRegister方法
final ChannelFuture initAndRegister() {
Channel channel;
try {
//创建channel,并配置其主要属性
channel = createChannel();
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}
try {
//初始化channel
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
//执行 register
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
initAndRegister主要分为以下部分
createChannelinitregister下面逐个进行分析
时序图如下
createChannel 就是 创建NioServerSocketChannel的过程,下面简单分析一下它创建过中调用的构造方法
,它详细的各个父类讲解 参照我的netty 系列 netty源码分析-NioServerSocketChannel,NioSocketChannel
ServerBootstrap:
Channel createChannel() {
EventLoop eventLoop = group().next();
return channelFactory().newChannel(eventLoop, childGroup);
}
ServerBootstrapChannelFactory
@Override
public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
try {
//class=NioSocketChannel.class,调用它的构造方法。
Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class);
return constructor.newInstance(eventLoop, childGroup);
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
//调用NioServerSocketChannel构造方法创建实例
public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) {
//调用父类构造方法
super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT);
//创建默认配置
config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
}
protected AbstractNioMessageServerChannel(
Channel parent, EventLoop eventLoop, EventLoopGroup childGroup, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
//此时parent=null 。ch=ServerSocketChannelImp(java原生的)
super(parent, eventLoop, ch, readInterestOp);
//childGroup赋值
this.childGroup = childGroup;
}
protected AbstractNioMessageChannel(
Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
super(parent, eventLoop, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
//调用父类构造方法
super(parent, eventLoop);
// 赋值 java SocketChannelImpl
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//配置java.nio.ServerSocketChannel为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
//异常处理暂时不关注
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
最后调用
创建unsafe,pipeline,保存父Channel,保存parentGroup 中 的eventLoop
protected AbstractChannel(Channel parent, EventLoop eventLoop) {
this.parent = parent;// 保存父Channel, 当前 Channel==null
this.eventLoop = validate(eventLoop);// 非空和兼容校验,保存parentGroup 中 的eventLoop
unsafe = newUnsafe();//初始化unsafe××重要×××
pipeline = new DefaultChannelPipeline(this);//初始化pipeline××重要×××
}
AbstractNioChannel
//兼容校验
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioEventLoop;
}
小结:createChannel创建了 unsafe,pipeline,java.nio.ServerSocketChannel,然后为其属性eventLoop,childGroup,readInterestOp,config 赋值。
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
//options value= {SO_BACKLOG=128}
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
//attrs={}
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//childOptions={SO_KEEPALIVE=true}
//获取 ServerBootstrap的配置参数
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
//childAttrs={}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//ChannelInitializer的作用:定义initChannel方法,便于添加handler。
//ServerBootstrapAcceptor的主要方法channelRead获取了ServerBootstrap的参数,貌似ServerBootstrapAcceptor作用只是获取ServerBootstrap的参数
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
currentChildAttrs));
}
});
}
init方法 收集了ServerBootstrap中 配置的参数和childHander,传递给ServerBootstrapAcceptor。
ServerBootstrapAcceptor
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
//child是 NioSocketChannel
child.pipeline().addLast(childHandler);//在child的pipeline中加入handler
// 获取ServerBootstrap中配置的属性
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
// 获取ServerBootstrap中配置的属性
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//初始化注册 ,然后 执行 更新网络标识位
child.unsafe().register(child.newPromise());
}
ServerBootstrapAcceptor的channelRead方法将在客户端channel初始过程 获取ServerBootstrap中配置的参数和自定义的childHandler,并执行客户端channel 的注册 。
currentChildHandler 就是自定义的childHandler ,在一个标准的 childHandler 对应下面部分
public abstract class ChannelInitializer<C extends Channel> extends ChannelHandlerAdapter {
//子类实现 通常用于添加handler
protected abstract void initChannel(C ch) throws Exception;
//在chnnel执行register()时调用
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
//调用子类实现方法, 通常用于添加handler
initChannel((C) ctx.channel());
//使用完毕 移除当前handler
pipeline.remove(this);
//传递ChannelRegistered事件,调用其他处理ChannelRegistered事件的handler
//实现事件传递关键的步骤,如果不写,则事件终止
ctx.fireChannelRegistered(); // 最终执行了TailHandler.channelRegistered(),什么也没做。
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
}
ChannelInitializer在server启动-initAndRegister-init()中添加ServerBootstrapAcceptor和用户添加自定义handler时用到.
ChannelInitializer:用于添加handler,在channel执行register()时调用
小结:init()将ServerBootstrap传递给channel(),并添加了一个访问ServerBootstrap属性的handler
看一下这两行代码
//创建ChannelPromise,主要用于判断异步执行是否完成
ChannelPromise regFuture = channel.newPromise();
//注册channel到EventLoop上,需要重点分析。
channel.unsafe().register(regFuture);
调用 AbstractChannel.register()
@Override
public final void register(final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {// 判断reactor线程是否启动
register0(promise);
} else {
try {//sever启动 在这里执行。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//...
}
}
}
//register处理逻辑 的真正 实现
private void register0(ChannelPromise promise) {
try {
//...
doRegister();//由子类实现
registered = true;
promise.setSuccess();//记录执行成功
pipeline.fireChannelRegistered();//触发注册事件,使用pipeline 传递
if (isActive()) {
pipeline.fireChannelActive();//触发active事件,使用pipeline传递
}
} catch (Throwable t) {
//...
}
}
分析得出 register方法的执行流程图
sever端启动第一次执行register时,reactor线程没有启动。注意Register任务在当前reactor线程中是与IO事件交替串行执行(会在NioEventLoop篇详细讲解)。
首次执行register方法,如果当前reactor线程是否启动,执行线程启动, 添加Register任务,首先调用原生NIO API执行register,初始化网络标识位值为0(如SelectionKey.OP_ACCEPT),接着 触发ChannelRegistered事件,这个事件最后将 init()中 的ServerBootstrapAcceptor 添加到了 server channel 的pipeline 中。 最后根据isActive()判断是否触发 channelActive事件。
isActive()在serve启动的时候 检查是否执行了bind(),在初始化客户端channel时检查 是否打开并连接。
在这里 channelActive事件不会触发,稍后bind()中会触发,再详讲。
ChannelRegistered事件的执行流程如下。
小结: register启动了reactor线程,提交了register任务,该任务 执行注册,初始化网络标识位为0,触发了ChannelRegistered,将 init()中 的ServerBootstrapAcceptor 添加到了 server channel 的pipeline 中,
OK initAndRegister 分析完毕
总结:initAndRegister 创建了NioServerSocketChannel,初始化了ServerBootstrapAcceptor,注册channel到parentGroup 的 EventLoop上 。
来看AbstractBootstrap-doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
//isDone()判断的是register是否执行完
//如果执行完成则同步执行doBind0,否则添加handler等待 触发。
//创建promise并返回,方法后续同步,异步,添加监听器等操作
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
//这里添加了一个handler用于执行doBind0,具体调用以后分析。
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
//跟踪代码发现,调用了AbstractUnsafe的bind()方法
调用信息:AbstractChannel$AbstractUnsafe$2.<init>(AbstractChannel$AbstractUnsafe)
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
try {
//由子类实现
doBind(localAddress);//代码A
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//最后调用HeadHandler.read,更新了 interestOps=SelectionKey.OP_ACCEPT。
pipeline.fireChannelActive();//只有绑定后才会执行
}
});
}
promise.setSuccess();
}
代码A调用NioServerSocketChannel-doBind(SocketAddress)
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
很明显 调用了原生NIO API的bind。
分析得出服务端channel bind 流程图,如下。
在NioServerSocketChannel.doBind中 调用原生NIO API执行了bind(),然后触发了ChannelActive事件。
ChannelActive事件的执行流程如下
ChannelActive事件 触发后最主要的处理就是把网络标志位从0修改为SelectionKey.OP_ACCEPT或SelectionKey.OP_READ。当完成这步后,就可以接受客户端连接了。
小结:doBind调用原生NIO API执行了bind(),触发ChannelActive事件,把网络标志位从0修改为SelectionKey.OP_ACCEPT。
OK到此netry server启动部分就分析完了。
整理出netry server启动流程如下
总结:server启动整个流程, 先创建了NioServerSocketChannel,启动主reactor 也就是NioEventLoop,执行注册,初始化网络标识位为0,触发ChannelRegistered事件,添加ServerBootstrapAcceptor handler,然后执行bind(),调用原生NIO API执行了bind(),触发ChannelActive事件,把网络标志位从0修改为SelectionKey.OP_ACCEPT。