并发容器-BlockingQueue(1)-ArrayBlockingQueue
BlockingQueue
API
BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future:
one throws an exception,
the second returns a special value (either null or false, depending on the operation),
the third blocks the current thread indefinitely until the operation can succeed,
and the fourth blocks for only a given maximum time limit before giving up.
These methods are summarized in the following table:
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | elemnt() | peek() | N/A | N/A |
对 null 的支持
A BlockingQueue does not accept null elements.
应用场景
BlockingQueue接口的实现主要的应用场景是 producer-consumer 队列。但是他是支持集合类接口,其中有一个 remove(x)这个方法可以用来删除队列中的任意元素,但是这类集合类操作的实现性能并不是特别高,所以这类操作通常使用在特殊的场景下,例如队列中的message可以被 cancell,这样就需要remove来实现了。
线程安全
BlockingQueue的实现类应该是线程安全的。对于 队列方法 的实现是原子操作,也就是线程安全的。而对于批量操作(bulk operation)可以不是线程安全,例如: for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.
内存一致性
As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
elements A put in queue happen-before elements A access by another thread.
ArrayBlockingQueue
固定容量的Queue
初始化
1 | public ArrayBlockingQueue(int capacity, boolean fair) { |
- lock: Main lock guarding all access
- notEmpty: Condition for waiting takes until the queue is not empty
- notFull: Condition for waiting puts until the queue is not full
- count: Number of elements in the queue
- takeIndex: items index for next take, poll, peek or remove
- putIndex: items index for next put, offer, or add
put
1 | public void put(E e) throws InterruptedException { |
insert
1 | private void insert(E x) { |
take
1 | public E take() throws InterruptedException { |
extract
1 | private E extract() { |
add
1 | public boolean add(E e) { |
remove
1 | public boolean remove(Object o) { |
clear
清除队列中的所有元素,将内部的状态变量恢复到初始状态。
drainTo
这个方法的作用是将当前queue中的元素按照FIFO的顺序插入到参数Collection中。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public int drainTo(Collection<? super E> c) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int n = 0;
int max = count;
while (n < max) {
// 按照 FIFO 顺序
c.add(this.<E>cast(items[i]));
items[i] = null;
i = inc(i);
++n;
}
if (n > 0) {
// 只有一种情况会出现 n<=0
// 就是 count = 0, 也就是
// 当前 queue 中并没有元素,
// 所以也就不需要,进行状态初始化操作了
count = 0;
putIndex = 0;
takeIndex = 0;
// 因为queue中没有元素了,所以
// 通知所有在 notFull 条件上 wait 的线程
notFull.signalAll();
}
return n;
} finally {
lock.unlock();
}
iterator
iterator是其创建时刻时的queue的一个快照,它所持有的关于queue的状态信息,只来自于创建的时刻,至于之后queue是否发生变化iterator并不关心。
这个类提供的 iterator 是具有弱一致性,同时它也仅仅代表iterator 被创建的时刻的 queue 的状态,
1 | // 构造方法 |
由 next 方法实现可以确定,这个iterator返回的是queue的快照元素,因为在并发的情况下,nextItem 记录的元素很有可能已经被消费,而 next 方法却依旧会返回它。
这也说 iterator 是弱一致性的,iterator在循环过程中可以容忍并发地对 queue 进行修改,而不会抛出ConcurrentModificationException。
bulk operator
ArrayBlockingQueue类没有重写 addAll, containsAll, retainAll and removeAll 这四个批量操作方法,所有虽然其中的 add, contains 方法是原子操作,但是这些批量操作方法却是通过循环来完成,所以它们并不是原子操作。