queue - JiyangM/spring GitHub Wiki
-
LinkedBlockingQueue:
// 容量大小
private final int capacity;
// 元素个数,因为有2个锁,存在竞态条件,使用AtomicInteger
private final AtomicInteger count = new AtomicInteger(0);
// 头结点
private transient Node<E> head;
// 尾节点
private transient Node<E> last;
// 拿锁
private final ReentrantLock takeLock = new ReentrantLock();
// 拿锁的条件对象
private final Condition notEmpty = takeLock.newCondition();
// 放锁
private final ReentrantLock putLock = new ReentrantLock();
// 放锁的条件对象
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue 使用2个锁,放锁和拿锁,锁实现是ReentrantLock,所以LinkedBlockingQueue的读写之间可以并行运行,但是单独针对读或者写来说是阻塞的。
public static void test1() {
//当没有数据插入元素会阻塞当前线程
try {
OptionT.queue.take();
} catch (Exception e) {
e.printStackTrace();
}
//该句话一直没有打印出看来
System.out.println("hello LinkedBlockingQueue");
// 两个线程不发生阻塞
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(new Runnable() {
@Override
public void run() {
try {
OptionT.queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
service.execute(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
});
public static void test2() {
//插入删除可并行执行
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i = 0; i < 5; i++) {
service.execute(new Task(i));
}
for (int i = 0; i < 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(OptionT.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
public static class Task implements Runnable {
private Integer i;
public Task(Integer i) {
this.i = i;
}
@Override
public void run() {
try {
Thread.sleep(20);
OptionT.queue.add(i + "");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1
0
3
2
4
方法:
- take方法对于没数据的情况下会阻塞
- poll方法删除链表头结点
- remove方法删除指定的对象,需要注意的是remove方法由于要删除的数据的位置不确定,需要2个锁同时加锁。
-
ArrayBlockingQueue:
-
内部也是通过重入锁ReentrantLock实现,但是只有一把锁,不能并发运行,初始化的时候就需要指定大小。
LinkedBlockingQueue的容量是没有上限的(说的不准确,在不指定时容量为Integer.MAX_VALUE,不要然的话在put时怎么会受阻呢),但是也可以选择指定其最大容量,它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
ArrayBlockingQueue在构造时需要指定容量, 并可以选择是否需要公平性,如果公平参数被设置true,等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来 达到这种公平性的:即等待时间最长的线程会先操作)。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。它是基于数组的阻塞循环队 列,此队列按 FIFO(先进先出)原则对元素进行排序。
PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元 素要具有比较能力。
DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。