什么是AQS
我们常用的j.u.c包里,提供了许多强大的同步工具,例如ReentrantLock,Semphore,ReentrantReadWriteLock等,但当这些工具难以满足某个场景的需求时,我们就需要定制化我们自己的同步器,这时,我们可能会想,如果能有一个像Servlet这种只要重写某几个方法就能完成一把定制锁的实现的就好了!! 没错,AQS就是提供了这样一种功能,它如果要实现一个同步器的大部分通用功能都帮我们实现好了,然后提供出抽象函数供我们重写来定制化自己想要的同步器。 实际上,上面所说的ReentrantLock,Semphore,ReentrantReadWriteLock等juc包中同步工具的实现,也都是在AQS的辅助下进行的“二次开发”。 例如在ReentrantLock继承了Lock接口,然后利用定制化了的继承了AQS的类,来去实现Lock接口。
AQS提供了什么功能
同步器一般会包括两种方法,一种是acquire方法, 另一种是release方法; acquire方法是尝试获取锁操作,如果获取不到就阻塞(park)当前线程,并将其放入等待队列中;release方法是释放锁操作,然后会从等待队列中出队一个或多个被acquire阻塞的线程并将其唤醒(unpark).
j.u.c包中并没有对同步器的API做一个统一的定义。因此,有一些类定义了通用的接口(如Lock),而另外一些则定义了其专有的版本。因此在不同的类中,acquire和release操作的名字和形式会各有不同。例如:Lock.lock,Semaphore.acquire,CountDownLatch.await和FutureTask.get,在这个框架里,这些方法都是acquire操作。但是,J.U.C为支持一系列常见的使用选项,在类间都有个一致约定。在有意义的情况下,每一个同步器都支持下面的操作:
- 阻塞(例如:acquire)和非阻塞(例如:tryAcquire)同步。
- 可选的超时设置,让调用者可以放弃等待
- 通过中断实现的任务取消,通常是分为两个版本,一个acquire可取消,而另一个不可以(例如ReentrantLock中的
lockInterruptibly()
就是可在阻塞等待中被中断的,而lock()
是阻塞等待中不可被中断的)。
读源码之前需要知道的知识
AQS的内部队列
在AQS中,被阻塞的线程会被打包成一个Node然后放到等待队列中,head指向队列头结点,tail指向尾结点,队列不存在时(未初始化时)的样子为:head==tail==null
,初始化之后,队列为空的情况为:head==tail==dummy头结点
,如下图所示:
head指向dummy头结点,这个头结点存在的意义是为了方便队列操作,并且里面保存的thread恒为null。下面来看一下node每个字段的意思
Node
为了抓住重点学习,这里只介绍Node里的重要成员:
- thread :当前结点里保存的线程
- prev,next:当前结点的前后指针,这里队列的实现是带有头结点的双向链表。 prev是靠近头结点那一端的,next是靠近尾结点那一端的。
- waitStatus:初始状态为0。为-1时,表示存在正在阻塞等待的线程,结点入队之后,会自旋一次来再次尝试tryAcquire,如果依然失败,才会进入阻塞,自旋的这一次就是把waitStatus字段CAS成-1。 这一字段取值范围如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
// 当前结点为-1, 则说明后一个结点需要park阻塞
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
AQS源码解读
这里先更新一下独占式的部分。。共享式的日后再看.
独占式代码部分
先有个宏观上的理解,如下图:
其中tryRelease,tryAcquire是非阻塞式获取锁。 有了宏观上的框架,再去看一下实现的细节。
acquire
1 | public final void acquire(int arg) { |
这里使用了短路原理
, 如果tryAcquire
成功的话,就直接跳出if了; 如果 tryAcquire
失败,那么会先执行addWaiter
把当前线程打包成一个node放入等待队列, 然后再执行acquireQueued
尝试一次自旋,如果依然无法获取到锁,就进入阻塞。
addWaiter
1 | private Node addWaiter(Node mode) { |
将当前线程打包成一个node, 然后将这个node入队,如果入队失败则有2种情况:
- 队列还不存在(队列还没初始化)
- 在入队时,出现了同步问题。(这里的队列也是临界资源,如果CAS失败说明资源竞争失败)
当入队失败时,进入enq
函数,这一函数的作用是:初始化队列并自旋入队操作。
enq
1 | private Node enq(final Node node) { |
如果队列未初始化,那么就初始化队列,如果已经初始化了,就将当前结点自旋入队,该方法一定返回true.
线程被打包成结点,然后入队之后,会进入acquireQueued进行一次自旋try,如果依然失败就阻塞
acquireQueued
1 | final boolean acquireQueued(final Node node, int arg) { |
先判断前驱结点是不是head,因为head指向的是dummy结点,因此,如果前驱结点就是head了,那么当前结点就是队首了!! 然后只有队首的结点才有资格在第一次自旋的时候进行tryAcquire
每一个结点不会改变自己的waitStatus, 只会改变在队列中前驱结点的waitStatus , 因此,如果前驱结点是0,则通过CAS操作将其变为-1,然后自旋一次,如果前驱结点是-1,则说明已经自旋过一次了,然后才能进入 parkAndCheckInterrupt
函数,也就是将当前结点的线程阻塞。
这个函数里的几个细节,如果队首元素成功tryAcquire,则需要进行出队操作,把当前结点设置成dummy结点就可以了。
在setHead的时候。 会将thread设置成null 也是用于help gc 。 同时也要手动让前驱结点的next设置为null, 方便gc回收..
到此位置,线程就会被卡在parkAndCheckInterrupt
这个函数中,等待被唤醒
release
1 | public final boolean release(int arg) { |
release的实现就更短了,如果tryRelease成功的话,就看是否还存在阻塞等待的线程,if (h != null && h.waitStatus != 0)
这句话的判断就是判断否还存在阻塞等待的线程。 如果h是null的话,则说明队列根本就不存在,更别说等待的线程了,如果h.waitStatus不是0的话,则说明队列里存在等待的线程node。
如果存在正在等待的线程的话,就unparkSuccessor
, 即唤醒这个正在等待的队首线程.
unparkSuccessor
1 | private void unparkSuccessor(Node node) { |
其中,s是下一个需要被唤醒的node结点,然后后面会对其进行unpark(唤醒)操作。
AQS的使用
到目前位置,只是简单过完了一遍AQS的独占式的acquire和release操作, 它帮我们完成了一部分同步状态管理事情,但是最关键的tryAcquire
和tryRelease
其实它是一个需要我们去重写的方法:
1 | protected boolean tryAcquire(int arg) { |
需要做的事情
在使用AQS的时候,往往需要我们自己去重写:
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively:如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。此方法只是 abstractqueuedsynchronizer.conditionobject 方法内进行内部调用,因此,如果不使用条件,则不需要定义它。
在实现tryAcquire的时候,我们需要对内部的status进行操作,AQS也提供给了我们关于Status操作接口,分别是:
- getState()
- setState(int)
- compareAndSetState(int, int)
源码实现如下:
1 | protected final int getState() { |
AQS在使用的时候,往往是使用一个内部类继承AQS,然后重写上述提到的方法,然后就可以在当前类中使用这个内部类的acquire / release来实现同步了
使用AQS完成信号量的功能
1 | class Mutex implements Lock, java.io.Serializable { |
了解了AQS的原理之后,可以来趁热打铁的看一下ReentrantLock的加锁实现
ReentrantLock的原理
这里主要详细介绍一下ReentrantLock对AQS的两种实现方式:
- 公平锁(FairSync)
- 非公平锁(NonfairSync)
其中Sync是公平锁和非公平锁的抽象基类,里面已经初步实现了一些方法,但其中的lock()
方法和tryAcquire()
方法依然是抽象的,需要子类去进行实现,而公平锁和非公平锁的主要区别也主要在这两个函数中,下面来看一下。
公平锁与非公平锁的实现区别
lock操作:
非公平锁
1 | final void lock() { |
公平锁
1 | final void lock() { |
可以看到,非公平锁在lock的时候会进行一次CAS操作,如果直接获取到锁了的话,那么就直接继续执行。 在临界区的执行速度比较快的情况下,非公平锁会比公平锁要更快,因为在唤醒阻塞线程的过程中,有可能有其他线程已经取得锁然后执行完并释放了。。
tryAcquire操作:
非公平锁:
1 | protected final boolean tryAcquire(int acquires) { |
公平锁
1 | protected final boolean tryAcquire(int acquires) { |
可以看出,在tryAcquire时,公平锁会先判断是否存在比当前线程等待的更久的线程,如果不存在这样的线程,才能进行CAS尝试获取锁; 而非公平锁是直接进行CAS获取锁。
关于Interrupt
我们知道, thread1.interrupt()就是将thread1的中断标志位置为1(Thread.interrupted()是检测并清除中断标志,thread1.isInterrupted()是仅仅检测thread1的中断标志但不清除).
ReentrantLock()
的lock()
方法,thread因等待资源而被阻塞在等待队列中的时候,不会被打断,而是先将这个中断标记位记下来,然后当获取到锁资源之后,执行selfInterrupt()
, 也就是在获得锁资源后打断自己!! 如果希望在阻塞队列中依然可以被打断的话,应该使用lockInterruptibly
, 这个lock操作是可以允许线程在阻塞等待时被中断的!
到此为止,我们看到了在ReentrantLock中对tryAcquire和tryRelease的实现,分别实现了公平竞争和非公平竞争的场景,因为这里的ReentrantLock是独占式的锁(也就是说资源只允许被一个线程获取,也可以理解成01信号量),所以并没有实现 tryAcquireShared
和tryReleaseShared
这两个方法。 实际上,我们在使用的时候也是,需要哪种模式就实现对应模式的acquire和release.
对于 tryAcquireShared
和tryReleaseShared
这两个方法的实现例子,可以去看看Semphore的源码,它就是只重写了tryAcquireShared
和tryReleaseShared
,理解完上面分析的代码之后,去看Semphore的源码也不会很困难了。。日后有时间再写Semphore
的源码记录把。。