BlockingQueue 和 BlockingDeque 内部实现分析

BlockingQueue 介绍

BlockingQueue 继承自 Queue 接口,下面看看阻塞队列提供的接口;
1
public interface BlockingQueue<E> extends Queue<E> {
2
/**
3
* 插入数据到队列尾部(如果立即可行且不会超过该队列的容量)
4
* 在成功时返回 true,如果此队列已满,则抛IllegalStateException。(与offer方法的区别)
5
*/
6
boolean add(E e);
7
8
/**
9
* 插入数据到队列尾部,如果没有空间,直接返回false;
10
* 有空间直接插入,返回true。
11
*/
12
boolean offer(E e);
13
14
/**
15
* 插入数据到队列尾部,如果队列没有空间,一直阻塞;
16
* 有空间直接插入。
17
*/
18
void put(E e) throws InterruptedException;
19
20
/**
21
* 插入数据到队列尾部,如果没有额外的空间,等待一定的时间,有空间即插入,返回true,
22
* 到时间了,还是没有额外空间,返回false。
23
*/
24
boolean offer(E e, long timeout, TimeUnit unit)
25
throws InterruptedException;
26
27
/**
28
* 取出和删除队列中的头元素,如果没有数据,会一直阻塞到有数据
29
*/
30
E take() throws InterruptedException;
31
32
/**
33
* 取出和删除队列中的头元素,如果没有数据,需要会阻塞一定的时间,过期了还没有数据,返回null
34
*/
35
E poll(long timeout, TimeUnit unit)
36
throws InterruptedException;
37
38
//除了上述方法还有继承自Queue接口的方法
39
/**
40
* 取出和删除队列头元素,如果是空队列直接返回null。
41
*/
42
E poll();
43
44
/**
45
* 取出但不删除头元素,该方法与peek方法的区别是当队列为空时会抛出NoSuchElementException异常
46
*/
47
E element();
48
49
/**
50
* 取出但不删除头元素,空队列直接返回null
51
*/
52
E peek();
53
54
/**
55
* 返回队列总额外的空间
56
*/
57
int remainingCapacity();
58
59
/**
60
* 删除队列中存在的元素
61
*/
62
boolean remove(Object o);
63
64
/**
65
* 判断队列中是否存在当前元素
66
*/
67
boolean contains(Object o);
68
69
}
Copied!
  • 插入方法
add(E e): 添加成功返回true,失败抛IllegalStateException异常
offer(E e): 成功返回 true,如果此队列已满,则返回 false。
put(E e): 将元素插入此队列的尾部,如果该队列已满,则一直阻塞
  • 删除方法
remove(Object o): 移除指定元素,成功返回true,失败返回false
poll(): 获取并移除此队列的头元素,若队列为空,则返回 null
take(): 获取并移除此队列的头元素,若队列为空,则一直阻塞
  • 检查方法
peek(): 获取但不移除此队列的头元素,没有元素则抛NoSuchElementException异常
element(): 获取但不移除此队列的头;若队列为空,则返回 null。

ArrayBlockingQueue

ArrayBlockingQueue() 是一个用数组实现的有界阻塞队列,内部按先进先出的原则对元素进行排序; 其中 put 方法和 take 方法为添加和删除元素的阻塞方法。
ArrayBlockingQueue 实现的生产者消费者的 Demo,代码只是一个简单的 ArrayBlockingQueue 的 使用,Consumer 消费者和 Producer 生产者通过 ArrayBlockingQueue 来获取(take)和添加(put) 数据。具体代码请访问:ABQ demo
ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别,这是因为 ReentrantLock 里面存在公平锁和非公平锁的原因, ReentrantLock 的具体分析会在 Lock 章节进行具体分析的; 对于 Lock 是公平锁的时候, 则被阻塞的队列可以按照阻塞的先后顺序访问队列,Lock 是非公平锁的时候, 阻塞的线程将进入争夺锁资源的过程中,谁先抢到锁就可以先执行,没有固定的先后顺序。
下面对 ArrayBlockingQueue 构造方法进行分析:
1
/**
2
* 创建一个具体容量的队列,默认是非公平队列
3
*/
4
public ArrayBlockingQueue(int capacity) {
5
this(capacity, false);
6
}
7
8
/**
9
* 创建一个具体容量、是否公平的队列
10
*/
11
public ArrayBlockingQueue(int capacity, boolean fair) {
12
if (capacity <= 0)
13
throw new IllegalArgumentException();
14
this.items = new Object[capacity];
15
lock = new ReentrantLock(fair);
16
notEmpty = lock.newCondition();
17
notFull = lock.newCondition();
18
}
Copied!
ArrayBlockingQueue 除了实现上述 BlockingQueue 接口的方法,其他方法介绍如下:
1
//返回队列剩余容量
2
public int remainingCapacity()
3
4
// 判断队列中是否存在当前元素o
5
public boolean contains(Object o)
6
7
// 返回一个按正确顺序,包含队列中所有元素的数组
8
public Object[] toArray()
9
10
// 返回一个按正确顺序,包含队列中所有元素的数组;数组的运行时类型是指定数组的运行时类型
11
@SuppressWarnings("unchecked")
12
public <T> T[] toArray(T[] a)
13
14
15
// 自动清空队列中的所有元素
16
public void clear()
17
18
// 移除队列中所有可用元素,并将他们加入到给定的 Collection 中
19
public int drainTo(Collection<? super E> c)
20
21
// 从队列中最多移除指定数量的可用元素,并将他们加入到给定的 Collection 中
22
public int drainTo(Collection<? super E> c, int maxElements)
23
24
// 返回此队列中按正确顺序进行迭代的,包含所有元素的迭代器
25
public Iterator<E> iterator()
Copied!

ArrayBlockingQueue 源码和实现原理分析

内部成员变量分析

1
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
2
implements BlockingQueue<E>, java.io.Serializable {
3
4
/** 存储数据的数组 */
5
final Object[] items;
6
7
/** 获取数据的索引,用于下次 take, poll, peek or remove 等方法 */
8
int takeIndex;
9
10
/** 添加元素的索引, 用于下次 put, offer, or add 方法 */
11
int putIndex;
12
13
/** 队列元素的个数 */
14
int count;
15
16
/*
17
* 并发控制使用任何教科书中的经典双条件算法
18
*/
19
20
/** 控制并发访问的锁 */
21
final ReentrantLock lock;
22
23
/** 非空条件对象,用于通知 take 方法中在等待获取数据的线程,队列中已有数据,可以执行获取操作 */
24
private final Condition notEmpty;
25
26
/** 未满条件对象,用于通知 put 方法中在等待添加数据的线程,队列未满,可以执行添加操作 */
27
private final Condition notFull;
28
29
/** 迭代器 */
30
transient Itrs itrs = null;
31
}
Copied!
从上面成员变量中可以看出,内部使用数组对象 items 来存储所有的数据;通过同一个 ReentrantLock 来同时控制添加数据线程和移除数据线程的并发访问,这个与 LinkedBlockingQueue 有很大区别(下面会进行分析)。
对于 notEmpty 条件对象是用于存放等待调用(此时队列中没有数据) take 方法的线程,这些线程会加入到 notEmpty 条件对象的等待队列(单链表)中,同时当队列中有数据后会通过 notEmpty 条件对象唤醒等待队列(链表)中等待的线程(链表中第一个non-null 且 status 为 Condition 的线程)去 take 数据。
对于 notFull 条件对象是用于存放等待调用(此时队列容量已满) put 方法的线程,这些线程会加入到 notFull 条件对象的等待队列(单链表)中,同时当队列中数据被消费后会通过 notFull 条件对象唤醒等待队列(链表)中等待的线程去 put 数据。takeIndex 表示的是下一个(take、poll、peek、remove)方法被调用时获取数组元素的索引,putIndex 表示的是下一个(put、offer、add)被调用时添加元素的索引。
数据出队、入队操作如下:

添加(阻塞添加)的实现分析

1
/**
2
* 在当前 put 位置插入数据,put 位置前进一位,
3
* 同时唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据。
4
* 当然这一系列动作只有该线程获取锁的时候才能进行,即只有获取锁的线程
5
* 才能执行 enqueue 操作。
6
*/
7
// 元素统一入队操作
8
private void enqueue(E x) {
9
// assert lock.getHoldCount() == 1;
10
// assert items[putIndex] == null;
11
final Object[] items = this.items;
12
items[putIndex] = x; // putIndex 位置添加数据
13
//putIndex 进行自增,当达到数组长度的时候,putIndex 重头再来,即设置为0
14
//为什么呢?下面会具体介绍
15
if (++putIndex == items.length)
16
putIndex = 0;
17
count++; //元素个数自增
18
notEmpty.signal(); //添加完数据后,说明数组中有数据了,所以可以唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据
19
}
20
21
// 添加数据,数组中元素已满时,直接返回 false。
22
public boolean offer(E e) {
23
checkNotNull(e);
24
final ReentrantLock lock = this.lock;
25
// 获取锁,保证线程安全
26
lock.lock();
27
try {
28
// 当数组元素个数已满时,直接返回false
29
if (count == items.length)
30
return false;
31
else {
32
// 执行入队操作,enqueue 方法在上面分析了
33
enqueue(e);
34
return true;
35
}
36
} finally {
37
// 释放锁,保证其他等待锁的线程可以获取到锁
38
// 为什么放到 finally (避免死锁)
39
lock.unlock();
40
}
41
}
42
43
// add 方法其实就是调用了 offer 方法来实现,
44
// 与 offer 方法的区别就是 offer 方法数组满,抛出 IllegalStateException 异常。
45
public boolean add(E e) {
46
if (offer(e))
47
return true;
48
else
49
throw new IllegalStateException("Queue full");
50
}
Copied!
offer 方法和 add 方法实现很简单,大家只需要知道其区别就好了;这里着重讲一下 enqueue 方法里面留下的疑问,为什么当 putIndex 到了数组最后一个元素之后,是重头再来,设置为0;首先,你要想到 ArrayBlockingQueue 整个入队和出队操作都是线程安全的,而且 ArrayBlockingQueue 也是先进先出的队列;所以想一想,是不是数据入队后,从第一个数组位置上开始添加数据,依次往后入队;数据出队也是从数组第一个位置出队,出队后该位置数据为空,依次出队,然后这些位置数据都为空;所以只要 count 的个数没有达到数组长度时,虽然 putIndex 达到了数组长度,说明数组前面的位置上已经有数据出队了,所以添加元素,是不是就从头开始就行了(想明白了其实就很简单了,哈哈)。因为我们有一个 count 成员变量来记录元素的个数,当队列已满时,put 操作是会阻塞,add 操作会抛出异常,offer 操作会直接返回false;因此我们也不用担心数据会覆盖。这个 putIndex 和 takeIndex 达到数据长度后都会重新设置为0,重头开始再获取数据,整个过程就是一个无限循环的过程。 通过分析,我们发现有添加操作是不是有两种场景,一个是直接往后添加,一个是达到数据长度后,需要重头再来,
具体操作如下图:
下面看看阻塞添加方法(put)
1
/**
2
* 插入数据到队列尾部,如果队列已满,阻塞等待空间
3
*/
4
public void put(E e) throws InterruptedException {
5
checkNotNull(e);
6
final ReentrantLock lock = this.lock;
7
// 获取锁,期间线程可以打断,打断则不会添加
8
lock.lockInterruptibly();
9
try {
10
// 通过上述分析,我们通过 count 来判断数组中元素个数
11
while (count == items.length)
12
notFull.await(); // 元素已满,线程挂起,线程加入 notFull 条件对象等待队列(链表)中,等待被唤醒
13
enqueue(e); // 队列未满,直接执行入队操作
14
} finally {
15
lock.unlock();
16
}
17
}
Copied!
通过源码分析,发现 offer, add 都是无阻塞添加方法,两者的具体区别在上面分析过了;而 put 方法确实是一个阻塞方法,当队列已满的时候,线程会挂起,然后将该线程加入到 notFull 条件对象的等待队列(链表)中;notFull 条件对象有两种情况,第一种是当队列已满,新来的 put 数据的线程会加入到其等待队列(链表)中,第二种情况是,当队列有空间时,会移除队列中的线程,移除成功同时唤醒 put 线程,加入到获取 lock 的等待队列(双链表)的尾部。
具体操作,如下图:
通过以上分析,ArrayBlockingQueue 的 offer、 add、 put 方法已经都详细分析完毕,希望大家可以对其有深入的了解。

提取(阻塞提取)的实现分析

提取即移除数组中的元素,下面我们具体来分析 ArrayBlockingQueue 的提取数组中元素的操作。
同上分析,我们首先从 dequeue 方法分析开始。
1
/**
2
* 提取 takeIndex 位置上的元素, 然后 takeIndex 前进一位,
3
* 同时唤醒 notFull 等待队列(链表)中的第一个可用线程去 put 数据。
4
* 这些操作都是在当前线程获取到锁的前提下进行的,
5
* 同时也说明了 dequeue 方法线程安全的。
6
*/
7
private E dequeue() {
8
// assert lock.getHoldCount() == 1;
9
// assert items[takeIndex] != null;
10
final Object[] items = this.items;
11
@SuppressWarnings("unchecked")
12
E x = (E) items[takeIndex]; // 提取 takeIndex位置上的数据
13
items[takeIndex] = null; // 同时清空数组在 takeIndex 位置上的数据
14
// takeIndex 向前前进一位,如果前进后位置超过了数组的长度,则将其设置为0;
15
// 为什么设置为0,理由在 putIndex 设置为0的时候介绍过了,原因是一样的。
16
if (++takeIndex == items.length)
17
takeIndex = 0;
18
count--; // 同时数组的元素个数进行减1
19
if (itrs != null)
20
itrs.elementDequeued(); // 同时更新迭代器中的元素,迭代器的具体分析会在下面单独整理
21
notFull.signal(); // 提取完数据后,说明数组中有空位,所以可以唤醒 notFull 条件对象的等待队列(链表)中的第一个可用线程去 put 数据
22
return x;
23
}
24
25
// 提取数据,数组中数据为空时,直接返回 null
26
public E poll() {
27
final ReentrantLock lock = this.lock;
28
lock.lock(); // 加锁,前面也分析过,要执行 dequeue操作时,当前线程必须获取锁,保证线程安全
29
try {
30
return (count == 0) ? null : dequeue(); // 元素个数为0时,直接返回 null,不为0时,元素出队
31
} finally {
32
// 释放锁,在 finally 中释放可以避免死锁
33
lock.unlock();
34
}
35
}
Copied!
上面 poll() 方法分析得很清晰了,内部通过 dequeue 删除队列头元素。下面分析下 peek 方法,与 poll 有较大的区别。
1
// 返回数组上第 i 个元素
2
final E itemAt(int i) {
3
return (E) items[i];
4
}
5
6
/**
7
* 通过代码可以看到,peek 是获取元素,而不是提取, 不会删除 takeIndex 位置上的数据。
8
* 内部通过 itemAt 方法实现,而不是 dequeue 方法。
9
*/
10
public E peek() {
11
final ReentrantLock lock = this.lock;
12
lock.lock();
13
try {
14
return itemAt(takeIndex); //当队列为空时,返回 null
15
} finally {
16
lock.unlock();
17
}
18
}
Copied!
通过上述代码,可以看出 peek 和 poll 的区别,peek 是获取元素,不会删除 takeIndex 位置原有的数据,takeIndex 也不会向前前进一位。
下面来分析下阻塞提取 take 方法:
1
// 从队列头部提取数据,队列中没有元素则阻塞,阻塞期间线程可中断
2
public E take() throws InterruptedException {
3
final ReentrantLock lock = this.lock;
4
lock.lockInterruptibly(); //获取锁,期间线程可以打断,打断则不会提取
5
try {
6
// 元素为0时,当有线程提取元素,则将该线程加入到 notEmpty 条件对象的等待队列中,
7
// 直到当队列中有数据之后,会唤醒该线程去提取数据。
8
while (count == 0)
9
notEmpty.await();
10
return dequeue(); // 若有数据,直接调用 dequeue 提取数据
11
} finally {
12
lock.unlock();
13
}
14
}
Copied!
其实分析完阻塞添加 put 方法后,再来看 take 方法,发现也是非常简单的,队列中有元素,直接提取,没有元素则线程阻塞(可中断的阻塞),将该线程加入到 notEmpty 条件对象的等待队列中;等有新的 put 线程添加了数据,分析发现,会在 put 操作中唤醒 notEmpty 条件对象的等待队列中的 take 线程,去执行 take 操作。
具体操作如下图:
通过以上分析,我们把 poll、take 提取元素的方法分析了,也把 peek 获取元素的方法分析了,我们使用的时候,根据具体的场景使用具体的方法。
分析完提取方法后,我们来分析一下 ArrayBlockingQueue 中的删除元素的 remove 方法。
1
void removeAt(final int removeIndex) {
2
// assert lock.getHoldCount() == 1;
3
// assert items[removeIndex] != null;
4
// assert removeIndex >= 0 && removeIndex < items.length;
5
final Object[] items = this.items;
6
if (removeIndex == takeIndex) {
7
// removing front item; just advance
8
items[takeIndex] = null;
9
if (++takeIndex == items.length)
10
takeIndex = 0;
11
count--;
12
if (itrs != null)
13
itrs.elementDequeued();
14
} else {
15
// an "interior" remove
16
17
// slide over all others up through putIndex.
18
final int putIndex = this.putIndex;
19
for (int i = removeIndex;;) {
20
int next = i + 1;
21
if (next == items.length)
22
next = 0;
23
if (next != putIndex) {
24
items[i] = items[next];
25
i = next;
26
} else {
27
items[i] = null;
28
this.putIndex = i;
29
break;
30
}
31
}
32
count--;
33
if (itrs != null)
34
itrs.removedAt(removeIndex);
35
}
36
notFull.signal();
37
}
38
39
public boolean remove(Object o) {
40
if (o == null) return false;
41
final Object[] items = this.items;
42
final ReentrantLock lock = this.lock;
43
lock.lock();
44
try {
45
if (count > 0) {
46
final int putIndex = this.putIndex;
47
int i = takeIndex;
48
do {
49
if (o.equals(items[i])) {
50
removeAt(i);
51
return true;
52
}
53
if (++i == items.length)
54
i = 0;
55
} while (i != putIndex);
56
}
57
return false;
58
} finally {
59
lock.unlock();
60
}
61
}
Copied!

ArrayBlockingQueue 的迭代器分析

1
private class Itr implements Iterator<E> {
2
/** Index to look for new nextItem; NONE at end */
3
private int cursor;
4
5
/** Element to be returned by next call to next(); null if none */
6
private E nextItem;
7
8
/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
9
private int nextIndex;
10
11
/** Last element returned; null if none or not detached. */
12
private E lastItem;
13
14
/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
15
private int lastRet;
16
17
/** Previous value of takeIndex, or DETACHED when detached */
18
private int prevTakeIndex;
19
20
/** Previous value of iters.cycles */
21
private int prevCycles;
22
23
/** Special index value indicating "not available" or "undefined" */
24
private static final int NONE = -1;
25
26
/**
27
* Special index value indicating "removed elsewhere", that is,
28
* removed by some operation other than a call to this.remove().
29
*/
30
private static final int REMOVED = -2;
31
32
/** Special value for prevTakeIndex indicating "detached mode" */
33
private static final int DETACHED = -3;
34
35
Itr() {
36
// assert lock.getHoldCount() == 0;
37
lastRet = NONE;
38
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
39
lock.lock();
40
try {
41
if (count == 0) {
42
// assert itrs == null;
43
cursor = NONE;
44
nextIndex = NONE;
45
prevTakeIndex = DETACHED;
46
} else {
47
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
48
prevTakeIndex = takeIndex;
49
nextItem = itemAt(nextIndex = takeIndex);
50
cursor = incCursor(takeIndex);
51
if (itrs == null) {
52
itrs = new Itrs(this);
53
} else {
54
itrs.register(this); // in this order
55
itrs.doSomeSweeping(false);
56
}
57
prevCycles = itrs.cycles;
58
// assert takeIndex >= 0;
59
// assert prevTakeIndex == takeIndex;
60
// assert nextIndex >= 0;
61
// assert nextItem != null;
62
}
63
} finally {
64
lock.unlock();
65
}
66
}
67
68
boolean isDetached() {
69
// assert lock.getHoldCount() == 1;
70
return prevTakeIndex < 0;
71
}
72
73
private int incCursor(int index) {
74
// assert lock.getHoldCount() == 1;
75
if (++index == items.length)
76
index = 0;
77
if (index == putIndex)
78
index = NONE;
79
return index;
80
}
81
82
/**
83
* Returns true if index is invalidated by the given number of
84
* dequeues, starting from prevTakeIndex.
85
*/
86
private boolean invalidated(int index, int prevTakeIndex,
87
long dequeues, int length) {
88
if (index < 0)
89
return false;
90
int distance = index - prevTakeIndex;
91
if (distance < 0)
92
distance += length;
93
return dequeues > distance;
94
}
95
96
/**
97
* Adjusts indices to incorporate all dequeues since the last
98
* operation on this iterator. Call only from iterating thread.
99
*/
100
private void incorporateDequeues() {
101
// assert lock.getHoldCount() == 1;
102
// assert itrs != null;
103
// assert !isDetached();
104
// assert count > 0;
105
106
final int cycles = itrs.cycles;
107
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
108
final int prevCycles = this.prevCycles;
109
final int prevTakeIndex = this.prevTakeIndex;
110
111
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
112
final int len = items.length;
113
// how far takeIndex has advanced since the previous
114
// operation of this iterator
115
long dequeues = (cycles - prevCycles) * len
116
+ (takeIndex - prevTakeIndex);
117
118
// Check indices for invalidation
119
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
120
lastRet = REMOVED;
121
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
122
nextIndex = REMOVED;
123
if (invalidated(cursor, prevTakeIndex, dequeues, len))
124
cursor = takeIndex;
125
126
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
127
detach();
128
else {
129
this.prevCycles = cycles;
130
this.prevTakeIndex = takeIndex;
131
}
132
}
133
}
134
135
/**
136
* Called when itrs should stop tracking this iterator, either
137
* because there are no more indices to update (cursor < 0 &&
138
* nextIndex < 0 && lastRet < 0) or as a special exception, when
139
* lastRet >= 0, because hasNext() is about to return false for the
140
* first time. Call only from iterating thread.
141
*/
142
private void detach() {
143
// Switch to detached mode
144
// assert lock.getHoldCount() == 1;
145
// assert cursor == NONE;
146
// assert nextIndex < 0;
147
// assert lastRet < 0 || nextItem == null;
148
// assert lastRet < 0 ^ lastItem != null;
149
if (prevTakeIndex >= 0) {
150
// assert itrs != null;
151
prevTakeIndex = DETACHED;
152
// try to unlink from itrs (but not too hard)
153
itrs.doSomeSweeping(true);
154
}
155
}
156
157
/**
158
* For performance reasons, we would like not to acquire a lock in
159
* hasNext in the common case. To allow for this, we only access
160
* fields (i.e. nextItem) that are not modified by update operations
161
* triggered by queue modifications.
162
*/
163
public boolean hasNext() {
164
// assert lock.getHoldCount() == 0;
165
if (nextItem != null)
166
return true;
167
noNext();
168
return false;
169
}
170
171
private void noNext() {
172
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
173
lock.lock();
174
try {
175
// assert cursor == NONE;
176
// assert nextIndex == NONE;
177
if (!isDetached()) {
178
// assert lastRet >= 0;
179
incorporateDequeues(); // might update lastRet
180
if (lastRet >= 0) {
181
lastItem = itemAt(lastRet);
182
// assert lastItem != null;
183
detach();
184
}
185
}
186
// assert isDetached();
187
// assert lastRet < 0 ^ lastItem != null;
188
} finally {
189
lock.unlock();
190
}
191
}
192
193
public E next() {
194
// assert lock.getHoldCount() == 0;
195
final E x = nextItem;
196
if (x == null)
197
throw new NoSuchElementException();
198
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
199
lock.lock();
200
try {
201
if (!isDetached())
202
incorporateDequeues();
203
// assert nextIndex != NONE;
204
// assert lastItem == null;
205
lastRet = nextIndex;
206
final int cursor = this.cursor;
207
if (cursor >= 0) {
208
nextItem = itemAt(nextIndex = cursor);
209
// assert nextItem != null;
210
this.cursor = incCursor(cursor);
211
} else {
212
nextIndex = NONE;
213
nextItem = null;
214
}
215
} finally {
216
lock.unlock();
217
}
218
return x;
219
}
220
221
public void remove() {
222
// assert lock.getHoldCount() == 0;
223
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
224
lock.lock();
225
try {
226
if (!isDetached())
227
incorporateDequeues(); // might update lastRet or detach
228
final int lastRet = this.lastRet;
229
this.lastRet = NONE;
230
if (lastRet >= 0) {
231
if (!isDetached())
232
removeAt(lastRet);
233
else {
234
final E lastItem = this.lastItem;
235
// assert lastItem != null;
236
this.lastItem = null;
237
if (itemAt(lastRet) == lastItem)
238
removeAt(lastRet);
239
}
240
} else if (lastRet == NONE)
241
throw new IllegalStateException();
242
// else lastRet == REMOVED and the last returned element was
243
// previously asynchronously removed via an operation other
244
// than this.remove(), so nothing to do.
245
246
if (cursor < 0 && nextIndex < 0)
247
detach();
248
} finally {
249
lock.unlock();
250
// assert lastRet == NONE;
251
// assert lastItem == null;
252
}
253
}
254
255
/**
256
* Called to notify the iterator that the queue is empty, or that it
257
* has fallen hopelessly behind, so that it should abandon any
258
* further iteration, except possibly to return one more element
259
* from next(), as promised by returning true from hasNext().
260
*/
261
void shutdown() {
262
// assert lock.getHoldCount() == 1;
263
cursor = NONE;
264
if (nextIndex >= 0)
265
nextIndex = REMOVED;
266
if (lastRet >= 0) {
267
lastRet = REMOVED;
268
lastItem = null;
269
}
270
prevTakeIndex = DETACHED;
271
// Don't set nextItem to null because we must continue to be
272
// able to return it on next().
273
//
274
// Caller will unlink from itrs when convenient.
275
}
276
277
private int distance(int index, int prevTakeIndex, int length) {
278
int distance = index - prevTakeIndex;
279
if (distance < 0)
280
distance += length;
281
return distance;
282
}
283
284
/**
285
* Called whenever an interior remove (not at takeIndex) occurred.
286
*
287
* @return true if this iterator should be unlinked from itrs
288
*/
289
boolean removedAt(int removedIndex) {
290
// assert lock.getHoldCount() == 1;
291
if (isDetached())
292
return true;
293
294
final int cycles = itrs.cycles;
295
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
296
final int prevCycles = this.prevCycles;
297
final int prevTakeIndex = this.prevTakeIndex;
298
final int len = items.length;
299
int cycleDiff = cycles - prevCycles;
300
if (removedIndex < takeIndex)
301
cycleDiff++;
302
final int removedDistance =
303
(cycleDiff * len) + (removedIndex - prevTakeIndex);
304
// assert removedDistance >= 0;
305
int cursor = this.cursor;
306
if (cursor >= 0) {
307
int x = distance(cursor, prevTakeIndex, len);
308
if (x == removedDistance) {
309
if (cursor == putIndex)
310
this.cursor = cursor = NONE;
311
}
312
else if (x > removedDistance) {
313
// assert cursor != prevTakeIndex;
314
this.cursor = cursor = dec(cursor);
315
}
316
}
317
int lastRet = this.lastRet;
318
if (lastRet >= 0) {
319
int x = distance(lastRet, prevTakeIndex, len);
320
if (x == removedDistance)
321
this.lastRet = lastRet = REMOVED;
322
else if (x > removedDistance)
323
this.lastRet = lastRet = dec(lastRet);
324
}
325
int nextIndex = this.nextIndex;
326
if (nextIndex >= 0) {
327
int x = distance(nextIndex, prevTakeIndex, len);
328
if (x == removedDistance)
329
this.nextIndex = nextIndex = REMOVED;
330
else if (x > removedDistance)
331
this.nextIndex = nextIndex = dec(nextIndex);
332
}
333
else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
334
this.prevTakeIndex = DETACHED;
335
return true;
336
}
337
return false;
338
}
339
340
/**
341
* Called whenever takeIndex wraps around to zero.
342
*
343
* @return true if this iterator should be unlinked from itrs
344
*/
345
boolean takeIndexWrapped() {
346
// assert lock.getHoldCount() == 1;
347
if (isDetached())
348
return true;
349
if (itrs.cycles - prevCycles > 1) {
350
// All the elements that existed at the time of the last
351
// operation are gone, so abandon further iteration.
352
shutdown();
353
return true;
354
}
355
return false;
356
}
357
358
// /** Uncomment for debugging. */
359
// public String toString() {
360
// return ("cursor=" + cursor + " " +
361
// "nextIndex=" + nextIndex + " " +
362
// "lastRet=" + lastRet + " " +
363
// "nextItem=" + nextItem + " " +
364
// "lastItem=" + lastItem + " " +
365
// "prevCycles=" + prevCycles + " " +
366
// "prevTakeIndex=" + prevTakeIndex + " " +
367
// "size()=" + size() + " " +
368
// "remainingCapacity()=" + remainingCapacity());
369
// }
370
}
Copied!

LinkedBlockingQueue

LinkedBlockingDeque

ConcurrentLinkedQueue