Netty源码分析-PoolThreadCache

mac2025-05-26  31

 

 

final class PoolThreadCache { final PoolArena<byte[]> heapArena; final PoolArena<ByteBuffer> directArena; //分别存储堆外和堆内的内存,MemoryRegionCache内部结构是链表 private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches; //默认长度32 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches; //默认长度4 private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches; //默认长度32 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;//默认长度4 private final MemoryRegionCache<byte[]>[] normalHeapCaches; //默认长度3 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; //默认长度3 // Used for bitshifting when calculate the index of normal caches later //移位使用的偏移量,在计算normalCaches数组下标时使用 private final int numShiftsNormalDirect; //13 private final int numShiftsNormalHeap; //13 private final int freeSweepAllocationThreshold; //8192 private final AtomicBoolean freed = new AtomicBoolean(); private int allocations; PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) { if (maxCachedBufferCapacity < 0) { throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)"); } this.freeSweepAllocationThreshold = freeSweepAllocationThreshold; this.heapArena = heapArena; this.directArena = directArena; if (directArena != null) { tinySubPageDirectCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageDirectCaches = createSubPageCaches( smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); directArena.numThreadCaches.getAndIncrement(); } else { // No directArea is configured so just null out all caches tinySubPageDirectCaches = null; smallSubPageDirectCaches = null; normalDirectCaches = null; numShiftsNormalDirect = -1; } if (heapArena != null) { // Create the caches for the heap allocations tinySubPageHeapCaches = createSubPageCaches( tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny); smallSubPageHeapCaches = createSubPageCaches( smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small); numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); heapArena.numThreadCaches.getAndIncrement(); } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; smallSubPageHeapCaches = null; normalHeapCaches = null; numShiftsNormalHeap = -1; } // Only check if there are caches in use. if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) && freeSweepAllocationThreshold < 1) { throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)"); } } //创建缓存数组,numCaches为数组长度,cacheSize为队列长度 private static <T> MemoryRegionCache<T>[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0 && numCaches > 0) { //初始化数组 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { //数组元素赋值,SubPageMemoryRegionCache内部包含一个队列,存储PoolChunk //cacheSize=队列容量,sizeClass=为Tiny或Small cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); } return cache; } else { return null; } } //cacheSize默认64 maxCachedBufferCapacity默认32768(8192*4) private static <T> MemoryRegionCache<T>[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { if (cacheSize > 0 && maxCachedBufferCapacity > 0) { //max=32768 int max = Math.min(area.chunkSize, maxCachedBufferCapacity); //arraySize=3, log2(N),其实就是 N除以2的结果 int arraySize = Math.max(1, log2(max / area.pageSize) + 1); MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { //cacheSize=64默认,为队列的容量 cache[i] = new NormalMemoryRegionCache<T>(cacheSize); } return cache; } else { return null; } } //val除以2的结果 private static int log2(int val) { int res = 0; while (val > 1) { val >>= 1; res++; } return res; } boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity); } boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity); } boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); } //分配缓存 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) { if (cache == null) { // no cache found so just return false here return false; } //分配缓存 boolean allocated = cache.allocate(buf, reqCapacity); //分配次数>=8192 if (++ allocations >= freeSweepAllocationThreshold) { allocations = 0; //释放掉缓存 trim(); } return allocated; } //把PoolChunk根据大小不同,选择对应的MemoryRegionCache,然后加入到队列 boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) { MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass); if (cache == null) { return false; } return cache.add(chunk, handle); } //根据请求大小,选择对应的缓存对象 private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) { switch (sizeClass) { case Normal: return cacheForNormal(area, normCapacity); case Small: return cacheForSmall(area, normCapacity); case Tiny: return cacheForTiny(area, normCapacity); default: throw new Error(); } } //当前Cache被垃圾回收时会被调用 @Override protected void finalize() throws Throwable { try { super.finalize(); } finally { free(); } } //释放所有资源 void free() { //释放所有资源,线程安全判断,只释放一次 if (freed.compareAndSet(false, true)) { int numFreed = free(tinySubPageDirectCaches) + free(smallSubPageDirectCaches) + free(normalDirectCaches) + free(tinySubPageHeapCaches) + free(smallSubPageHeapCaches) + free(normalHeapCaches); //directArena线程引用计数器减一 if (directArena != null) { directArena.numThreadCaches.getAndDecrement(); } //heapArena线程引用计数器减一 if (heapArena != null) { heapArena.numThreadCaches.getAndDecrement(); } } } //释放缓存数组 private static int free(MemoryRegionCache<?>[] caches) { if (caches == null) { return 0; } int numFreed = 0; for (MemoryRegionCache<?> c: caches) { numFreed += free(c); } return numFreed; } //把缓存内部队列中所有chunk释放掉 private static int free(MemoryRegionCache<?> cache) { if (cache == null) { return 0; } return cache.free(); } //把所有类型的缓存数组全部释放一遍,具体释放逻辑看其实现 void trim() { trim(tinySubPageDirectCaches); trim(smallSubPageDirectCaches); trim(normalDirectCaches); trim(tinySubPageHeapCaches); trim(smallSubPageHeapCaches); trim(normalHeapCaches); } //释放caches数组 private static void trim(MemoryRegionCache<?>[] caches) { if (caches == null) { return; } //循环每个cache元素释放 for (MemoryRegionCache<?> c: caches) { trim(c); } } private static void trim(MemoryRegionCache<?> cache) { if (cache == null) { return; } cache.trim(); } private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) { //normCapacity >>> 4; 根据normCapacity计算出tiny缓存数组下标 //数组长度32,每个元素对应不用的大小,从16个字节大小开始,每次增加16个字节。 int idx = PoolArena.tinyIdx(normCapacity); if (area.isDirect()) { return cache(tinySubPageDirectCaches, idx); } return cache(tinySubPageHeapCaches, idx); } private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) { //根据normCapacity计算small缓存数组下标,数组长度为4,[512,1024,2048,4096] int idx = PoolArena.smallIdx(normCapacity); if (area.isDirect()) { return cache(smallSubPageDirectCaches, idx); } return cache(smallSubPageHeapCaches, idx); } private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) { if (area.isDirect()) { //numShiftsNormalDirect默认为13,相当于除以8192 //这里如果idx不大于3的话,那么normCapacity最大为5*8192 int idx = log2(normCapacity >> numShiftsNormalDirect); return cache(normalDirectCaches, idx); } int idx = log2(normCapacity >> numShiftsNormalHeap); return cache(normalHeapCaches, idx); } private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) { //如果下标越界返回null if (cache == null || idx > cache.length - 1) { return null; } return cache[idx]; } //TINY or SMALL 大小的缓存,使用Subpage来分配 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> { SubPageMemoryRegionCache(int size, SizeClass sizeClass) { super(size, sizeClass); } @Override protected void initBuf( PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBufWithSubpage(buf, handle, reqCapacity); } } //Normal大小的缓存,使用chunk.initBuf按整块分配 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> { NormalMemoryRegionCache(int size) { super(size, SizeClass.Normal); } //按整块分配 @Override protected void initBuf( PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) { chunk.initBuf(buf, handle, reqCapacity); } } //缓存元素,内部一个队列存储每个PoolChunk private abstract static class MemoryRegionCache<T> { private final int size; private final Queue<Entry<T>> queue; private final SizeClass sizeClass; private int allocations; MemoryRegionCache(int size, SizeClass sizeClass) { //以2的次方进行增长,比如传入3返回4,传入5返回8,传入9返回16。 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); //定长队列 queue = PlatformDependent.newFixedMpscQueue(this.size); //缓存类型 tiny small normal this.sizeClass = sizeClass; } //初始化PooledByteBuf,子类实现,根据tiny,small或Normal有不同的处理方式 protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity); //把chunk加入队列,如果队列满了返回false public final boolean add(PoolChunk<T> chunk, long handle) { //创建Entry Entry<T> entry = newEntry(chunk, handle); //队列满了返回false boolean queued = queue.offer(entry); if (!queued) { //加入失败回收entry entry.recycle(); } return queued; } //分配内存 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { //获取并移除元素,元素不存在返回false Entry<T> entry = queue.poll(); if (entry == null) { return false; } //元素存在则一定分配成功 initBuf(entry.chunk, entry.handle, buf, reqCapacity); //回收entry entry.recycle(); //分配数量增加 ++ allocations; return true; } //把队列中所有元素释放 public final int free() { return free(Integer.MAX_VALUE); } //释放固定次数 private int free(int max) { int numFreed = 0; //释放固定次数,假设队列30个元素,如果max=10,那就把队列中前10个元素释放掉 for (; numFreed < max; numFreed++) { Entry<T> entry = queue.poll(); if (entry != null) { freeEntry(entry); } else { return numFreed; } } return numFreed; } //这个方法是用于优化目的,把不常用的缓存释放掉 //allocations这个值是一段时间当前缓存被使用的次数 public final void trim() { int free = size - allocations; allocations = 0; if (free > 0) { free(free); } } //真正释放缓存 private void freeEntry(Entry entry) { PoolChunk chunk = entry.chunk; long handle = entry.handle; //entry对象是池化的,回收它。 entry.recycle(); //真正释放,堆内被GC,堆外使用UNSAFE释放内存空间 chunk.arena.freeChunk(chunk, handle, sizeClass); } //Entry 一个池化对象,可以重复利用,内部存储PoolChunk和handle static final class Entry<T> { final Handle<Entry<?>> recyclerHandle; PoolChunk<T> chunk; long handle = -1; Entry(Handle<Entry<?>> recyclerHandle) { this.recyclerHandle = recyclerHandle; } void recycle() { chunk = null; handle = -1; recyclerHandle.recycle(this); } } //从池中获取Entry private static Entry newEntry(PoolChunk<?> chunk, long handle) { Entry entry = RECYCLER.get(); entry.chunk = chunk; entry.handle = handle; return entry; } private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { @SuppressWarnings("unchecked") @Override protected Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }; } }

 

最新回复(0)