They are well suited for handoff designs, in which an object running in one thread must sync up with an object running in another thread in order to hand it some information, event, or task.
这个类被用在 java.util.concurrent.Executors.newCachedThreadPool 方法中。该方法用来创建一个适合执行大量短暂异步任务的线程池。(These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.)
ThreadPoolExecutor.ThreadPoolExecutor 查看文档,可以看到: Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
构造过程
1 2 3 4 5 6
/**
 * Creates a SynchronousQueue, choosing the transferer implementation from
 * the fairness flag.
 *
 * @param fair if true a TransferQueue is used, otherwise a TransferStack
 */
public SynchronousQueue(boolean fair) {
    // The no-argument constructor defaults to: new TransferStack()
    transferer = fair ? new TransferQueue() : new TransferStack();
}
Transferer
TransferStack
put过程
1 2 3 4 5 6 7
publicvoidput(E o)throws InterruptedException { if (o == null) thrownew NullPointerException(); if (transferer.transfer(o, false, 0) == null) { Thread.interrupted(); thrownew InterruptedException(); } }
Object transfer(Object e, boolean timed, long nanos){ /* * Basic algorithm is to loop trying one of three actions: * * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match, * returning it, or null if cancelled. * * 2. If apparently containing node of complementary mode, * try to push a fulfilling node on to stack, match * with corresponding waiting node, pop both from * stack, and return matched item. The matching or * unlinking might not actually be necessary because of * other threads performing action 3: * * 3. If top of stack already holds another fulfilling node, * help it out by doing its match and/or pop * operations, and then continue. The code for helping * is essentially the same as for fulfilling, except * that it doesn't return the item. */
// (1) 确定 s: mode = DATA, next = head, item = e, SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA;
for (;;) { SNode h = head; // (2) 第一次 put, 直到 (casHead) 执行成功之前, // h == null 始终成立,所以进入下面的if语句。 if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can't wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else returnnull; // (3) 创建 s = snode(s, e, h, mode) // 使用 casHead 进入设置 head,开始自旋,直到 // 下面的 if 成立, } elseif (casHead(h, s = snode(s, e, h, mode))) { // 经过数次自旋尝试之后,当前线程进入 block 状态 // 等待 一个线程的 take 操作。 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); returnnull; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (mode == REQUEST) ? m.item : s.item; } } elseif (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry elseif (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST) ? m.item : s.item; } else// lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else// lost match h.casNext(m, mn); // help unlink } } }
SNode awaitFulfill(SNode s, boolean timed, long nanos){ /* * When a node/thread is about to block, it sets its waiter * field and then rechecks state at least one more time * before actually parking, thus covering race vs * fulfiller noticing that waiter is non-null so should be * woken. * * When invoked by nodes that appear at the point of call * to be at the head of the stack, calls to park are * preceded by spins to avoid blocking when producers and * consumers are arriving very close in time. This can * happen enough to bother only on multiprocessors. * * The order of checks for returning out of main loop * reflects fact that interrupts have precedence over * normal returns, which have precedence over * timeouts. (So, on timeout, one last check for match is * done before giving up.) Except that calls from untimed * SynchronousQueue.{poll/offer} don't check interrupts * and don't wait at all, so are trapped in transfer * method rather than calling awaitFulfill. */ // lastTime = 0 long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); // h 上面 put 的 s SNode h = head; // 自旋次数: maxUntimedSpins int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); // 开始自旋循环,次数就是上面的 spins. // 也就是执行spins下面的循环。 for (;;) { if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; // 当下面的 s.tryCancel 成功之后, m != null 就会成功 if (m != null) return m; // 如果设置了超时,则在自旋过程中判断 // 是否在自旋过程中就已经,超时了,如果超时,尝试 // 调用 tryCancel 来取消,当前结点在 队列中的等待 if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { // 这个方法会在外层的 for 循环中不断循环执行 // 直到 将 s 的 match 字段设置成功, // 当 match 字段设置成功之后,继续 continue // 执行到上面的 if (m != null) 处 // 这个判断成功了,所以上面的 if (m != null) // 将成为 tryCancel 的出口,而不会因为 continue // 使当前线程陷入死循环 s.tryCancel(); continue; } }
// 减小自旋计数,直到 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 自旋次数已经足够了,设置 s 的 waiter 为当前线程 elseif (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 没有设置超时,直接 park elseif (!timed) LockSupport.park(this); // 否则,依时间进行 park elseif (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
take 过程
1 2 3 4 5 6 7
public E take()throws InterruptedException { Object e = transferer.transfer(null, false, 0); if (e != null) return (E)e; Thread.interrupted(); thrownew InterruptedException(); }
for (;;) { // loop until matched or waiters disappear
    SNode m = s.next;       // m is s's match

    // m == null means the bottom of the stack has been reached, i.e. all
    // waiters are gone. casHead(s, null) then resets the stack by trying to
    // swing head from s to null. The original article guessed that
    // `s = null` is there to help GC; the source comment states its purpose
    // as making the main loop use a new node next time.
    if (m == null) {        // all waiters are gone
        casHead(s, null);   // pop fulfill node
        s = null;           // use new node next time
        break;              // restart main loop
    }

    // Look up m's successor before attempting the match.
    SNode mn = m.next;
    if (m.tryMatch(s)) {    // try to match m with s
        // On success, pop both s and m off the stack.
        casHead(s, mn);     // pop both s and m
        // If this call is a REQUEST (take), m is the put node, so m.item is
        // returned; otherwise s is the put node and s.item is returned.
        // Either way, transfer returns the item that was handed over.
        return (mode == REQUEST) ? m.item : s.item;
    } else                  // lost match
        s.casNext(m, mn);   // help unlink
}
booleantryMatch(SNode s){ // 所有并发调用 put 和 take 的线程,最终将按照调用的时间顺序 // 进入 stack 中,所以,其实对于下面的方法,对于具体的某个结点 // 是没有竞争的,所以下面的compareAndSwapObject方法调用一般都会直接成功 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; // 唤醒线程 w LockSupport.unpark(w); } returntrue; } return match == s; }