并发容器-BlockingQueue(1)-ArrayBlockingQueue
BlockingQueue
API
BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future:
one throws an exception,
the second returns a special value (either null or false, depending on the operation),
the third blocks the current thread indefinitely until the operation can succeed,
and the fourth blocks for only a given maximum time limit before giving up.
These methods are summarized in the following table:
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | elemnt() | peek() | N/A | N/A |
对 null 的支持
A BlockingQueue does not accept null elements.
应用场景
BlockingQueue接口的实现主要的应用场景是 producer-consumer 队列。但是他是支持集合类接口,其中有一个 remove(x)这个方法可以用来删除队列中的任意元素,但是这类集合类操作的实现性能并不是特别高,所以这类操作通常使用在特殊的场景下,例如队列中的message可以被 cancell,这样就需要remove来实现了。
线程安全
BlockingQueue的实现类应该是线程安全的。对于 队列方法 的实现是原子操作,也就是线程安全的。而对于批量操作(bulk operation)可以不是线程安全,例如: for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.
内存一致性
As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
elements A put in queue happen-before elements A access by another thread.
ArrayBlockingQueue
固定容量的Queue
初始化
1 | public ArrayBlockingQueue(int capacity, boolean fair) { |
- lock: Main lock guarding all access
- notEmpty: Condition for waiting takes until the queue is not empty
- notFull: Condition for waiting puts until the queue is not full
- count: Number of elements in the queue
- takeIndex: items index for next take, poll, peek or remove
- putIndex: items index for next put, offer, or add
put
1 | public void put(E e) throws InterruptedException { |
insert
1 | private void insert(E x) { |
take
1 | public E take() throws InterruptedException { |
extract
1 | private E extract() { |
add
1 | public boolean add(E e) { |
remove
1 | public boolean remove(Object o) { |
clear
清除队列中的所有元素,将内部的状态变量恢复到初始状态。
drainTo
这个方法的作用是将当前queue中的元素按照FIFO的顺序插入到参数Collection中。
1 | public int drainTo(Collection<? super E> c) { |
iterator
iterator是其创建时刻时的queue的一个快照,它所持有的关于queue的状态信息,只来自于创建的时刻,至于之后queue是否发生变化iterator并不关心。
这个类提供的 iterator 是具有弱一致性,同时它也仅仅代表iterator 被创建的时刻的 queue 的状态,
1 | // 构造方法 |
由 next 方法实现可以确定,这个iterator返回的是queue的快照元素,因为在并发的情况下,nextItem 记录的元素很有可能已经被消费,而 next 方法却依旧会返回它。
这也说 iterator 是弱一致性的,iterator在循环过程中可以容忍并发地对 queue 进行修改,而不会抛出ConcurrentModificationException。
bulk operator
ArrayBlockingQueue类没有重写 addAll, containsAll, retainAll and removeAll 这四个批量操作方法,所有虽然其中的 add, contains 方法是原子操作,但是这些批量操作方法却是通过循环来完成,所以它们并不是原子操作。