netty源码分析4-ChannelPipeline和ChannelHandler

mac2024-05-25  51

分享内容如下

ChannelPipeline和ChannelHandler要点概要ChannelPipeline功能分析DefaultChannelHandlerContext分析findContextOutbound,findContextInbound分析DefaultChannelHandlerInvoker无锁化分析HeadHandler和tailHandler分析

ChannelPipeline和ChannelHandler要点概要

ChannelPipeline提供类似AOP功能的责任链实现,用来管理ChannelHandler,按照handler顺序和是否是IO操作来顺序执行ChannelHandler.

用户定义的ChannelHandler可以终止传递,执行业务逻辑do nothing即可,如果要继续传递者需要调用相应方法。

责任链:可理解为 多个hander顺序执行,每个hander关心自要处理的部分业务即可。ChannelPipeline依赖DefaultChannelHandlerContext实现责任链。

DefaultChannelHandlerContext:handler执行的环境, 用于构建责任链的双向链表,无锁化支持,DefaultChannelHandlerInvoker为其提供无锁化支持。

上面的内容全部描述清晰比较困难,可以将 DefaultChannelHandlerContext,DefaultChannelHandlerInvoker理解为管理和执行 handler的工具类。ChannelHandler的执行顺序如下图。

 

IO操作从TailHandler向前传递,channel事件从HeadHandler向后传递。

TailHandler和HeadHandler是责任链中用于托底的Hander,在DefaultChannelPipeline构造方法中被创建,后面详细分析。

 

ChannelPipeline功能分析

实现类:io.netty.channel.DefaultChannelPipeline

 

主要属性

final AbstractChannel channel;

final DefaultChannelHandlerContext head;

final DefaultChannelHandlerContext tail;

构造方法

public DefaultChannelPipeline(AbstractChannel channel) {

if (channel == null) {

throw new NullPointerException("channel");

}

this.channel = channel;

//由HeadHandler和TailHandler构建双向链表

TailHandler tailHandler = new TailHandler();

tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);

 

HeadHandler headHandler = new HeadHandler(channel.unsafe());//注意此处传入的unsafe

head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);

head.next = tail;

tail.prev = head;

}

主要方法: fireXXX 和覆盖原生NIO方法bind(),connect(),flush().

addXXX:将invoker加入双向链表

public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {

synchronized (this) {

checkDuplicateName(name);

//构建链表辅助类DefaultChannelHandlerContext

DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, invoker, name, handler);

//将newCtx加入链表,放在heah后

addFirst0(name, newCtx);

}

return this;

}

//真正放入链表的方法

private void addFirst0(String name, DefaultChannelHandlerContext newCtx) {

checkMultiplicity(newCtx);

DefaultChannelHandlerContext nextCtx = head.next;

newCtx.prev = head;

newCtx.next = nextCtx;

head.next = newCtx;

nextCtx.prev = newCtx;

name2ctx.put(name, newCtx);

callHandlerAdded(newCtx);

}

addFirst0将Handler放在HeadHandler后面, addLast0将Handler放在TailHandler前面。

addBefore将Handler放在指定handler前,addAfter将Handler放在指定handler后。

channel事件fireXXX:从头到尾执行handler链表

@Override

public ChannelPipeline fireChannelRead(Object msg) {

head.fireChannelRead(msg);

return this;

}

@Override

public ChannelPipeline fireChannelActive() {

head.fireChannelActive();

if (channel.config().isAutoRead()) {

channel.read();

}

 

return this;

}

//IO操作:从尾到头执行handler链表

@Override

public ChannelFuture bind(SocketAddress localAddress) {

return tail.bind(localAddress);

}

@Override

public ChannelFuture connect(SocketAddress remoteAddress) {

return tail.connect(remoteAddress);

}

小结:ChannelPipeline构建了双向链表,实现了插入handler的方法,执行IO事件和channel事件的方法。

 

DefaultChannelHandlerContext分析

目前观察事件的执行都是找到一个实现对应处理的handler就执行,没有对事件继续传递?

事件传递在Unsafe里面实现。

 

ChannelPipeline依赖DefaultChannelHandlerContext实现,下面分析一下DefaultChannelHandlerContext

主要结构如下图

DefaultChannelHandlerInvoker 和ChannelHanler是DefaultChannelHandlerContext主要成员,

DefaultChannelHandlerContext的执行依赖DefaultChannelHandlerInvoker ,ChannelHanler在DefaultChannelHandlerInvoker 中执行

 

主要属性:

volatile DefaultChannelHandlerContext next;

volatile DefaultChannelHandlerContext prev;// next,prev用于构建双向链表

final ChannelHandlerInvoker invoker;

主要方法如下

事件传递方法,如:

public ChannelHandlerContext fireChannelRegistered() {

DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);

next.invoker.invokeChannelRegistered(next);

return this;

}

IO操作方法,如:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {

//取出实现bind()的handller执行

DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);

next.invoker.invokeBind(next, localAddress, promise);

return promise;

}

findContextOutbound,findContextInbound分析

以bind()执行为例。

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {

// MASK_BIND=1024

DefaultChannelHandlerContext next = findContextOutbound(MASK_BIND);

next.invoker.invokeBind(next, localAddress, promise);

return promise;

}

// 下面(1)的逻辑不理解

private DefaultChannelHandlerContext findContextOutbound(int mask) {

DefaultChannelHandlerContext ctx = this;//从当前的ctx 开始查找

do {

// ctx是pipeline中的属性, ctx 组成了环形链表,这里ctx最后取得的是 包含HeadHanler的DefaultChannelHandlerContext

ctx = ctx.prev;

} while ((ctx.skipFlags & mask) != 0);//(1)

return ctx;

}

 

跟踪ctx.skipFlags

DefaultChannelHandlerContext(

DefaultChannelPipeline pipeline, ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {

//。。。省略代码

skipFlags = skipFlags(handler);

//。。。省略代码

}

//有handlerType对应的flag值则从缓存中取出,没有则获取并放入缓存中

private static int skipFlags(ChannelHandler handler) {

WeakHashMap<Class<?>, Integer> cache =

skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)];

Class<? extends ChannelHandler> handlerType = handler.getClass();

int flagsVal;

synchronized (cache) {

Integer flags = cache.get(handlerType);

if (flags != null) {

flagsVal = flags;

} else {

flagsVal = skipFlags0(handlerType);

cache.put(handlerType, Integer.valueOf(flagsVal));

}

}

return flagsVal;

}

 

skipFlags0 是真正获取skipFlags 的方法。

代码如下:

private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {

int flags = 0;

try {

//如果ChannelHandler中handlerAdded方法有@Skip标识,则合并代表该方法的标准位值

if (handlerType.getMethod(

"handlerAdded", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {

flags |= MASK_HANDLER_ADDED;

}

//。。。。。代码省略。。。。

if (handlerType.getMethod(

"flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {

flags |= MASK_FLUSH;

}

}

} catch (Exception e) {

// Should never reach here.

PlatformDependent.throwException(e);

}

 

return flags;

}

自定义handler需要继承 ChannelHandlerAdapter,ChannelHandlerAdapter 的所有方法都有 @Skip注解。重写其中的方法就相当于remove了@Skip注解。

//每个handler中的方法都有不同的标识值,都是2的N次右移值,逐个判断是否含有@Skip标识,通过异或运算后就能获得包含每一个方法是否跳过的标识位值 flags。

(ctx.skipFlags & mask)!=0则该handle没重写过mask代表的方法,需要跳过,这就是(1)的作用

经过综合分析 flags是ChannelHandler 中方法是需要跳过的标志位组合值。 需要结合findContextOutbound()和findContextInbound()理解

 

通过上述分析 得出findContextOutbound的处理逻辑

给 findContextOutbound(传递不同的) mask值就能获得相应包含不需要跳过的mask对应方法的handler的context.

 findContextOutbound 和findContextInbound对应 outbound和inbound ,在pipeline上的环形链表中获取的方向相反

inbound:用于处理触发的事件,如channelRead,channelActive, 链路建立,链路关闭,读事件,异常通知。由IO处理触发

outbound:用于执行IO处理,如bind(),read(), 连接,绑定,写事件,由用户触发

//flags=0的情况怎么处理?暂不分析 因为有HeadHandler的存在

下面列出 inbound,outbound方法。

调用findContextInbound(int)的方法:

fireChannelActive()

fireChannelInactive()

fireChannelRead(Object)

fireChannelReadComplete()

fireChannelRegistered()

fireChannelWritabilityChanged()

fireExceptionCaught(Throwable)

fireUserEventTriggered(Object)

 

调用findContextOutbound(int)的方法

bind(SocketAddress, ChannelPromise)

close(ChannelPromise)

connect(SocketAddress, SocketAddress, ChannelPromise)

disconnect(ChannelPromise)

flush()

read()

write(Object, ChannelPromise)

writeAndFlush(Object, ChannelPromise)

要注意 findContextOutbound() findContextInbound() 从当前hander位置查找

小结:DefaultChannelHandlerContext提供了构建链表,查找链表中指定事件并执行的功能。

 

DefaultChannelHandlerInvoker 根据情况同步执行或提交任务执行的功能

用比较专业的说法来讲解 就是提供了无锁化执行的功能。

DefaultChannelHandlerInvoker无锁化分析

看一下DefaultChannelHandlerInvoker-invokeBind()的代码

逻辑比较简单,就是取出ChannelHandlerContext的handler执行无锁化处理。

@Override

public void invokeChannelRegistered(final ChannelHandlerContext ctx) {

if (executor.inEventLoop()) {

invokeChannelRegisteredNow(ctx);

} else {

 

executor.execute(new Runnable() {

@Override

public void run() {

invokeChannelRegisteredNow(ctx);//代码A

}

});

}

}

 

@Override

public void invokeBind(

final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {

if (localAddress == null) {

throw new NullPointerException("localAddress");

}

validatePromise(ctx, promise, false);

 

if (executor.inEventLoop()) {

invokeBindNow(ctx, localAddress, promise);

} else {

safeExecuteOutbound(new Runnable() {

@Override

public void run() {

invokeBindNow(ctx, localAddress, promise);

}

}, promise);

}

}

代码A中executer的类型是NioEventLoop,executor.inEventLoop() 判断 NioEventLoop中记录的线程时否是当前线程,如果为false说明,该代码在用户提供的线程中执行,此时要进行无锁化处理。

把任务提交给NioEventLoop执行, NioEventLoop中run方法执行此任务, 分析过后,得知 该任务与IO事件串行执行。执行完IO事件,则执行提交任务。在NioEventLoop中会详细分析 循环select的代码。

其中多处有代码A的处理,即调用executor.execute();这样处理的原因?

初步判断这样处理,将并行处理变成了串行处理,实现了无锁化。

netty无锁化设计理念

大多数场景下,并发多线程处理可以提高系统的性能,但是,对共享资源的的使用不当,会带来严重的锁竞争,大量线程的的上下文切换也会消耗系统资源,最终导致性能下降。保证多个事件的执行顺序。netty本身是多线程处理,再增加线程,对提升性能帮助不大,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。

查询资料得知,在handler中的具体业务逻辑,如果耗时较长,会影响netty的吞吐量,有以下两种方式

在handler中异步处理,通常使用一个线程池。使用一个专门的EventExecutor来执行它(ChannelPipeline提供了带有EventExecutorGroup参数的addXXX()方法,该方法可以将传入的ChannelHandler绑定到你传入的EventExecutor之中),这样它就会在另一条线程中执行,与其他任务隔离.

小结:由于网络通信场景的特殊性, Netty采用了局部串行无锁化设计来提高性能。需要结合生产场景来分析。

HeadHandler和tailHandler分析

下面分析一下两个关键的handler-HeadHandler和tailHandler

//HeadHandler:覆盖了原生NIO的方法,如bind(),connect(),disconnect(),read(),write(),close(),flush()完全依赖Unsafe实现。

//Unsafe调用原生NIO API实现并触发事件。

static final class HeadHandler extends ChannelHandlerAdapter {

protected final Unsafe unsafe;

protected HeadHandler(Unsafe unsafe) {

this.unsafe = unsafe;

}

@Override

public void bind(

ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)

throws Exception {

//使用原生ServerSocketChannel 执行bind(),并触发相应事件

unsafe.bind(localAddress, promise);

}

@Override

public void connect(

ChannelHandlerContext ctx,

SocketAddress remoteAddress, SocketAddress localAddress,

ChannelPromise promise) throws Exception {

unsafe.connect(remoteAddress, localAddress, promise);

}

 

@Override

public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {

unsafe.disconnect(promise);

}

 

@Override

public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {

unsafe.close(promise);

}

 

@Override

public void read(ChannelHandlerContext ctx) {

unsafe.beginRead();

}

 

@Override

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

unsafe.write(msg, promise);

}

 

@Override

public void flush(ChannelHandlerContext ctx) throws Exception {

unsafe.flush();

}

}

 

 

// A special catch-all handler that handles both bytes and messages.

//忽略所有事件,没看出有什么用

//覆盖了触发的事件处理方法

static final class TailHandler extends ChannelHandlerAdapter {

 

@Override

public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

 

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception { }

 

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception { }

 

@Override

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }

 

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }

 

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

logger.warn(

"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +

"It usually means the last handler in the pipeline did not handle the exception.", cause);

}

 

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

try {

logger.debug(

"Discarded inbound message {} that reached at the tail of the pipeline. " +

"Please check your pipeline configuration.", msg);

} finally {

ReferenceCountUtil.release(msg);

}

}

 

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }

}

这样实现HeadHandler和tailHandler的原因是什么?

用于托底,HeadHandler在没有用户自定义覆盖NIO的方法的handler情况下,提供基于Unsafe的处理,tailHandler在没有用户自定义覆盖IO事件方法的handler情况下,提供空处理。

 

outBound和inBound中的bound含义是什么

IO操作和channel事件 之间的界限

pipeline中fireXXX都是执行head.fireXXX,覆盖原生NIO多个方法都是执行tail.XXX() 不知原因是啥?

fireXXX 是InBound,IO操作方法是outBound.

个人猜测,演变而来,实际意义不大

小结:HeadHandler和tailHandler由于托底,提供默认实现。

总结:ChannelPipeline和ChannelHandler提供类似AOP功能的责任链实现,方便了用户在不同的IO操作触发后添加自己的业务逻辑。

 

最新回复(0)