J.U.C(3)-AQS实现
AbstractQueuedSynchronizer是实现锁的关键,Doug Lea关于实现这个类的论文: The java.util.concurrent Synchronizer Framework
论文中的第3节描述了,AbstractQueuedSynchronizer 的设计和实现过程。
设计
对于一个同步器(synchronzier),最基本的操作有两个:acquired(获取锁) 和 release(释放锁)
acquired
1
2
3
4
5while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;release
1
2
3update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
这实现上面的功能,则必须有三个基本的组件(component):
Atomically managing synchronization state
Blocking and unblocking threads
Maintaining queues
实现
Synchronization State
1 | /** |
state 字段是 volatile 的,保证的其内存可见性,使用 compareAndSetState 方法(CAS)来保证原子性,所以使用 state 是线程安全的。
对于改变 state 字段其实有两个方法:
setState
compareAndSetState
其实,提供两个更新 state 的方法是,基于 同步 和 性能 的考虑:当在确定当前线程是锁的拥有者时,直接使用 setState 方法可以提高性能(例如:tryRelease方法中,可以直接使用setState,因为此时,当前线程一定是锁的owner。还有就是请求锁重入的时候,也可以直接使用),当无法确定当前线程是否拥有锁时,使用 compareAndSetState,来保证 state 是同步的。
基于 AbstractQueuedSynchronizer 类的 同步器 的实现,必须 override 其 tryAcquire 和 tryRelease 方法。这两个方法都有一个 int 类型的参数,可以根据这个参数和当前 state 字段的值 来确定 同步器 对锁的获取和释放的行为,从而使用不同功能的同步器。
Blocking
在 jdk1.5 之前惟一能让一个线程 block 和 unblock 的方法就是:Thread.suspend 和 Thread.resume,但是这两个方法有可能导致线程死锁,所以已经不能使用了。
jdk1.5 中提供了一个支持线程挂起的类:
java.util.concurrent.locks.LockSupport
LockSupport会将使用过它(LockSupport)的线程都会关联一个 permit. 如果当前线程的 permit 是 available 的,则 park 方法的调用将立即返回, consuming it in the process; 否则线程会 block.
如果线程的 permit 不是 avilable 的,则 unpark 的调用则会使用 permit 变成 avilable 的。
park
block 当前线程
park有以下几种:
park(Object blocker)
parkNanos(Object blocker, long nanos)
parkUntil(Object blocker, long deadline)
park()
parkNanos(long nanos)
parkUntil(long deadline)
前三个 park 与 后三个惟一不同的就是:多一个 blocker 参数,这个参数的作用是:This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. 这个参数用来给调试工具使用。文档中推荐使用前三个API。
调用这个方法
unpark(Thread thread)
unblock 指定的线程
使用指定的线程的 permit 变成 available 的。如果线程因为 park 而 block,则此时会 unblock.此外,由于 该线程的 permis is available, 所以 在线程上再次调用 park 将立即返回,而不是 block.
The park method also supports optional relative and absolute timeouts, and is integrated with JVM Thread.interrupt support — interrupting a thread unparks it.
park方法是可以被中断的。中断之后,线程会 unpark.
Queues
同步框架的核心就是 阻塞线程队列 的维护。这个队列是 FIFO的,同步框架不支持基于优先级的同步。
实现使用 CLH queue
Conditions Queues
ReentrantLock 实现
lock
java.util.concurrent.locks.ReentrantLock.NonfairSync.lock
1
2
3
4
5
6final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先,尝试立即获取锁,如果,注意参数 compareAndSetState(0, 1), 表示这个调用返回true的,惟一条件就是:这个锁lock的 state 字段是0,才会成功,否则,调用 acquire 获取锁。
- java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire
1
2
3
4
5public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法接受一个int参数,这个参数对于AbstractQueuedSynchronizer来说并没有具体的含义,此参数会直接传递给,具体的 Synchronizer 实现类的 override 的 tryAcquire 方法。所以,这个参数其实就是由具体的实现类决定其含义的。
文档描述:以独占模式请求锁,忽略中断。至少调用一次 tryAcquire,如果成功则这个方法直接返回。否则,当前线程(调用acquire方法的线程)将会进入请求队列,并可能重复的 blocking and unblocking, 直到 tryAcquire 调用成功。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter 是 lock-free 的方法,所以对于 上面的调用,acuireQueued中 addWaiter(Node.EXCLUSIVE) 中返回的node其实有可能已经不是处于 tail 了(在 addWaiter 方法中始终将当前的node 放置到queue的 tail. 由于 addWaiter的并发,),所以当 acquireQueued 执行的时候传递进去的 node ,其实已经不在 tail 位置了。
tryAcquire
AbstractQueuedSynchronizer类定义了这个方法,但是没有实现,这个方法就是实现各个功能的锁的核心,这个方法的作用就是,判定当前线程是否应该获得锁 ,如果可以返回 true, 否则返回 false。子类通过不同的逻辑判定线程是否可以获取锁,就可以实现各种功能的锁。
ReentrantLock 的内部类Sync实现的 tryAcquire 直接调用了下面的方法
java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 尝试一次,是否可以直接获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断是否是同一个线程再次获取锁,支持可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 由此可知在 ReentrantLock 的实现中,
// state 字段还表示,当前持有锁的线程的重入次数
setState(nextc);
return true;
}
// 上面两种尝试都未成功表示,线程获取锁失败
return false;
}
由上面的代码可知,ReentrantLock 支持可重入,所以对于 AbstractQueuedSynchronizer 所维护的 queue 中的 Node 和 所以 请求锁的线程是1:1(一一对应)。每一个Thread在queue只有惟一一个node和其对应。
由 acquire 的实现可知:
1 | if (!tryAcquire(arg) && |
如果 tryAcquire 方法获取锁成功,则返回 true, 上面的表达将不再执行,acquire方法就返回。如果锁获取失败,则将使用 addWaiter 方法将当前请求线程以 exclusive 模式的 Node 结点添加到 请求队列的尾部。然后,将返回代表当前线程的Node 对象,作为参数传递给 acquireQueued 方法,而这个方法将会使用当前线程在queue中等待获取锁。具体分析,如下:
java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter
将当前线程添加到请求队列的 tail 处。整个添加过程使用 CAS 来确保线程安全。添加成功之后将这个 node 返回。
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued
这个方法有两个参数:
node: 这个 node 始终代表当前线程。node.thread == Thread.currentThread()
arg 传递给 tryAcquire
1 | final boolean acquireQueued(final Node node, int arg) { |
这个方法其实就是 CLH 锁(是一个 FIFO 的 queue)的一个实现:
获得当前 node 的前驱结点 p.
final Node p = node.predecessor();
判断 p == head, 如果相同,则表示,当前结点已经到了这个queue的头部,按照 CLH queue 的算法,这时候,这个线程应该就是锁的拥有者了,但是,存在一种可能就是,当前正有一个线程也在请求锁,但这个线程还未入queue(例如:一个线程正在执行,acquire 方法中的 tryAcquire去获取锁,而这个线程尚未入 queue),所以不能直接将锁交给 node, 而是调用 tryAcquire 去获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19if (p == head && tryAcquire(arg)) {
// 获取锁成功,当前 node 出队列,
setHead(node);
// 将 p 对 node 的引用 next 断开,for help GC
// 虽然,上面通过 setHead 将 node 设置为新的 head
// 但 这里 p 仍旧指向,以前的 head(也就是被移出队列的head)
// 把它的 next 设置成 null, 减少对 当前 head (node) 的引用
p.next = null; // help GC
// 获取锁,成功了,没失败,所以 failed = false
failed = false;
// interrupted 表示 阻塞返回的原因,也就是
// parkAndCheckInterrupt 调用返回的原因,当这个方法返回后到
// acquire中,acquire会根据这个状态,如果 ture,则中断当前线程,
// 否则,直接成功返回,由此可知锁的获取是可以被中断的。
return interrupted;
}根据前驱结点 p 的 waitStatus 状态位来判定是否park
shouldParkAfterFailedAcquire(p, node)
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
由于 shouldParkAfterFailedAcquire 是在 acquireQueued 的循环内部调用的,所以 shouldParkAfterFailedAcquire 这执行逻辑应该是这样的。
第一次循环,调用前继可能被取消了也就是 ws > 0 的情况,这时候,从前继向前遍历,将所以被取消的前继从队列中移除。这时候,其实 sync queue 发生了结构性修改,所有的 Node.CANCELLED 结点被移出
第二次循环,基本可以确定 ws <= 0了,其实在并发的情况下有可能,上次调整的结点也被取消了。所以,第二次循环的时候,有可能还会出现 ws > 0 的情况,那就继续循环呗。
如果上面返回 true 则表示,当前线程应该 park,调用parkAndCheckInterrupt方法,使得当前线程 block.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
到此为止,所有在 queue 中,但没在头部的 node(线程者是并发执行的,而且Lock的实现是 lock-free 的,所以,所有的线程都开始进行自旋等待了) 都将进入 park 状态,直到头部的 node 的释放锁。
获取锁的流程:
尝试锁是否可以被获取,如果可以直接获取返回,否则,进入步骤2.
再次尝试是否可以直接被获取,可以则获取成功。
尝试是否,是同一个线程再次获取,如果是,则成功。
将当前线程添加到请求队列中。
依据 前一个结点 的状态位来,判定是否进行 park
直到 unpark, 然后返回
unlock
- java.util.concurrent.locks.AbstractQueuedSynchronizer.release
1
2
3
4
5
6
7
8
9public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
由于,lock方法返回的时候,已经将当前线程,设置为 head, 所以,如果一个线程将持有的锁释放了,就应该通知 queue 中的 head 元素,让其后继结点,参与到锁的竞争中来。由于 后继通常是在 blocking 状态,所以要 调用 unparkSuccessor 对后继结点进行 唤醒 操作。
- java.util.concurrent.locks.ReentrantLock.Sync.tryRelease
1
2
3
4
5
6
7
8
9
10
11
12protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
ReentrantLock 由于 支持可重入,所以 state 字段,也表示,锁的持有线程的对锁的重入次数,如上面所示,直到,重入为0,时才释放锁,否则unlock,只将是调整重入次数(state),并不释放锁。返回值代表,锁是否被成功释放。
unparkSuccessor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread. 如果有后继在当前 node 上等待,则这个状态应该是 Node.SIGNAL ,如果当前 node 没有后继,则这个 状态字段应该是 0. 但是对于这个操作是否成功已经不重要了,因为,随着下面的 unpark 的调用,这个 node 将被 永远地移除,因为它的作用已经完成了:0. 成功获取锁(node 由 head.next 变成 head) 1.下面 unpark 调用 将通知在其上 wait 的 node ( node将被其next取代,node被移除 ), 这两个功能完成,这个 node 就 head 处移除。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor. 如果后继结点是null或者后继被取消了,则从同步队列的 tail 处开始 前向遍历 直到 node 处,找到 node 的真正的后继结点。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 最终经过上面的查找,如果后继不为空,则通知后继可以unpark了。后继 unpark 之后,再一次参与到锁的竞争中去。
if (s != null)
LockSupport.unpark(s.thread);
}cancelAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// help for GC
node.thread = null;
// Skip cancelled predecessors
// 跳过已经被取消的前继,直到 pred.waitStatus <= 0
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 上面的代码执行完成之后,node.prev 被直接改成了 pred,
// 而这个 pred 的 next 元素该如果处理。
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 如果按照上面的循环处理的话,其实 predNex这个
// 结点本身是被取消(CANCELLED)的,但是其前继却
// 是 <= 0 的,所以这个元素,需要特殊处理。
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference(干扰) from other threads.
// 将 node 结点设置成 取消的。
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
// 如果 node 是 最后一个结点,则直接将 pred
// 设置成 tail,然后,将其 tail 的 next 域
// 设置成 null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
// 由于 node 不是 tail,所以其后面还有结点
// 需要被通知,所以 1. 设置 pred 的域为
// Node.SIGNAL,
// 2. 将 node 的 next 接到 pred 的 next 域
// 中
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒后继
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
Node
Node 中的 waitStatus 字段,5个字段。
SIGNAL(-1): 表示线程的successor线程需要被unparking
表示这个node的后继者(successor)是(将会是)blocked的(通过使用 park 方法),所以当前node在release或cancel锁的时候,必须unpark它的successor。
To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then, on failure, block.
CANCELLED(1): 表示线程被取消
因为超时或者中断,这个node被取消了。
CONDITION(-2): 表示线程在条件上等待
这个node当前在一个condition 队列中。它不会被当作同步队列结点(sync queue node),直到被 transferred(转移,调动),与此同时这个结点的 waitStatus 会被设置成 0.
PROPAGATE(传播,-3): 表示下一个 acquireShared 应该被无条件的传播。
Status field, taking on only the values:
SIGNAL: The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then, on failure, block.
SIGNAL: 当前结点的后继结点正在(或将会)被阻塞(通过park),所以当 当前结点 在 release 或者 cancel 锁时,必须 unpark 其后继结点。
CANCELLED: This node is cancelled due to timeout or interrupt. Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
CANCELLED: 这个结点因为超时或者中断被取消了。结点一般不会保留这个状态状态。事实上,被取消的线程将不会再被 block.
CONDITION: This node is currently on a condition queue. It will not be used as a sync queue node until transferred, at which time the status will be set to 0. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.)
CONDITION: 这个结点当前处于条件队列中。在被 transferred 之前,它不会被当前 同步队列 中的结点。transferred 之后,这个状态位会被设置成 0
PROPAGATE: A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
PROPAGATE:
0: None of the above
The values are arranged numerically to simplify use. Non-negative values mean that a node doesn’t need to signal. So, most code doesn’t need to check for particular values, just for sign.
waitStatus的取值都是数字类型是为了简化操作。当一个结点的该值为 waitStatus > 0 时,表示这个结点不需要被 signal. 所以在代码中一般不需要检查某个特殊的值,只需要判断其符号即可。
The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. It is modified using CAS (or when possible, unconditional volatile writes).
对于普通的 sync node 这个字段的默认初始值是 0.
对于 condition node 这个字段的默认初始值是 CONDITION. 这个字段在被修改时使用CAS,保证原子性。
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
1 | public class ConditionObject implements Condition, java.io.Serializable |
这个类实现了 Condition 接口
java.util.concurrent.locks.Condition
Condition 接口将原来的 Object 的 monitor 方法(wait, notify and notifyAll)分离了出来,使得其和 Lock 对象独立。Lock 接口替代了 synchronized 方法和语句块,而 Condition 接口替代了 Object 对象的 monitor methods。
Conditions(也被称为 condition queue 或者 condition variables)提供了一种挂起线程的操作,直到其它的线程通知它可以继续了。
The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.
当在一个 condition 等待(wait)的时候,它其自动释放相关联的锁,然后挂起线程。就像 Object.wait 的行为那样。
condition的实例是内置于 lock中的。所以要获取一个condition,就需要调用 Lock 的 newCondition 方法。
对象结构
ConditionObject在使用时是天然线程安全的,因为它在lock的包围之中
关于 ConditionObject 的结构:ConditionObject 一个的使用在锁内部使用的,当一个对象调用 await 进行等待时,当前线程会放弃锁,而其它获得锁的线程,也有可以调用 锁的 ConditionObject 的方法,这样就会有多个线程在一个 条件对象 上等待,那这多个 等待对象 如果安置呢? Node 对象有一个
Node nextWaiter;
这个字段用来,构成一个单向链表,按时间顺序保存所有在这个条件对象上等待的线程。
ConditionObject对象中的
firstWaiter 指向 First node of condition queue.
lastWaiter 指向 Last node of condition queue.
await
使用当前线程处于,等待状态。
1 | public final void await() throws InterruptedException { |
- addConditionWaiter
Node node = new Node(Thread.currentThread(), Node.CONDITION);
其中,新添加的 node 的 waitStates 是 Node.CONDITION
- fullyRelease
核心代码如下:1
2
3int savedState = getState();
release(savedState)
return savedState
release通过传递参数 savedState,使得当前线程拥有的锁释放。注意,如果是重入的锁,这个调用也会将其release所以这个方法名为: fullyRelase.
- isOnSyncQueue
判断 node 是否已经被转移到 sync queue 中。
1 | final boolean isOnSyncQueue(Node node) { |
signal
1 | public final void signal() { |
doSignal
1 | private void doSignal(Node first) { |
从 first 结点开始后向遍历,直到,某一个结点上 transferForSignal 方法调用成功。最终,firstWaiter字段指向前面被通知的结点的 nextWaiter.
transferForSignal
signal要做的惟一的事就是将node从 condition queue 中 转移到 sync queue.
signal的逻辑应该是:
将当前 在 condition queue 中的 node 转移到 sync
queue 中设置 node 的前继的 waitStates,为 Node.SIGNAL 表示,其后继结点 需要在其上自旋等待。其实就是当前node需要重新请求锁了。
如果 node 的前继,被取消了(ws>0)或者 2 中的设置失败,直接强制 unpark 线程。
1 | final boolean transferForSignal(Node node) { |