Netty源码解析之Pipeline解析

mac2024-11-25  65

首先初始化Pipeline如下 创建channel会调用到如下方法 protected AbstractChannel(Channel parent) {     this.parent = parent;     id = newId();     unsafe = newUnsafe();     pipeline = newChannelPipeline(); }   跟踪newChannelPipeline()源码如下 // DefaultChannelPipeline.java protected DefaultChannelPipeline(Channel channel) {     this.channel = ObjectUtil.checkNotNull(channel, "channel");     succeededFuture = new SucceededChannelFuture(channel, null);     voidPromise = new VoidChannelPromise(channel, true);     // 可见初始化pipeline时,分别实例化了tail和head     tail = new TailContext(this);     head = new HeadContext(this);       head.next = tail;     tail.prev = head; }   跟踪代码可看见TailContext与HeadContext均继承了AbstractChannelHandlerContext(其中HeadContext比TailContext多实现了一个ChannelOutboundHandler 接口,其包含了一些连接、读写的方法声明) final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {       TailContext(DefaultChannelPipeline pipeline) {         super(pipeline, null, TAIL_NAME, true, false);         setAddComplete();     } ................... -- final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {       private final Unsafe unsafe;       HeadContext(DefaultChannelPipeline pipeline) {         super(pipeline, null, HEAD_NAME, false, true);         unsafe = pipeline.channel().unsafe();         setAddComplete();     } .................. 分析:AbstractChannelHandlerContext的方法结构如下: 由上可见定义注册、解绑、传播(fireChannelActive)、异常捕获、读写等事件。 总结:以上就是pipeline的简单初始化过程,下面会继续深入分析。
添加ChannelHandler 1 首先会调用addLast()方法添加 // DefaultChannelPipeLine.java public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {     final AbstractChannelHandlerContext newCtx;     // 这里看出添加channelHandler是同步阻塞的     synchronized (this) {         // 1.1 判断channelHandler是否可被重复添加         checkMultiplicity(handler);         // 1.2 创建节点并添加到链表         newCtx = newContext(group, filterName(name, handler), handler);           addLast0(newCtx);           if (!registered) {             newCtx.setAddPending();             callHandlerCallbackLater(newCtx, true);             return this;         }         // 1.3 添加用户回调事件 callHandlerAdded0(newCtx)         EventExecutor executor = newCtx.executor();         if (!executor.inEventLoop()) {             newCtx.setAddPending();             executor.execute(new Runnable() {                 @Override                 public void run() {                     callHandlerAdded0(newCtx);                 }             });             return this;         }     }     callHandlerAdded0(newCtx);     return this; }   1.1 是否可被重复复添加 解析 首先跟踪checkMultiplicity(handler)方法如下: // DefaultChannelPipeLine.java private static void checkMultiplicity(ChannelHandler handler) {     if (handler instanceof ChannelHandlerAdapter) {  // 判断当前实例类型必须是ChannelHandlerAdapter         ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;         if (!h.isSharable() && h.added) {  // 判断是否可被添加   (非共享 && 已被添加过)  就抛出异常             throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");         }         // 被添加过之后会设置标识为已被添加过         h.added = true;     } } 跟着上面的h.isSharable()看看什么类型可被重复添加 public boolean isSharable() {     // 主要逻辑:判断当前channelHandler是否有Sharable注解,如果有就表示可共享。可被重复添加,反之不行     Class<?> clazz = getClass();     Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();     Boolean sharable = cache.get(clazz);     if (sharable == null) {         sharable = clazz.isAnnotationPresent(Sharable.class);         cache.put(clazz, sharable);     }     return sharable; }   1.2 创建节点并添加到链表 首先创建一个channelHandlerContext类 newCtx = newContext(group, filterName(name, handler), handler); 其中filterName(name, handler)会遍历链表检查当前name是否重复,如果不重复返回(否则抛异常)。 然后通过addLast0(newCtx);添加到链表中,其过程如下: private void addLast0(AbstractChannelHandlerContext newCtx) {     AbstractChannelHandlerContext prev = tail.prev;     newCtx.prev = prev;     newCtx.next = tail;     prev.next = newCtx;     tail.prev = newCtx; } 其大致流程是把newCtx插入到尾结点tail的前面,流程图如下   1.3 回调添加用户完成事件 callHandlerAdded0(newCtx),回调之后移除掉 // DefaultChannelPipeline.java     ........................     ctx.handler().handlerAdded(ctx);     ctx.setAddComplete(); } catch (Throwable t) {     boolean removed = false;     try {         remove0(ctx); ........................      2 channelHandler的删除 channelHandler的删除与上面类型,直接从链表中移除即可,其下是源码跟踪 // DefaultChannelPipeline.java public final ChannelPipeline remove(ChannelHandler handler) {     remove(getContextOrDie(handler));     return this; } 删除的方法解析如下 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {     assert ctx != head && ctx != tail;       synchronized (this) {         // 2.1 双向链表的方式直接删除,与上面添加类似         remove0(ctx);           if (!registered) {             callHandlerCallbackLater(ctx, false);             return ctx;         }           EventExecutor executor = ctx.executor();         if (!executor.inEventLoop()) {             executor.execute(new Runnable() {                 @Override                 public void run() {                     callHandlerRemoved0(ctx);                 }             });             return ctx;         }     }     // 2.2 执行删除成功后的事件回调     callHandlerRemoved0(ctx);     return ctx; }  
其他: channelInBound与channelOutBound区别见文章: https://blog.csdn.net/u010013573/article/details/85222110 InBound事件顺序执行(head->tail),outBound事件逆序执行(tail->head)   pipeline还有一个异常链,可通过上面的In/OutBound添加对应的异常处理。 异常链的传播顺序(tail->head)                                        
最新回复(0)