
本文主要介绍和对比我们常用的几种并发工具类,主要涉及 CountDownLatch
、 CyclicBarrier
、 Semaphore
、 Exchanger
相关的内容,如果对多线程相关内容不熟悉,可以看笔者之前的一些文章:
- 介绍
CountDownLatch
、CyclicBarrier
两者的使用与区别,他们都是等待多线程完成,是一种并发流程的控制手段, - 介绍
Semaphore
、Exchanger
的使用,semaphore
是信号量,可以用来控制允许的线程数,而 Exchanger
可以用来交换两个线程间的数据。
CountDownLatch
CountDownLatch
是 JDK5
之后加入的一种并发流程控制工具,它在 java.util.concurrent
包下CountDownLatch
允许一个或多个线程等待其他线程完成操作,这里需要注意,是可以是一个等待也可以是多个来等待CountDownLatch
的构造函数如下,它接受一个 int
类型的参数作为计数器,即如果你想等待N
个线程完成,那么这里就传入 N
1 2 3 4
| public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
|
- 其中有两个核心的方法
countDown
与 await
,其中 当我们调用 countDown
方法时相应的 N
的值减 1,而 await
方法则会阻塞当前线程,直到 N
的值变为零。 - 说起来比较抽象,下面我们通过实际案例来说明。
多个线程等待一个线程
- 在我们生活中最典型的案例就是体育中的跑步,假设现在我们要进行一场赛跑,那么所有的选手都需要等待裁判员的起跑命令,这时候,我们将其抽象化每个选手对应的是一个线程,而裁判员也是一个线程,那么就是多个选手的线程再等待裁判员线程的命令来执行
- 我们通过
CountDownLatch
来实现这一案例,那么等待的个数 N
就是上面的裁判线程的个数,即为 1,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() ->{ try { System.out.println(Thread.currentThread().getName() +"已准备"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"开始跑~~");
},"选手1").start(); new Thread(() ->{ try { System.out.println(Thread.currentThread().getName() +"已准备"); countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"开始跑~~");
},"选手2").start();
TimeUnit.SECONDS.sleep(1); System.out.println("裁判:预备~~~"); countDownLatch.countDown(); System.out.println("裁判:跑~~~"); }
|

在上述代码中,我们首先创建了一个计数为1 的 CountDownLatch
对象,这代表我们需要等待的线程数,之后再创建了两个线程,用来代表选手线程,同时在选手的线程中我们都调用了 await
方法,让线程进入阻塞状态,直到CountDownLatch的计数为零后再执行后面的内容,在主线程 main
方法中我们等待 1秒后执行 countDown
方法,这个方法就是减一,此时的 N
则为零了,那么选手线程则开始执行后面的内容,整体的输出如上图所示
一个/多个线程等待多个线程
- 同样从我们生活中的场景来抽象,假设公司要组织出游,大巴车接送,当凑够五个人大巴车则发车出发,这里就是大巴车需要等待这五个人全部到齐才能继续执行,我们抽象之后用
CountDownLatch
来实现,那么的计数个数 N
则为5,因为要等待这五个,通过代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + "从住所出发..."); try { TimeUnit.SECONDS.sleep((long) (Math.random()*10)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 到达目的地-----"); countDownLatch.countDown(); },"人员-"+i).start(); }
System.out.println("大巴正在等待人员中....."); countDownLatch.await(); System.out.println("-----所有人到齐,出发-----"); }
|

从上述代码中我们可以看到,定义了一个计数为5的 countDownLatch
,之后通过循环创建五个线程,模拟五个人员,当他们到达指定地点后执行 countDown
方法,对计数减一。主线程相当于是大巴车的线程,执行 await
方法进行阻塞,只有当 N
的值减到0后则执行后面的输出
CountDownLatch 主要方法介绍
1
| public CountDownLatch(int count) { };
|
它的构造函数是传入一个参数,该参数 count
是需要倒数的数值。
await()
:调用 await()
方法的线程开始等待,直到倒数结束,也就是 count
值为 0
的时候才会继续执行。await(long timeout, TimeUnit unit)
:await()
有一个重载的方法,里面会传入超时参数,这个方法的作用和 await()
类似,但是这里可以设置超时时间,如果超时就不再等待了。countDown()
:把数值倒数 1
,也就是将 count
值减 1
,直到减为 0
时,之前等待的线程会被唤起。
上面的案例介绍了 CountDownLatch
的使用,但是 CountDownLatch
有个特点,那就是不能够重用,比如已经完成了倒数,那可不可以在下一次继续去重新倒数呢?是可以的,一旦倒数到0 则结束了,无法再次设置循环执行,但是我们实际需求中有很多场景中需要循环来处理,这时候我们可以使用 CyclicBarrier
来实现
CyclicBarrier
CyclicBarrier
与 CountDownLatch
比较相似,当等待到一定数量的线程后开始执行某个任务CyclicBarrier
的字面意思是可以循环使用的屏障,它的功能就是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开会,此时所有被屏障阻塞的线程都将继续执行。如下演示

- 上图中可以看到,到线程到达屏障后阻塞,直到最后一个也到达后,则全部放行
- 首先我们来看下它的构造函数,如下:
1 2 3 4 5 6 7 8 9 10
| public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
|
CyclicBarrier(int parties)
构造函数提供了int
类型的参数,代表的是需要拦截的线程数量,而每个线程通过调用 await
方法来告诉 CyclicBarrier
我到达屏障点了,然后阻塞CyclicBarrier(int parties, Runnable barrierAction)
构造函数是为我们提供的一个高级方法,加了一个 barrierAction
的参数,这是一个Runnable
类型的,也就是一个线程,它表示当所有线程到达屏障后,悠闲触发 barrierAction
线程执行,再执行各个线程之后的内容
案例
- 假设你要和你女朋友约会,约定了一个时间地点,那么不管你们谁先到都会等待另一个到才会出发取约会~ 那么这时候我们通过
CyclicBarrier
的来实现,这里我们需要来拦截的线程就是两个。具体实现 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(() ->{ System.out.println("快速收拾,出门~~~"); try { TimeUnit.MILLISECONDS.sleep(500); System.out.println("到了约会地点等待女朋友前来~~"); cyclicBarrier.await(); System.out.println("女朋友到来嗨皮出发~~约会"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); }
},"男朋友").start(); new Thread(() ->{ System.out.println("慢慢收拾,出门~~~"); try { TimeUnit.MILLISECONDS.sleep(5000); System.out.println("到了约会地点等待男朋友前来~~"); cyclicBarrier.await(); System.out.println("男朋友到来嗨皮出发~~约会"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },"女朋友").start();
}
|

上面代码,相对简单,创建一个拦截数为2的屏障,之后创建两个线程,调用await方法,只有当调用两次才会触发后面的流程。
- 我们再写一个案例sh,使用含有
Runnable
参数的构造函数;和之前 CountDownLatch
的案例相似,公司组织出游,这时候肯定有很多大巴在等待接送,大巴不会等所有的 人都到才出发,而是每坐满一辆车就出发一辆,这种场景我们就可以使用 CyclicBarrier
来实现,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
public static void main(String[] args) { int peopleNum = 2000; CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{ System.out.println("------------25人数凑齐出发------------"); });
for (int j = 1; j <= peopleNum; j++) { new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start(); }
}
static class PeopleTask implements Runnable{
private String name; private CyclicBarrier cyclicBarrier; public PeopleTask(String name,CyclicBarrier cyclicBarrier){ this.name = name; this.cyclicBarrier = cyclicBarrier; }
@Override public void run() { System.out.println(name+"从家里出发,正在前往聚合地...."); try { TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name+"到达集合地点,等待其他人.."); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); }
} }
|
CyclicBarrier 和 CountDownLatch 的异同
相同点:
- 都能阻塞一个或一组线程,直到某个预设的条件达成发生,再统一出发
不同点:
- 可重复性:
CountDownLatch
的计数器只能使用一次,到到达0后就不能再次使用了,除非新建实例;而 CyclicBarrier
的计数器是可以复用循环的,所以 CyclicBarrier
可以用在更复杂的场景,可以随时调用 reset
方法来重制拦截数,如计算发生错误时可以直接充值计数器,让线程重新执行一次。 - 作用对象:
CyclicBarrier
要等固定数量的线程都到达了屏障位置才能继续执行,而 CountDownLatch
只需等待数字倒数到 0
,也就是说 CountDownLatch
作用于事件,但 CyclicBarrier
作用于线程;CountDownLatch
是在调用了 countDown
方法之后把数字倒数减 1
,而 CyclicBarrier
是在某线程开始等待后把计数减 1
。 - 执行动作:
CyclicBarrier
有执行动作 barrierAction
,而 CountDownLatch
没这个功能。
Semaphore
Semaphore
(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,

- 从图中可以看出,信号量的一个最主要的作用就是,来控制那些需要限制并发访问量的资源。具体来讲,信号量会维护“许可证”的计数,而线程去访问共享资源前,必须先拿到许可证(
acquire
方法)。线程可以从信号量中去“获取”一个许可证,一旦线程获取之后,信号量持有的许可证就转移过去了,所以信号量手中剩余的许可证要减一。 - 同理,线程也可以“释放”一个许可证,如果线程释放了许可证(
release
方法),这个许可证相当于被归还给信号量了,于是信号量中的许可证的可用数量加一。当信号量拥有的许可证数量减到 0 时,如果下个线程还想要获得许可证,那么这个线程就必须等待,直到之前得到许可证的线程释放,它才能获取。由于线程在没有获取到许可证之前不能进一步去访问被保护的共享资源,所以这就控制了资源的并发访问量,这就是整体思路。
案例
- 如我们平时开发中典型的数据库操作,这是一个密集
IO
操作,我们可以启动很多线程但是数据库的连接池是有限制的,假设我们设置允许五个链接,如果我们开启太多线程直接操作则会出现异常,这时候我们可以通过信号量来控制,让一直最多只有五个线程来获取连接。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
public static void main(String[] args) { final ExecutorService service = Executors.newFixedThreadPool(100); Semaphore s = new Semaphore(5); for (int i = 0; i < 100; i++) { service.submit(() ->{ try { s.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println("数据库耗时操作"+Thread.currentThread().getName()); TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "正在执行...."); s.release(); }); }
}
|
如上代码,创建了一个容量100的线程池,模拟我们程序中大量的线程,添加一百个任务,让线程池执行。创建了一个容量为5的信号量,在线程中我们调用 acquire
来获得信号量的许可,只有获得了才能只能下面的内容不然阻塞。当执行完后释放该许可,通过 release
方法,
- 通过上面的演示,有没有觉得非常眼熟,对,就是和我们之前接触过的锁很相似,只是锁是只允许一个线程访问,那我们能不能将信号量的容量设置为1呢? 这当然是可以的,当我们设置为1时其实就和我们的锁的功能是一致的,如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private static int count = 0;
public static void main(String[] args) throws InterruptedException { final ExecutorService service = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(1); for (int i = 0; i < 10000; i++) { service.submit(() ->{ try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "执行了"); count ++; semaphore.release(); }); } service.shutdown(); TimeUnit.SECONDS.sleep(5); System.out.println(count);
}
|
其他主要方法介绍
public boolean tryAcquire()
:tryAcquire
和锁的 trylock
思维是一致的,是尝试获取许可证,相当于看看现在有没有空闲的许可证,如果有就获取,如果现在获取不到也没关系,不必陷入阻塞,可以去做别的事。public boolean tryAcquire(long timeout, TimeUnit unit)
:是一个重载的方法,它里面传入了超时时间。比如传入了 3 秒钟,则意味着最多等待 3 秒钟,如果等待期间获取到了许可证,则往下继续执行;如果超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。int availablePermits()
:返回此信号量中当前可用的许可证数int getQueueLength()
:返回正在等待许可证的线程数boolean hasQueuedThreads()
:判断是否有线程正在等待获取许可证void reducePermits(int reduction)
:减少 reduction
个许可证,是个 protected
方法Collection<Thread> getQueuedThreads()
:返回正在等待获取许可证的线程集合,是个 protected
方法
Exchanger
Exchanger
(交换者)是一个用于线程间协作的工具类,它主要用于进行线程间数据的交换,它有一个同步点,当两个线程到达同步点时可以将各自的数据传给对方,如果一个线程先到达同步点则会等待另一个到达同步点,到达同步点后调用 exchange
方法可以传递自己的数据并且获得对方的数据。- 我们假设现在需要录入一些重要的账单信息,为了保证准备,让两个人分别录入,之后再进行对比后是否一致,防止错误繁盛。下面通过代码来演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| public class ExchangerTest {
public static void main(String[] args) {
final ExecutorService service = Executors.newFixedThreadPool(2);
Exchanger<InfoMsg> exchanger = new Exchanger<>();
service.submit(() ->{ InfoMsg infoMsg = new InfoMsg(); infoMsg.content="这是线程A"; infoMsg.id ="10001"; infoMsg.desc = "1"; infoMsg.message = "message"; System.out.println("正在执行其他..."); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } try { final InfoMsg exchange = exchanger.exchange(infoMsg); System.out.println("线程A 交换数据====== 得到"+ exchange); if (!exchange.equals(infoMsg)){ System.out.println("数据不一致~~请稽核"); return; } } catch (InterruptedException e) { e.printStackTrace(); } }); service.submit(() ->{ InfoMsg infoMsg = new InfoMsg(); infoMsg.content="这是线程B"; infoMsg.id ="10001"; infoMsg.desc = "1"; infoMsg.message = "message"; System.out.println("正在执行其他..."); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } try { final InfoMsg exchange = exchanger.exchange(infoMsg); System.out.println("线程B 交换数据====== 得到"+ exchange); if (!exchange.equals(infoMsg)){ System.out.println("数据不一致~~请稽核"); return; } } catch (InterruptedException e) { e.printStackTrace(); } });
service.shutdown(); }
static class InfoMsg{ String id; String name; String message; String content; String desc;
@Override public String toString() { return "InfoMsg{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", message='" + message + '\'' + ", content='" + content + '\'' + ", desc='" + desc + '\'' + '}'; }
@Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; InfoMsg infoMsg = (InfoMsg) o; return Objects.equals(id, infoMsg.id) && Objects.equals(name, infoMsg.name) && Objects.equals(message, infoMsg.message) && Objects.equals(content, infoMsg.content) && Objects.equals(desc, infoMsg.desc); }
@Override public int hashCode() { return Objects.hash(id, name, message, content, desc); } } }
|

上面代码运行可以看到,当我们线程 A/B
到达同步点即调用 exchange
后进行数据的交换,拿到对方的数据再与自己的数据对比可以做到稽核 的效果
Exchanger
同样可以用于遗传算法中,选出两个对象进行交互两个的数据通过交叉规则得到两个混淆的结果。Exchanger
中嗨提供了一个方法 public V exchange(V x, long timeout, TimeUnit unit)
主要是用来防止两个程序中一个一直没有执行 exchange
而导致另一个一直陷入等待状态,这是可以用这个方法,设置超时时间,超过这个时间则不再等待。
