0%

Netty源码阅读笔记(五)ByteBuf

概述

在对多路IO复用进行实现的时候,ByteBuf这个结构是必不可少的,尽管NIO中也实现了ByteBuf,但使用起来并不是特别方便,而且有可以优化的地方。 因此,Netty在ByteBuf这一块完全是自己的一套方案,不仅使用起来方便,有着丰富的API,而且性能也是非常的出色,本篇将对ByteBuf这一组件进行学习。

ByteBuf结构

1
2
3
4
5
6
/*     +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
*/

源码的注释很形象,这里就直接拿来用了,ByteBuf分为下面三个区间:

  1. [0, readerIndex) 这个区间是已经读过的数据,
  2. [readerIndex, writerIndex) 这个区间是可读数据区间
  3. [writeIndex, capacity] 这段区间是可写区间
  4. 还有一个变量maxCapacity, 即如果capacity不够写了,则可以扩容,最大不超过maxCapacity,默认Integer.MAX_VALUE

ByteBuf的高频API,readXxx, writeXxx, markXxx, resetXxx… 限于篇幅这里就不过多的记录用法了…

ByteBuf的分类

bytebuf

上面的继承关系图中间删掉了几层关系,这样看起来比较方便理解(好像也没方便到哪去…名字一个个的都那么长

从三个角度对ByeBuf分类

下面从三个角度来对ByteBuf进行分类:

  1. Pooled/Unpooled(是否池化)
  2. Unsafe(是否使用Unsafe来进行数据读写)
  3. Heap/Direct(是堆中内存还是直接内存)

上面各种ByteBuf的命名 = (1) + (2) + (3) + “ByteBuf”
例如 UnpooledUnsafeHeapByeBuf , 这样看就好记多了!

1. 是否池化

池化的目的就是为了减少资源分配的开销,netty使用串行无锁化的方法解决了池化分配资源时的并发问题,在稍后会详细介绍。

2. 是否使用Unsafe

在创建ByteBuf的时候,会判断当前JDK是否提供了Unsafe支持,如果支持的化,就优先创建基于Unsafe的ByteBuf,若没有的话,则创建不带Unsafe的ByteBuf.

1
2
3
4
5
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
: PooledHeapByteBuf.newInstance(maxCapacity);
}

Unsafe主要是用于对读写操作进行实现。

如果Unsafe可用,则使用Unsafe来实现ByteBuf的读写操作(不管是Heap还是Direct都是能用Unsafe则都用Unsafe):

1
2
3
4
5
6
7
8
9
// 直接使用 首地址 + 偏移量 来访问byte数组中的数据
static byte getByte(byte[] data, int index) {
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
static void putByte(byte[] data, int index, byte value) {
UNSAFE.putByte(data, BYTE_ARRAY_BASE_OFFSET + index, value);
}
。。。
。。。

使用Unsafe的好处就是:因为是直接访问的内存,所以速度更快。

当然,如果当前JDK不支持Unsafe的话,就直接是通过JDK的方式来访问了:

不支持Unsafe的情况

  1. 对Heap的访问:

    1
    2
    3
    static byte getByte(byte[] memory, int index) {
    return memory[index];
    }
  2. 对Direct的访问:

    1
    2
    3
    4
    5
    ByteBuffer buffer; // JDK中实现的对直接内存的访问
    @Override
    protected byte _getByte(int index) {
    return buffer.get(index);
    }

ByteBuf和AbstractByteBuf

在从三个角度了解完ByeBuf之后,再回过头来看它的一个层次实现(逻辑很清晰)

ByteBuf定义了ByteBuf需要提供的API提供了顶层的接口规范。

而AbstractByteBuf则定义了实现ByteBuf所需要的变量,实现了ByteBuf的骨架,并用模版模式主要让子类去实现newHeapBuffer(...)newDirectBuffer(...) , 对于这两种方法,提供了Pool和Pooled两种不同的实现:
newHeapBuf
newHeapBuf

最后,根据是否存在Unsafe来创建ByteBuf

1
2
3
4
5
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
: PooledHeapByteBuf.newInstance(maxCapacity);
}

ByteBuf分配器——ByteBufAllocator分析

在netty中ByteBufAllocator提供了创建ByteBuf的手段,而ByteBufAllocator主要在Unpooled/Pooled的维度上有着非常大的区别,因此,下面也是从是否池化的角度来分析ByteBufAllocator.

非池化分配——UnPooledByteBufAllocator分析

比起池化的分配来说,非池化的分配比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {

...
...

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
//大多情况:
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
}

可以看到,非池化的Bytebuf分配情况是基于Unsafe来直接new一个ByteBuf, 而对于Heap和Direct类型的ByteBuf,其底层维护的一个是Byte数组,一个是JDK实现的ByteBuf(用于访问直接内存)

1
2
3
4
// Heap的方式
protected byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
1
2
3
4
5
// Direct的方式
protected ByteBuffer allocateDirect(int initialCapacity) {
//此处调用JDK的allocateDirect来分配堆外内存
return ByteBuffer.allocateDirect(initialCapacity);
}

池化分配——PooledByteBufAllocator分析

池化分配器要比非池化的分配方式复杂的多,在Netty中,池化资源的分配宏观上分为两个部分:

  1. 全局池资源(由PooledArena进行管理)
  2. 线程私有的“池资源缓存”(池资源缓存在线程的ThreadLocal中)

前者的Arena通过sychronized来保证了线程安全,后者则天然保证了线程安全.

池资源的分配单位

这里的池资源主要就是一块空闲可用的内存区域, 因为不管是全局池资源也好,还是线程私有的池资源也好,它们都是公用同一套存储结构的。

unit

对于池资源的分配单位,有以下几个:

  1. Chunk: 大小为16M, 会以多个ChunkList的形式保存在Arena里, 后面会说.
  2. Page: 大小为8K, 一个Chunk由2048(16M/8K)个Page组成。
  3. SubPage: 大小由table[idx]决定, 即由ByteBuf的size类型来决定,Tiny对应的table为[N*16B], Small对应的table为[512B, 1K, 2K, 4K]。 一个Page由多个SubPage组成。

待分配资源大小的类型

netty同时也对待分配资源的大小做了一个分类:

  1. Tiny: 本次要分配的资源大小 <= 512B
  2. Small:本次要分配的资源大小 <= 8K(一个Page的大小)
  3. Normal:本次要分配的资源大小 <= 16M(一个Chunk的大小)
  4. Huge: 本次要分配的资源大小 > 16M

线程私有的“池资源缓存”

线程私有“池资源缓存”顾名思义,当我们需要从池子中申请一个ByteBuf的时候,会先去这个池资源缓存中查看是否有合适的资源能够分配,毕竟这一块是线程私有的,不会涉及到并发安全问题;同时,当我们使用完一个ByteBuf并想要对其进行资源释放的时候,也是优先考虑看能不能将其释放到这个线程私有的缓存中,如果可以的话则优先考虑采用这种方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
....
....
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
private final List<PoolArenaMetric> heapArenaMetrics;
private final List<PoolArenaMetric> directArenaMetrics;
private final PoolThreadLocalCache threadCache;
private final int chunkSize;

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
// 初始化池子资源
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
// 设置每种类型大小的page的队列长度
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

// other code ...

int pageShifts = validateAndCalculatePageShifts(pageSize);

if (nHeapArena > 0) {
heapArenas = newArenaArray(nHeapArena);
// other code ...
} else {
// other code ...
}
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
// other code ...
} else {
// other code ...
}
// other code ...
}
}

这里所说的线程私有池资源的抽象在源码中就是PoolThreadCache,它要通过PoolThreadLocalCache来获取, 从也名字上就能看出,PoolThreadLocalCache这是一个FastThreadLocal类型的池资源,FastThreadLocal是Netty自己实现的线程封闭类,功能于ThreadLocal一样,性能更好。

下面先来看一下PoolThreadCachePoolThreadLocalCache

PoolThreadLocalCache

PoolThreadLocalCache是一个ThreadLocal,通过它来获取PoolThreadCache。

1
2
3
4
5
6
7
8
9
10
11
12
13
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
...
@Override
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

// other code ...

// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
}

PoolThreadCache

PoolThreadCache是对池资源的抽象

在私有池资源缓存中进行资源分配

netty中将分配单位划分了3个等级

1
2
3
4
5
enum SizeClass {
Tiny,
Small,
Normal
}
  1. 如果一次申请的资源大小在(0, 512B] 那么就从Tiny池中为其分配资源(单位为SubPage)
  2. 如果一次申请的资源大小在(512B, 8K], 那么就从Small池中为其分配资源 (单位为SubPage)
  3. 如果一次申请的资源大小在[8K, 16M], 那么就从Nomarl池中为其分配资源(单位为Page)

上面提到的Tiny池,Small池,Nomarl池,在netty中被抽象成了一组MemoryRegionCache,而MemoryRegionCache的结构为下图所示:

MemoryRegionCache

这里解释一下上面的size变量,sizeClass只是规定了一个分配种类的区间,而对于每一个分配种类其分配单位也不同(Tiny对应SubPage,Small对应Page,Nomarl对应Chunk),而对于相同的分配单位,其大小也有所不同,Tiny:[ N16B ], Small:[512B, 1K, 2K, 4K], Normal:[8K, 16K, 32K],*size可以取不同的值,但每一个队列中的size都是相同的,例如对于size=16B来说,队列中的每一个空闲Entry的大小都是16B**

因此,这里以Tiny池为例,真正的样子是下面这个样子(这也解释了上面为什么说是一组MemoryRegionCache了)

tinyCache

对于Small池和Normal池也是类似

例子:

  • 若要从线程私有池资源缓存中取一个大小为17B的ByteBuf:
    1. 17B < 512B 是Tiny
    2. 17B向上”取整”到32B,于是到size为32B的MemoryRegionCache的Queue中取一个空闲的Region予以分配.

PoolThreadCache源码

在了解完上面的存储结构之后,再来看源码会好很多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
final class PoolThreadCache {
...
...
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// 在这里createSubPageCaches就是创建上面所说的Tiny池
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
// 创建Small池
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

numShiftsNormalDirect = log2(directArena.pageSize);
// 创建Normal池
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}

// Heap的情况和Direct的情况极为相似,这里不作过多阐述
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);

heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// other code ...
}
}

下面以createSubPageCaches为例,跟踪一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
// 创建MemoryRegionCache数组!! 就是上图画的那个
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// 对于每一种大小的size都创建一个对应的MemoryRegionCache
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}

最后再来看一下MemoryRegionCache源码

1
2
3
4
5
6
7
8
9
10
11
12
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;

MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
}

到此位置就了解了ByteBuf在池化情况下的存储结构,下面是它的分配逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final class PoolThreadCache {
...
...
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private abstract static class MemoryRegionCache<T> {
....
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
// 使用entry初始化buf
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);

// 对entry对象的池化,减少GC
entry.recycle();

// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
....
}

以上就从宏观上过了一遍从 线程私有的池资源缓存中 分配资源的一个原理。

但我们知道,缓存中的内容都是先访问过之后,才会被存到缓存中,在最开始的时候缓存都是空的,那么在这种情况下该如何分配池资源呢。 这时就要从全局池资源中来分配资源(其实是假的全局资源),下面来分析一下。

全局池资源分配

因为HeapByteBuf和DirectByteBuf的分配流程总体上几乎一样,因此下面就跟踪DirectByteBuf为例。

在跟代码之前,先介绍一下Arena。

PoolArena

arena

实际上,在netty里是使用PoolArena来分配池化的内存资源的,即使是上面分析了一通的(线程私有池化缓存)也是PoolArena在分配资源中的一个部分,这一点在之后会看到。

poolArena是如何解决并发安全问题

因为池化的内存资源是一个临界资源,因此在进行资源分配的时候其实是一个线程不安全的操作,Netty这里的做法是尽最大可能的减少了(默认情况实际可以说是完全避免了)并发访问PoolArena的情况,而这一做法就是把PoolArena线程私有化,即:

  1. ByteAllocator创建了一组PoolArena(默认创造2*CPU个数个)
  2. 然后通过PoolThreadLocalCache获取PoolThreadCache
  3. 在初始化PoolThreadCache时,会从这一组PoolArena中选择一个被引用次数最少的PoolArena来持有,这样,PoolArena就被保存在ThreadLocal中了。 默认情况下,PoolArena和NioEventLoop的创建个数是一样的,因此能够保证一一对应,这种情况,每一个PoolArena都仅被一个线程持有,也就不会出现线程安全问题了。(当前,这只是默认情况,并不能保证PoolArena一定只被一个线程持有, 所以实际在做分配操作的时候,还是用了synchronized)

看完上面的解释之后,再来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {

...
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
private final PoolThreadLocalCache threadCache;
...
...
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
// 1. 初始化threadCache
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;

// other code...

if (nDirectArena > 0) {
// 2. 创建一组PoolArena,数组默认大小 = 2*CPU核数 = NioEventLoopGroup默认情况下NioEventLoop的个数
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
// other code...
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 3. 获取ThreadLocal的相关信息
PoolThreadCache cache = threadCache.get();
// 4. 获取当前线程的Arena
PoolArena<ByteBuffer> directArena = cache.directArena;

final ByteBuf buf;
if (directArena != null) {
// 5. 使用Arena进行从池中分配一个buffer
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}

}

PoolThreadLocalCache内初始化PoolArena:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {

// other code...

@Override
protected synchronized PoolThreadCache initialValue() {
// leastUsedArena将返回 PoolArena数组中 最少被引用的那个 PoolArena
// 因为PoolThreadLocalCache是 PooledByteBufAllocator的内部类,因此可以直接访问到其成员变量(PoolArena数组)
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

// other code...

// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
}

PoolArena内部维护的重要属性

  1. SubpagePools

PoolArena会缓存tinySubpagePools和smallSubpagePools两种SubpagePools,因为其原理作用极其类似,这里就以tinySubpagePools为例子
tinySubpagePools

1
2
3
4
// 缓存了给Tiny类型来分配Subpage的Page (因为一个Page能分配多个SubPage,这里存的实际上是没有使用完的Page...)(之所以是一个数组的形式,是因为对应Subpage不同的size类型)
private final PoolSubpage<T>[] tinySubpagePools;
// 同上,只不过是对Small类型的
private final PoolSubpage<T>[] smallSubpagePools;

SubpagePools里维护的其实是一个Page,只不过将Page给按固定size给分成SubPage,然后就成了SubPagePool,而这个size则由待分配资源的大小来决定.

SubpagePool在初始化的时候都是空的,并且当Page中的Subpage分配完的时候,也会将这个Page移除。

对于Tiny和Small类型的资源,会优先去Cache中分配(线程私有池资源缓存),如果没分配到,那么就从SubpagePool中去分配,如果还是没有分配到,那么就从ChunkList中去分配,如果ChunkList里也没有可用的Chunk,那么就new一个Chunk然后添加到ChunkList中,下面来看看ChunkList的样子。

  1. PoolChunkList

chunkList

1
2
3
4
5
6
7
// 这里是根据使用率的不同而维护的一个ChunkList
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

在chunkList中分配资源的代码段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 遍历所有的chunkList,尝试去分配
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}

// 如果所有的chunkList都没有一个chunk能用的,就new 一个chunk
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
// 使用当前new的这个chunk去分配(注意,这里只能分配Normal大小的资源,即<=chunkSize, 即<=16M)
// 大于16M的资源走的是另一个逻辑
boolean success = c.allocate(buf, reqCapacity, normCapacity); // chunk的allocate!!!
assert success;
qInit.add(c);
}

跟进上面注释的chunk的allocate:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
if ((normCapacity & subpageOverflowMask) != 0) { // 是否 >= pageSize(8K)
handle = allocateRun(normCapacity); // 分配>=1个page, Run的另一意思是连续的.. 即分配多个page
} else {
handle = allocateSubpage(normCapacity); // 分配一个page, 用于分解成subPage, 之后会挂载到subPagePool上
}

if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}

以上便是从chunkList中取一个chunk来分配资源,若不存在能用的chunk则new一个。 若存在能用的chunk,则根据待分配资源的大小从chunk中分配一个或多个page来去分配。


到此位置,就分析完了Arena池化资源的一个逻辑,了解了上面的内容以后,再看下面的代码就非常清楚了!(代码里加了详尽的注释)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
abstract class PoolArena<T> implements PoolArenaMetric {
// 是否能用Unsafe对象
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

// 待分配资源的size类型,前面说过了
enum SizeClass {
Tiny,
Small,
Normal
}

// 缓存了给Tiny类型来分配Subpage的Page (因为一个Page能分配多个SubPage,这里存的实际上是没有使用完的Page...)(之所以是一个数组的形式,是因为对应Subpage不同的size类型)
private final PoolSubpage<T>[] tinySubpagePools;
// 同上,只不过是对Small类型的
private final PoolSubpage<T>[] smallSubpagePools;


// 这里是根据使用率的不同而维护的一个ChunkList
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

// 其他一些很重要的成员变量...这里不一一细说

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 向上取整数(2的整数次幂)
final int normCapacity = normalizeCapacity(reqCapacity);
// 判断是否小于一个pageSize
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize(8K)
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512 , tiny的情况
// 先尝试从cache里分配!!!!! 如果能拿到并分配就直接返回了
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// 从cache里没有拿到,则尝试从SubpagePool中去分配!!!!!
// TinySizeTable对应tiny的table[0, 16, 32, 48, 64, ...] , idx对应table中的idx
// 例如:reqCapacity=24, 则 24->32 对应[0, 16, 32, 48, 64, ...]中的idx为2
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else { // 512 <= 当前需要分配的byteBuf的大小 < 8K(8192)
// 先尝试从cache里分配!!!!! 如果能拿到并分配就直接返回了
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
// // 从cache里没有拿到,则尝试从SubpagePool中去分配!!!!!
// SmallSizeTable对应tiny的table[512,1024,2048,4096] , idx对应table中的idx
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}

// 获取到对应Size的SubPage
final PoolSubpage<T> head = table[tableIdx];

/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}

// 如果从cache, subpagePool里都没拿到,则从chunkList中取一个chunk来分配!!!!!
//(尽管每次都选引用次数最少的那个Arena来分配给线程,但还是可能会出现一个Arena被分配给多个线程的情况)
// 所以在资源分配的时候加锁
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}

incTinySmallAllocation(tiny);
return;
}

if (normCapacity <= chunkSize) {
// 如果待分配资源的大小 <= chunkSize, 那么依然可以从chunk中去分配!!!!!
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge的分配逻辑,单独走
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
}