• 在之前的线程池的介绍中我们看到了很多阻塞队列,这篇文章我们主要来说说阻塞队列的事。
  • 阻塞队列也就是 BlockingQueue ,这个类是一个接
  • 口,同时继承了 Queue 接口,这两个接口都是在JDK5 中加入的 。
  • BlockingQueue 阻塞队列是线程安全的,在我们业务中是会经常频繁使用到的,如典型的生产者消费的场景,生产者只需要向队列中添加,而消费者负责从队列中获取。

  • 如上图展示,我们生产者线程不断的put 元素到队列,而消费者从中take 出元素处理,这样实现了任务与执行任务类之间的解耦,任务都被放入到了阻塞队列中,这样生产者和消费者之间就不会直接相互访问实现了隔离提高了安全性。

并发队列

  • 上面是 Java 中队列Queue 类的类图,我们可以看到它分为两大类,阻塞队列与非阻塞队列
  • 阻塞队列的实现接口是 BlockingQueue 而非阻塞队列的接口是 ConcurrentLinkedQueue , 本文主要介绍阻塞队列,非阻塞队列不再过多阐述
  • BlockingQueue 主要有下面六个实现类,分别是 ArrayBlockingQueueLinkedBlockingQueueSynchronousQueueDelayQueuePriorityBlockingQueueLinkedTransferQueue 。这些阻塞队列有着各自的特点和适用场景,后面详细介绍。
  • 非阻塞队列的典型例子如 ConcurrentLinkedQueue , 它不会阻塞线程,而是利用了 CAS 来保证线程的安全。
  • 其实还有一个队列和 Queue 关系很紧密,那就是Deque,这其实是 double-ended-queue 的缩写,意思是双端队列。它的特点是从头部和尾部都能添加和删除元素,而我们常见的普通队列Queue 则是只能一端进一端出,即FIFO

阻塞队列特点

  • 阻塞队列的特点就在于阻塞,它可以阻塞线程,让生产者消费者得以平衡,阻塞队列中有两个关键方法 PutTake 方法

take方法

  • take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程如图所示:


put方法

  • put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。过程如图所示:


是否有界(容量有多大)

  • 此外,阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。
  • 无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。
  • 但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

阻塞队列常见方法

  • 首先我们从常用的方法出发,根据各自的特点我们可以大致分为三个大类,如下表所示:
分类方法含义特点
抛出异常add添加一个元素如果队列已满,添加则抛出  IllegalStateException 异常
remove删除队列头节点当队列为空后,删除则抛出  NoSuchElementException 异常
element获取队列头元素当队列为空时,则抛出 NoSuchElementException 异常
返回无异常offer添加一个元素当队列已满,不会报异常,返回  false ,如果成功返回 true
poll获取队列头节点,并且删除它当队列空时,返回  Null
peek单纯获取头节点当队列为空时反馈 NULL
阻塞put添加一个元素如果队列已满则阻塞
take返回并删除头元素如果队列为空则阻塞
  • 如上面所示主要的八个方法,相对都比较简单,下面我们通过实际代码演示的方式来认识

抛异常类型[add、remove、element]

add

  • 向队列中添加一个元素。如果队列是有界队列,当队列已满时再添加则抛出异常提示,如下:
1
2
3
4
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add(1);
queue.add(2);
queue.add(3);
  • 上述代码中我们创建了一个阻塞队列容量为2,当我们使用 add 向其中添加元素,当添加到第三个时则会抛出异常如下:

remove

  • remove 方法是从队列中删除队列的头节点,同时会返回该元素。当队列中为空时执行 remove 方法时则会抛出异常,代码如下:
1
2
3
4
5
6
private static void groupRemove() {
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add("i-code.online");
System.out.println(queue.remove());
System.out.println(queue.remove());
}
  • 上述代码中,我们可以看到,我们想队列中添加了一个元素 i-code.online , 之后通过 remove 方法进行删除,当执行第二次remove 时队列内已无元素,则抛出异常。如下:

element

  • element 方法是获取队列的头元素,但是并不是删除该元素,这也是与 remove 的区别,当队列中没有元素后我们再执行 element 方法时则会抛出异常,代码如下:
1
2
3
4
5
6
7
8
9
10
private static void groupElement() {
BlockingQueue queue = new ArrayBlockingQueue(2);
queue.add("i-code.online");
System.out.println(queue.element());
System.out.println(queue.element());
}
private static void groupElement2() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.element());
}
  • 上面两个方法分别演示了在有元素和无元素的情况element 的使用。在第一个方法中并不会报错,因为首元素一直存在的,第二个方法中因为空的,所以抛出异常,如下结果:

无异常类型[offer、poll、peek]

offer

  • offer 方法是向队列中添加元素, 同时反馈成功与失败,如果失败则返回 false ,当队列已满时继续添加则会失败,代码如下:
1
2
3
4
5
6
private static void groupOffer() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer("i-code.online"));
System.out.println(queue.offer("云栖简码"));
System.out.println(queue.offer("AnonyStar"));
}
  • 如上述代码所示,我们向一个容量为2的队列中通过offer 添加元素,当添加第三个时,则会反馈 false ,如下结果:
1
2
3
4

true
true
false

poll

  • poll 方法对应上面 remove 方法,两者的区别就在于是否会在无元素情况下抛出异常,poll 方法在无元素时不会抛出异常而是返回null ,如下代码:
1
2
3
4
5
6
7
private static void groupPoll() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer("云栖简码")); //添加元素
System.out.println(queue.poll()); //取出头元素并且删除
System.out.println(queue.poll());

}
  • 上面代码中我们创建一个容量为2的队列,并添加一个元素,之后调用两次poll方法来获取并删除头节点,发现第二次调用时为null ,因为队列中已经为空了,如下:
1
2
3
4

true
云栖简码
null

peek

  • peek 方法与前面的 element 方法是对应的 ,获取元素头节点但不删除,与其不同的在于peek 方法在空队列下并不会抛出异常,而是返回 null,如下:
1
2
3
4
5
6
7
8
9
10
private static void groupPeek() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.offer(1));
System.out.println(queue.peek());
System.out.println(queue.peek());
}
private static void groupPeek2() {
BlockingQueue queue = new ArrayBlockingQueue(2);
System.out.println(queue.peek());
}
  • 如上述代码所示,我么们分别展示了非空队列与空队列下peek 的使用,结果如下:

阻塞类型[put、take]

put

  • put 方法是向队列中添加一个元素,这个方法是阻塞的,也就是说当队列已经满的情况下,再put元素时则会阻塞,直到队列中有空位.

take

  • take 方法是从队列中获取头节点并且将其移除,这也是一个阻塞方法,当队列中已经没有元素时,take 方法则会进入阻塞状态,直到队列中有新的元素进入。

常见的阻塞队列

ArrayBlockingQueue

  • ArrayBlockingQueue 是一个我们常用的典型的有界队列,其内部的实现是基于数组来实现的,我们在创建时需要指定其长度,它的线程安全性由 ReentrantLock 来实现的。
1
2
public ArrayBlockingQueue(int capacity) {...}
public ArrayBlockingQueue(int capacity, boolean fair) {...}
  • 如上所示,ArrayBlockingQueue 提供的构造函数中,我们需要指定队列的长度,同时我们也可以设置队列是都是公平的,当我们设置了容量后就不能再修改了,符合数组的特性,此队列按照先进先出(FIFO)的原则对元素进行排序。
  • ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。

LinkedBlockingQueue

  • 从它的名字我们可以知道,它是一个由链表实现的队列,这个队列的长度是 Integer.MAX_VALUE ,这个值是非常大的,几乎无法达到,对此我们可以认为这个队列基本属于一个无界队列(也又认为是有界队列)。此队列按照先进先出的顺序进行排序。

SynchronousQueue

  • synchronousQueue 是一个不存储任何元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。同时它也支持公平锁和非公平锁。
  • synchronousQueue 的容量并不是1,而是0。因为它本身不会持有任何元素,它是直接传递的,synchronousQueue 会把元素从生产者直接传递给消费者,在这个过程中能够是不需要存储的
  • 在我们之前介绍过的线程池 CachedThreadPool 就是利用了该队列。Executors.newCachedThreadPool(),因为这个线程池它的最大线程数是Integer.MAX_VALUE,它是更具需求来创建线程,所有的线程都是临时线程,使用完后空闲60秒则被回收,

PriorityBlockingQueue

  • PriorityBlockingQueue 是一个支持优先级排序的无界阻塞队列,可以通过自定义实现 compareTo() 方法来指定元素的排序规则,或者通过构造器参数 Comparator 来指定排序规则。但是需要注意插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。
  • 它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功

DelayQueue

  • DelayQueue 是一个实现PriorityBlockingQueue的延迟获取的无界队列。具有“延迟”的功能。
  • DelayQueue 应用场景:1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。2. 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
  • 它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:
1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
  • 可以看出 Delayed 接口继承 Comparable,里面有一个需要实现的方法,就是  getDelay。这里的 getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,如果返回 0 或者负数则代表任务已过期。
  • 元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。


快来关注公众号~~ 平时会更新一些干货哈!

欢迎关注公众号“云栖简码”