0%

详解AQS

什么是AQS

我们常用的j.u.c包里,提供了许多强大的同步工具,例如ReentrantLock,Semphore,ReentrantReadWriteLock等,但当这些工具难以满足某个场景的需求时,我们就需要定制化我们自己的同步器,这时,我们可能会想,如果能有一个像Servlet这种只要重写某几个方法就能完成一把定制锁的实现的就好了!! 没错,AQS就是提供了这样一种功能,它如果要实现一个同步器的大部分通用功能都帮我们实现好了,然后提供出抽象函数供我们重写来定制化自己想要的同步器。 实际上,上面所说的ReentrantLock,Semphore,ReentrantReadWriteLock等juc包中同步工具的实现,也都是在AQS的辅助下进行的“二次开发”。 例如在ReentrantLock继承了Lock接口,然后利用定制化了的继承了AQS的类,来去实现Lock接口。


AQS提供了什么功能

同步器一般会包括两种方法,一种是acquire方法, 另一种是release方法; acquire方法是尝试获取锁操作,如果获取不到就阻塞(park)当前线程,并将其放入等待队列中;release方法是释放锁操作,然后会从等待队列中出队一个或多个被acquire阻塞的线程并将其唤醒(unpark).

j.u.c包中并没有对同步器的API做一个统一的定义。因此,有一些类定义了通用的接口(如Lock),而另外一些则定义了其专有的版本。因此在不同的类中,acquire和release操作的名字和形式会各有不同。例如:Lock.lock,Semaphore.acquire,CountDownLatch.await和FutureTask.get,在这个框架里,这些方法都是acquire操作。但是,J.U.C为支持一系列常见的使用选项,在类间都有个一致约定。在有意义的情况下,每一个同步器都支持下面的操作:

  • 阻塞(例如:acquire)和非阻塞(例如:tryAcquire)同步。
  • 可选的超时设置,让调用者可以放弃等待
  • 通过中断实现的任务取消,通常是分为两个版本,一个acquire可取消,而另一个不可以(例如ReentrantLock中的lockInterruptibly()就是可在阻塞等待中被中断的,而lock()是阻塞等待中不可被中断的)。

读源码之前需要知道的知识

AQS的内部队列

在AQS中,被阻塞的线程会被打包成一个Node然后放到等待队列中,head指向队列头结点,tail指向尾结点,队列不存在时(未初始化时)的样子为:head==tail==null ,初始化之后,队列为空的情况为:head==tail==dummy头结点,如下图所示:
在这里插入图片描述
head指向dummy头结点,这个头结点存在的意义是为了方便队列操作,并且里面保存的thread恒为null。下面来看一下node每个字段的意思

Node

为了抓住重点学习,这里只介绍Node里的重要成员:

  • thread :当前结点里保存的线程
  • prev,next:当前结点的前后指针,这里队列的实现是带有头结点的双向链表。 prev是靠近头结点那一端的,next是靠近尾结点那一端的。
  • waitStatus:初始状态为0。为-1时,表示存在正在阻塞等待的线程,结点入队之后,会自旋一次来再次尝试tryAcquire,如果依然失败,才会进入阻塞,自旋的这一次就是把waitStatus字段CAS成-1。 这一字段取值范围如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;

    // 当前结点为-1, 则说明后一个结点需要park阻塞
    static final int SIGNAL = -1;

    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;

    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    static final int PROPAGATE = -3;

AQS源码解读

这里先更新一下独占式的部分。。共享式的日后再看.

独占式代码部分

先有个宏观上的理解,如下图:
在这里插入图片描述
其中tryRelease,tryAcquire是非阻塞式获取锁。 有了宏观上的框架,再去看一下实现的细节。

acquire

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这里使用了短路原理, 如果tryAcquire成功的话,就直接跳出if了; 如果 tryAcquire失败,那么会先执行addWaiter把当前线程打包成一个node放入等待队列, 然后再执行acquireQueued尝试一次自旋,如果依然无法获取到锁,就进入阻塞。

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

将当前线程打包成一个node, 然后将这个node入队,如果入队失败则有2种情况:

  • 队列还不存在(队列还没初始化)
  • 在入队时,出现了同步问题。(这里的队列也是临界资源,如果CAS失败说明资源竞争失败)
    当入队失败时,进入enq函数,这一函数的作用是:初始化队列并自旋入队操作。

enq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果队列未初始化,那么就初始化队列,如果已经初始化了,就将当前结点自旋入队,该方法一定返回true.

线程被打包成结点,然后入队之后,会进入acquireQueued进行一次自旋try,如果依然失败就阻塞

acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
final boolean acquireQueued(final Node node, int arg) {
booleanfailed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// (*)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

先判断前驱结点是不是head,因为head指向的是dummy结点,因此,如果前驱结点就是head了,那么当前结点就是队首了!! 然后只有队首的结点才有资格在第一次自旋的时候进行tryAcquire

每一个结点不会改变自己的waitStatus, 只会改变在队列中前驱结点的waitStatus , 因此,如果前驱结点是0,则通过CAS操作将其变为-1,然后自旋一次,如果前驱结点是-1,则说明已经自旋过一次了,然后才能进入 parkAndCheckInterrupt函数,也就是将当前结点的线程阻塞。

这个函数里的几个细节,如果队首元素成功tryAcquire,则需要进行出队操作,把当前结点设置成dummy结点就可以了。
在setHead的时候。 会将thread设置成null 也是用于help gc 。 同时也要手动让前驱结点的next设置为null, 方便gc回收..

到此位置,线程就会被卡在parkAndCheckInterrupt这个函数中,等待被唤醒

release

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

release的实现就更短了,如果tryRelease成功的话,就看是否还存在阻塞等待的线程,if (h != null && h.waitStatus != 0) 这句话的判断就是判断否还存在阻塞等待的线程。 如果h是null的话,则说明队列根本就不存在,更别说等待的线程了,如果h.waitStatus不是0的话,则说明队列里存在等待的线程node。

如果存在正在等待的线程的话,就unparkSuccessor , 即唤醒这个正在等待的队首线程.

unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

其中,s是下一个需要被唤醒的node结点,然后后面会对其进行unpark(唤醒)操作。

AQS的使用

到目前位置,只是简单过完了一遍AQS的独占式的acquire和release操作, 它帮我们完成了一部分同步状态管理事情,但是最关键的tryAcquiretryRelease 其实它是一个需要我们去重写的方法:

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

需要做的事情

在使用AQS的时候,往往需要我们自己去重写:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively:如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。此方法只是 abstractqueuedsynchronizer.conditionobject 方法内进行内部调用,因此,如果不使用条件,则不需要定义它。

在实现tryAcquire的时候,我们需要对内部的status进行操作,AQS也提供给了我们关于Status操作接口,分别是:

  • getState()
  • setState(int)
  • compareAndSetState(int, int)

源码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS在使用的时候,往往是使用一个内部类继承AQS,然后重写上述提到的方法,然后就可以在当前类中使用这个内部类的acquire / release来实现同步了

使用AQS完成信号量的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Mutex implements Lock, java.io.Serializable {
// 自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 判断是否锁定状态
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 尝试获取资源,立即返回。成功则返回true,否则false。
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 这里限定只能为1个量
if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
return true;
}
return false;
}

// 尝试释放资源,立即返回。成功则为true,否则false。
protected boolean tryRelease(int releases) {
assert releases == 1; // 限定为1个量
if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);//释放资源,放弃占有状态
return true;
}
}

// 真正同步类的实现都依赖继承于AQS的自定义同步器!
private final Sync sync = new Sync();

//lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
public void lock() {
sync.acquire(1);
}

//tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
public boolean tryLock() {
return sync.tryAcquire(1);
}

//unlock<-->release。两者语文一样:释放资源。
public void unlock() {
sync.release(1);
}

//锁是否占有状态
public boolean isLocked() {
return sync.isHeldExclusively();
}
}

了解了AQS的原理之后,可以来趁热打铁的看一下ReentrantLock的加锁实现

ReentrantLock的原理

这里主要详细介绍一下ReentrantLock对AQS的两种实现方式:

  • 公平锁(FairSync)
  • 非公平锁(NonfairSync)
    在这里插入图片描述
    其中Sync是公平锁和非公平锁的抽象基类,里面已经初步实现了一些方法,但其中的lock()方法和tryAcquire()方法依然是抽象的,需要子类去进行实现,而公平锁和非公平锁的主要区别也主要在这两个函数中,下面来看一下。

公平锁与非公平锁的实现区别

lock操作:

非公平锁

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

公平锁

1
2
3
final void lock() {
acquire(1);
}

可以看到,非公平锁在lock的时候会进行一次CAS操作,如果直接获取到锁了的话,那么就直接继续执行。 在临界区的执行速度比较快的情况下,非公平锁会比公平锁要更快,因为在唤醒阻塞线程的过程中,有可能有其他线程已经取得锁然后执行完并释放了。。

tryAcquire操作:

非公平锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里直接进行CAS , 尝试拿锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入时,给state加一个acquires偏移量,对应release时会减去一次
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里会先判断是否存在比当前线程等待更久的线程!
// 只有不存在等待的线程的时候,才有资格去尝试获取锁资源(CAS)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 重入时,给state加一个acquires偏移量,对应release时会减去一次
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

可以看出,在tryAcquire时,公平锁会先判断是否存在比当前线程等待的更久的线程,如果不存在这样的线程,才能进行CAS尝试获取锁; 而非公平锁是直接进行CAS获取锁。

关于Interrupt

我们知道, thread1.interrupt()就是将thread1的中断标志位置为1(Thread.interrupted()是检测并清除中断标志,thread1.isInterrupted()是仅仅检测thread1的中断标志但不清除).

ReentrantLock()lock()方法,thread因等待资源而被阻塞在等待队列中的时候,不会被打断,而是先将这个中断标记位记下来,然后当获取到锁资源之后,执行selfInterrupt(), 也就是在获得锁资源后打断自己!! 如果希望在阻塞队列中依然可以被打断的话,应该使用lockInterruptibly , 这个lock操作是可以允许线程在阻塞等待时被中断的!

到此为止,我们看到了在ReentrantLock中对tryAcquire和tryRelease的实现,分别实现了公平竞争和非公平竞争的场景,因为这里的ReentrantLock是独占式的锁(也就是说资源只允许被一个线程获取,也可以理解成01信号量),所以并没有实现 tryAcquireSharedtryReleaseShared 这两个方法。 实际上,我们在使用的时候也是,需要哪种模式就实现对应模式的acquire和release.

对于 tryAcquireSharedtryReleaseShared 这两个方法的实现例子,可以去看看Semphore的源码,它就是只重写了tryAcquireSharedtryReleaseShared,理解完上面分析的代码之后,去看Semphore的源码也不会很困难了。。日后有时间再写Semphore的源码记录把。。