首先初始化Pipeline如下
创建channel会调用到如下方法
protected AbstractChannel(Channel parent) {
this.
parent = parent
;
id = newId()
;
unsafe = newUnsafe()
;
pipeline = newChannelPipeline();
}
跟踪newChannelPipeline()源码如下
// DefaultChannelPipeline.java
protected DefaultChannelPipeline(Channel channel) {
this.
channel = ObjectUtil.checkNotNull(channel
, "channel")
;
succeededFuture =
new SucceededChannelFuture(channel
, null)
;
voidPromise =
new VoidChannelPromise(channel
, true)
;
// 可见初始化pipeline时,分别实例化了tail和head
tail =
new TailContext(
this)
;
head =
new HeadContext(
this)
;
head.
next =
tail;
tail.
prev =
head;
}
跟踪代码可看见TailContext与HeadContext均继承了AbstractChannelHandlerContext(其中HeadContext比TailContext多实现了一个ChannelOutboundHandler 接口,其包含了一些连接、读写的方法声明)
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME
, true, false);
setAddComplete();
}
...................
--
final class HeadContext
extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe
unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline
, null, HEAD_NAME
, false, true)
;
unsafe = pipeline.channel().unsafe()
;
setAddComplete()
;
}
..................
分析:AbstractChannelHandlerContext的方法结构如下:
由上可见定义注册、解绑、传播(fireChannelActive)、异常捕获、读写等事件。
总结:以上就是pipeline的简单初始化过程,下面会继续深入分析。
添加ChannelHandler
1 首先会调用addLast()方法添加
// DefaultChannelPipeLine.java
public final ChannelPipeline
addLast(EventExecutorGroup group
, String name
, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx
;
// 这里看出添加channelHandler是同步阻塞的
synchronized (
this) {
// 1.1 判断channelHandler是否可被重复添加
checkMultiplicity(handler)
;
// 1.2 创建节点并添加到链表
newCtx = newContext(group
, filterName(name
, handler)
, handler)
;
addLast0(newCtx)
;
if (!
registered) {
newCtx.setAddPending()
;
callHandlerCallbackLater(newCtx
, true)
;
return this;
}
// 1.3 添加用户回调事件 callHandlerAdded0(newCtx)
EventExecutor executor = newCtx.executor()
;
if (!executor.inEventLoop()) {
newCtx.setAddPending()
;
executor.execute(
new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
})
;
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
1.1 是否可被重复复添加 解析
首先跟踪checkMultiplicity(handler)方法如下:
// DefaultChannelPipeLine.java
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) { // 判断当前实例类型必须是ChannelHandlerAdapter
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler
;
if (!h.isSharable() && h.added) { // 判断是否可被添加 (非共享 && 已被添加过) 就抛出异常
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.")
;
}
// 被添加过之后会设置标识为已被添加过
h.
added =
true;
}
}
跟着上面的h.isSharable()看看什么类型可被重复添加
public boolean isSharable() {
// 主要逻辑:判断当前channelHandler是否有Sharable注解,如果有就表示可共享。可被重复添加,反之不行
Class<?> clazz = getClass()
;
Map<Class<?>
, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache()
;
Boolean sharable = cache.get(clazz)
;
if (sharable ==
null) {
sharable = clazz.isAnnotationPresent(
Sharable.
class)
;
cache.put(clazz
, sharable)
;
}
return sharable
;
}
1.2 创建节点并添加到链表
首先创建一个channelHandlerContext类
newCtx = newContext(group, filterName(name, handler), handler);
其中filterName(name, handler)会遍历链表检查当前name是否重复,如果不重复返回(否则抛异常)。
然后通过addLast0(newCtx);添加到链表中,其过程如下:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev =
tail.
prev;
newCtx.
prev = prev
;
newCtx.
next =
tail;
prev.
next = newCtx
;
tail.
prev = newCtx
;
}
其大致流程是把newCtx插入到尾结点tail的前面,流程图如下
1.3 回调添加用户完成事件 callHandlerAdded0(newCtx),回调之后移除掉
// DefaultChannelPipeline.java
........................
ctx.handler().handlerAdded(ctx)
;
ctx.setAddComplete()
;
}
catch (Throwable t) {
boolean removed =
false;
try {
remove0(ctx)
;
........................
2 channelHandler的删除
channelHandler的删除与上面类型,直接从链表中移除即可,其下是源码跟踪
// DefaultChannelPipeline.java
public final ChannelPipeline
remove(ChannelHandler handler) {
remove(getContextOrDie(handler))
;
return this;
}
删除的方法解析如下
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 2.1 双向链表的方式直接删除,与上面添加类似
remove0
(ctx);
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
// 2.2 执行删除成功后的事件回调
callHandlerRemoved0(ctx);
return ctx;
}
其他:
channelInBound与channelOutBound区别见文章: https://blog.csdn.net/u010013573/article/details/85222110
InBound事件顺序执行(head->tail),outBound事件逆序执行(tail->head)
pipeline还有一个异常链,可通过上面的In/OutBound添加对应的异常处理。
异常链的传播顺序(tail->head)