分享内容如下
ChannelPipeline和ChannelHandler要点概要ChannelPipeline功能分析DefaultChannelHandlerContext分析findContextOutbound,findContextInbound分析DefaultChannelHandlerInvoker无锁化分析HeadHandler和tailHandler分析ChannelPipeline提供类似AOP功能的责任链实现,用来管理ChannelHandler,按照handler顺序和是否是IO操作来顺序执行ChannelHandler.
用户定义的ChannelHandler可以终止传递,执行业务逻辑do nothing即可,如果要继续传递者需要调用相应方法。
责任链:可理解为 多个hander顺序执行,每个hander关心自要处理的部分业务即可。ChannelPipeline依赖DefaultChannelHandlerContext实现责任链。
DefaultChannelHandlerContext:handler执行的环境, 用于构建责任链的双向链表,无锁化支持,DefaultChannelHandlerInvoker为其提供无锁化支持。
上面的内容全部描述清晰比较困难,可以将 DefaultChannelHandlerContext,DefaultChannelHandlerInvoker理解为管理和执行 handler的工具类。ChannelHandler的执行顺序如下图。
IO操作从TailHandler向前传递,channel事件从HeadHandler向后传递。
TailHandler和HeadHandler是责任链中用于托底的Hander,在DefaultChannelPipeline构造方法中被创建,后面详细分析。
实现类:io.netty.channel.DefaultChannelPipeline
主要属性
final AbstractChannel channel;
final DefaultChannelHandlerContext head;
final DefaultChannelHandlerContext tail;
构造方法
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
//由HeadHandler和TailHandler构建双向链表
TailHandler tailHandler = new TailHandler();
tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
HeadHandler headHandler = new HeadHandler(channel.unsafe());//注意此处传入的unsafe
head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
head.next = tail;
tail.prev = head;
}
主要方法: fireXXX 和覆盖原生NIO方法bind(),connect(),flush().
addXXX:将invoker加入双向链表
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
//构建链表辅助类DefaultChannelHandlerContext
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, invoker, name, handler);
//将newCtx加入链表,放在heah后
addFirst0(name, newCtx);
}
return this;
}
//真正放入链表的方法
private void addFirst0(String name, DefaultChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);
DefaultChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
name2ctx.put(name, newCtx);
callHandlerAdded(newCtx);
}
addFirst0将Handler放在HeadHandler后面, addLast0将Handler放在TailHandler前面。
addBefore将Handler放在指定handler前,addAfter将Handler放在指定handler后。
channel事件fireXXX:从头到尾执行handler链表
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
//IO操作:从尾到头执行handler链表
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}
小结:ChannelPipeline构建了双向链表,实现了插入handler的方法,执行IO事件和channel事件的方法。
目前观察事件的执行都是找到一个实现对应处理的handler就执行,没有对事件继续传递?
事件传递在Unsafe里面实现。
ChannelPipeline依赖DefaultChannelHandlerContext实现,下面分析一下DefaultChannelHandlerContext
主要结构如下图
DefaultChannelHandlerInvoker 和ChannelHanler是DefaultChannelHandlerContext主要成员,
DefaultChannelHandlerContext的执行依赖DefaultChannelHandlerInvoker ,ChannelHanler在DefaultChannelHandlerInvoker 中执行
主要属性:
volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev;// next,prev用于构建双向链表
final ChannelHandlerInvoker invoker;
主要方法如下
事件传递方法,如:
public ChannelHandlerContext fireChannelRegistered() {
DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
next.invoker.invokeChannelRegistered(next);
return this;
}
IO操作方法,如:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//取出实现bind()的handller执行
DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);
next.invoker.invokeBind(next, localAddress, promise);
return promise;
}
以bind()执行为例。
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// MASK_BIND=1024
DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);
next.invoker.invokeBind(next, localAddress, promise);
return promise;
}
// 下面(1)的逻辑不理解
private DefaultChannelHandlerContext findContextOutbound(int mask) {
DefaultChannelHandlerContext ctx = this;//从当前的ctx 开始查找
do {
// ctx是pipeline中的属性, ctx 组成了环形链表,这里ctx最后取得的是 包含HeadHanler的DefaultChannelHandlerContext
ctx = ctx.prev;
} while ((ctx.skipFlags & mask) != 0);//(1)
return ctx;
}
跟踪ctx.skipFlags
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
//。。。省略代码
skipFlags = skipFlags(handler);
//。。。省略代码
}
//有handlerType对应的flag值则从缓存中取出,没有则获取并放入缓存中
private static int skipFlags(ChannelHandler handler) {
WeakHashMap<Class<?>, Integer> cache =
skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)];
Class<? extends ChannelHandler> handlerType = handler.getClass();
int flagsVal;
synchronized (cache) {
Integer flags = cache.get(handlerType);
if (flags != null) {
flagsVal = flags;
} else {
flagsVal = skipFlags0(handlerType);
cache.put(handlerType, Integer.valueOf(flagsVal));
}
}
return flagsVal;
}
skipFlags0 是真正获取skipFlags 的方法。
代码如下:
private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
int flags = 0;
try {
//如果ChannelHandler中handlerAdded方法有@Skip标识,则合并代表该方法的标准位值
if (handlerType.getMethod(
"handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_HANDLER_ADDED;
}
//。。。。。代码省略。。。。
if (handlerType.getMethod(
"flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_FLUSH;
}
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return flags;
}
自定义handler需要继承 ChannelHandlerAdapter,ChannelHandlerAdapter 的所有方法都有 @Skip注解。重写其中的方法就相当于remove了@Skip注解。
//每个handler中的方法都有不同的标识值,都是2的N次右移值,逐个判断是否含有@Skip标识,通过异或运算后就能获得包含每一个方法是否跳过的标识位值 flags。
(ctx.skipFlags & mask)!=0则该handle没重写过mask代表的方法,需要跳过,这就是(1)的作用
经过综合分析 flags是ChannelHandler 中方法是需要跳过的标志位组合值。 需要结合findContextOutbound()和findContextInbound()理解
通过上述分析 得出findContextOutbound的处理逻辑
给 findContextOutbound(传递不同的) mask值就能获得相应包含不需要跳过的mask对应方法的handler的context.
findContextOutbound 和findContextInbound对应 outbound和inbound ,在pipeline上的环形链表中获取的方向相反
inbound:用于处理触发的事件,如channelRead,channelActive, 链路建立,链路关闭,读事件,异常通知。由IO处理触发
outbound:用于执行IO处理,如bind(),read(), 连接,绑定,写事件,由用户触发
//flags=0的情况怎么处理?暂不分析 因为有HeadHandler的存在
下面列出 inbound,outbound方法。
调用findContextInbound(int)的方法:
fireChannelActive()
fireChannelInactive()
fireChannelRead(Object)
fireChannelReadComplete()
fireChannelRegistered()
fireChannelWritabilityChanged()
fireExceptionCaught(Throwable)
fireUserEventTriggered(Object)
调用findContextOutbound(int)的方法
bind(SocketAddress, ChannelPromise)
close(ChannelPromise)
connect(SocketAddress, SocketAddress, ChannelPromise)
disconnect(ChannelPromise)
flush()
read()
write(Object, ChannelPromise)
writeAndFlush(Object, ChannelPromise)
要注意 findContextOutbound() findContextInbound() 从当前hander位置查找
小结:DefaultChannelHandlerContext提供了构建链表,查找链表中指定事件并执行的功能。
DefaultChannelHandlerInvoker 根据情况同步执行或提交任务执行的功能
用比较专业的说法来讲解 就是提供了无锁化执行的功能。
看一下DefaultChannelHandlerInvoker-invokeBind()的代码
逻辑比较简单,就是取出ChannelHandlerContext的handler执行无锁化处理。
@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelRegisteredNow(ctx);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
invokeChannelRegisteredNow(ctx);//代码A
}
});
}
}
@Override
public void invokeBind(
final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
validatePromise(ctx, promise, false);
if (executor.inEventLoop()) {
invokeBindNow(ctx, localAddress, promise);
} else {
safeExecuteOutbound(new Runnable() {
@Override
public void run() {
invokeBindNow(ctx, localAddress, promise);
}
}, promise);
}
}
代码A中executer的类型是NioEventLoop,executor.inEventLoop() 判断 NioEventLoop中记录的线程时否是当前线程,如果为false说明,该代码在用户提供的线程中执行,此时要进行无锁化处理。
把任务提交给NioEventLoop执行, NioEventLoop中run方法执行此任务, 分析过后,得知 该任务与IO事件串行执行。执行完IO事件,则执行提交任务。在NioEventLoop中会详细分析 循环select的代码。
其中多处有代码A的处理,即调用executor.execute();这样处理的原因?
初步判断这样处理,将并行处理变成了串行处理,实现了无锁化。
netty无锁化设计理念
大多数场景下,并发多线程处理可以提高系统的性能,但是,对共享资源的的使用不当,会带来严重的锁竞争,大量线程的的上下文切换也会消耗系统资源,最终导致性能下降。保证多个事件的执行顺序。netty本身是多线程处理,再增加线程,对提升性能帮助不大,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。查询资料得知,在handler中的具体业务逻辑,如果耗时较长,会影响netty的吞吐量,有以下两种方式
在handler中异步处理,通常使用一个线程池。使用一个专门的EventExecutor来执行它(ChannelPipeline提供了带有EventExecutorGroup参数的addXXX()方法,该方法可以将传入的ChannelHandler绑定到你传入的EventExecutor之中),这样它就会在另一条线程中执行,与其他任务隔离.小结:由于网络通信场景的特殊性, Netty采用了局部串行无锁化设计来提高性能。需要结合生产场景来分析。
下面分析一下两个关键的handler-HeadHandler和tailHandler
//HeadHandler:覆盖了原生NIO的方法,如bind(),connect(),disconnect(),read(),write(),close(),flush()完全依赖Unsafe实现。
//Unsafe调用原生NIO API实现并触发事件。
static final class HeadHandler extends ChannelHandlerAdapter {
protected final Unsafe unsafe;
protected HeadHandler(Unsafe unsafe) {
this.unsafe = unsafe;
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
//使用原生ServerSocketChannel 执行bind(),并触发相应事件
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
}
// A special catch-all handler that handles both bytes and messages.
//忽略所有事件,没看出有什么用
//覆盖了触发的事件处理方法
static final class TailHandler extends ChannelHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.", cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}
这样实现HeadHandler和tailHandler的原因是什么?
用于托底,HeadHandler在没有用户自定义覆盖NIO的方法的handler情况下,提供基于Unsafe的处理,tailHandler在没有用户自定义覆盖IO事件方法的handler情况下,提供空处理。
outBound和inBound中的bound含义是什么
IO操作和channel事件 之间的界限
pipeline中fireXXX都是执行head.fireXXX,覆盖原生NIO多个方法都是执行tail.XXX() 不知原因是啥?
fireXXX 是InBound,IO操作方法是outBound.
个人猜测,演变而来,实际意义不大
小结:HeadHandler和tailHandler由于托底,提供默认实现。
总结:ChannelPipeline和ChannelHandler提供类似AOP功能的责任链实现,方便了用户在不同的IO操作触发后添加自己的业务逻辑。