ArrayBlockingQueue原理分析 - 969251639/study GitHub Wiki
ArrayBlockingQueue是jdk自带的以数组实现的阻塞队列
它的规则是以下几个方法的实现
入队:
add:插入到队尾,如果队列满则返回插入失败抛出IllegalStateException异常
offer:插入到队尾,如果队列满则返回插入失败
put:插入到队尾,如果队列满则等待数据出队
出队:
poll:从队头取出数据,队列为空则返回null
take:从对头取出数据,队列为空则等待数据入队
peek:从队头取出数据,但数据仍在队列里,队列为空则返回null
ArrayBlockingQueue内部通过ReentrantLock作为通知机制来实现,看它的构造函数
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
不管哪一种,最后都会生成一个lock和两个condition,看下它主要的成员变量
/** The queued items */
final Object[] items;//存放数据的数组,创建后不可改变
/** items index for next take, poll, peek or remove */
int takeIndex;//队头索引
/** items index for next put, offer, or add */
int putIndex;//下一个入队的索引
/** Number of elements in the queue */
int count;//队列里面的元素数量
/** Main lock guarding all access */
final ReentrantLock lock;//重入锁
/** Condition for waiting takes */
private final Condition notEmpty;//队列不为空的条件队列
/** Condition for waiting puts */
private final Condition notFull;//队列没满的条件队列
add:
public boolean add(E e) {
return super.add(e);//调用父类的add方法
}
public boolean add(E e) {
if (offer(e))//底层用offer实现,插入成功返回true,否则抛出IllegalStateException异常
return true;
else
throw new IllegalStateException("Queue full");
}
offer:
public boolean offer(E e) {
checkNotNull(e);//如果插入元素e为null,抛出空指针异常
final ReentrantLock lock = this.lock;//获取锁
lock.lock();//加锁
try {
if (count == items.length)//队列是否满了,满了返回false,表示入队失败
return false;
else {
enqueue(e);//插入到队尾
return true;//插入成功返回true
}
} finally {
lock.unlock();//解锁
}
}
put:
public void put(E e) throws InterruptedException {
checkNotNull(e);//如果插入元素e为null,抛出空指针异常
final ReentrantLock lock = this.lock;//获取锁
lock.lockInterruptibly();//加锁
try {
while (count == items.length)//队列是否满了,满了等待
notFull.await();
enqueue(e);//没有满插入到队尾
} finally {
lock.unlock();//解锁
}
}
入队:
private void enqueue(E x) {
final Object[] items = this.items;//获取队列存储容器
items[putIndex] = x;//放到putIndex位置下面
if (++putIndex == items.length) //putIndex++,则putIndex存储了下一个存放位置,如果下一个存放位置为队列的最大值容量值,那么从头开始放,复用数组
putIndex = 0;
count++;//队列数量+1
notEmpty.signal();//唤醒take方法中的等待,表示有数据进来了,你们可以去取了
}
poll:
public E poll() {
final ReentrantLock lock = this.lock;//获取锁
lock.lock();//加锁
try {
return (count == 0) ? null : dequeue();//队列为空返回null,否则队头出队
} finally {
lock.unlock();//释放锁
}
}
take:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;//获取锁
lock.lockInterruptibly();//加锁
try {
while (count == 0)//队列为空,等待
notEmpty.await();
return dequeue();//队头出队
} finally {
lock.unlock();//释放锁
}
}
peek:
public E peek() {
final ReentrantLock lock = this.lock;//获取锁
lock.lock();//加锁
try {
return itemAt(takeIndex); // 返回队头的元素
} finally {
lock.unlock();//释放锁
}
}
final E itemAt(int i) {
return (E) items[i];//返回指定下标的数组元素
}
出队:
private E dequeue() {
final Object[] items = this.items;////获取队列存储容器
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];//获取队头元素
items[takeIndex] = null;//将队头元素设置为null,表示数据已从队列中取出
if (++takeIndex == items.length)//++takeIndex后表示下一次去数据时takeIndex的数组索引,如果等于队列容量,从头开始获取
takeIndex = 0;
count--;//队列数量-1
if (itrs != null)//迭代器用
itrs.elementDequeued();
notFull.signal();//唤醒put或offer方法,表示数据已经被取出了,队列有空位置,你们可以去插入数据了
return x;
}