Java中的并发工具类-CountDownLatch和CyclicBarrier

在JDK的并发包中,有几个非常有用的并发工具类,它们分别是:CountDownLatchCyclicBarrierSemaphoreExchanger

  • CountDownLatch(倒计时门闩):它允许一个或多个线程等待其他线程完成操作后再继续执行。它通过一个计数器来实现,线程通过调用 countDown() 方法来减少计数器的值,await()方法进行阻塞等待计数器减少,当计数器达到零时,等待的线程将被释放。
  • CyclicBarrier(循环屏障):它允许一组线程互相等待,直到到达一个共同的屏障点,然后继续执行后续操作。与CountDownLatch不同的是,CyclicBarrier的计数器可以重复使用(reset()方法),当所有等待线程都到达屏障点后,计数器会重置,线程可以继续下一次等待。
  • Semaphore(信号量):它用于控制对某个资源的访问权限。Semaphore维护了一组许可证,线程在访问资源前需要获取许可证,如果许可证不可用,则线程必须等待,直到有可用的许可证。
  • Exchanger(交换器):它提供了一种线程间交换数据的机制。两个线程可以通过Exchanger交换数据,当两个线程都调用 exchange() 方法后,他们会彼此交换数据,并继续执行后续操作。

CountDownLatch

Latch(门闩)设计模式

当多个线程并发执行任务,然后只有等待所有子任务全部完成进行汇总,程序的门闩才能打开让程序继续往下执行。它指定了一个屏障,只有所有条件都满足的时候,门阀才能打开。

比如小明和小红相约周末去爬山,约定在人民广场碰头,然后一同出发去爬山,他们各自从家里出发,无论是其中某一个先到达了人民广场都要等待另一个到达之后才可以继续进行下去,这里的人民广场碰头就相当于上述的门闩。

示例

还是使用上面的例子,我们模拟小明和小红从家出发,设定不同的等待时间模拟到达人民广场的路程耗时。代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) throws InterruptedException, ExecutionException {
final int threadNum = 2;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CountDownLatch countDownLatch = new CountDownLatch(threadNum);

executorService.execute(() -> {

System.out.println("小明开始出发");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown(); // 计数器 -1
System.out.println("小明到达人民广场");


});
executorService.execute(() -> {

System.out.println("小红开始出发");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown(); // 计数器 -1
System.out.println("小红到达人民广场");


});
countDownLatch.await();
System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");
executorService.shutdown();

}

结果

1
2
3
4
5
6
7
小明开始出发
小红开始出发
小明到达人民广场 // 2s后打印
小红到达人民广场 // 3s后打印
小明和小红都到达了人民广场,开始一起出发去爬山 //3s后打印

Process finished with exit code 0

与Join()的区别

可能这里会有疑问,使用Thread.join()也可以实现相同的功能,这与使用CountDownLatch有什么区别呢?

join()的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
System.out.println("小明开始出发");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("小明到达人民广场");

}, "thread1");
Thread thread2 = new Thread(() -> {
System.out.println("小红开始出发");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("小红到达人民广场");

}, "thread2");

thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");

}

结果

1
2
3
4
5
6
7
小明开始出发
小红开始出发
小明到达人民广场 // 2s后打印
小红到达人民广场 // 3s后打印
小明和小红都到达了人民广场,开始一起出发去爬山 //3s后打印

Process finished with exit code 0

发现使用join()实现和countDownCatch实现好像在代码上的体现并没有太大差异,不急,我们接着往下看

join()实现原理

我们点进去join的jdk源码查看它的实现逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void join() throws InterruptedException {
join(0);
}

public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
// 调用join真正执行的方法
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

我们看到他的核心代码就几行

1
2
3
while (isAlive()) {
wait(0);
}

这几行代码不难理解,通过不停的检查join线程是否存活,如果线程状态是活动的,那么就一直等待下去(wait(0)表示永久等待),直到join线程中止后,线程的this.notifyAll()方法会被调用,不过调用notifyAll()方法是在JVM里 实现的,所以在JDK里看不到。

Join()与countDownLatch比较

回到上一个问题,join到底和countDownLatch有什么区别,countDownLatch底层使用了计数器来控制线程的唤醒,提供了更细粒度的线程控制,比如我们运行了100个线程,但是只需要80个线程执行结束就可以继续下去,那么使用join就不合适了。

综上所述 CountDownLatch相对于Join的优势:

  • CountDownLatch可以等待多个线程的完成,而Join只能等待一个线程。
  • CountDownLatch可以灵活地设置计数器的值,不仅仅限于线程数,可以根据需要自由控制。
  • CountDownLatch提供了更细粒度的线程间协作和控制,可以在任意位置进行countDown()和await()的调用,更灵活地控制线程的流程。

CountDownLatch程序实现

上面说了很多CountDownLatch的示例和与join比较,也提了一下CountDownLatch底层的原理,下面就看一下如何实现一个简单的CountDownLatch

程序

我们先新建一个抽象类,包含countDownLatch需要的参数和方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class Latch {
// 控制了多少线程完成后门阀才能打开
protected int limit;

// 构造函数
public Latch(int limit){
this.limit = limit;
}

// 方法使得线程一直等待
public abstract void await() throws InterruptedException;

// 当前任务线程完成工作之后调用该方法使得计数器减一
public abstract void countDown();

// 获取当前还有多少个线程没有完成任务
public abstract int getUnArrived();

}

然后实现这个抽象类,并写入具体逻辑代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CountDownLatch extends Latch {

private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();


public CountDownLatch(int limit) {
super(limit);
}

@Override
public void await() throws InterruptedException {
lock.lock();
while (limit > 0){
condition.await();
}
lock.unlock();
}

@Override
public void countDown() {

lock.lock();
if(limit < 0){
throw new IllegalStateException();
}
limit--;
condition.signalAll();

lock.unlock();

}

@Override
public int getUnArrived() {
return limit;
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class LatchDemo {
public static void main(String[] args) throws InterruptedException {
Latch latch = new CountDownLatch(2);

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(()->{
System.out.println("小明开始出发");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown(); // 计数器 -1
System.out.println("小明到达人民广场");
});
executorService.execute(()->{
System.out.println("小红开始出发");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown(); // 计数器 -1
System.out.println("小红到达人民广场");
});


latch.await();
System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");

executorService.shutdown();

}
}

结果

1
2
3
4
5
6
7
小明开始出发
小红开始出发
小明到达人民广场
小红到达人民广场
小明和小红都到达了人民广场,开始一起出发去爬山

Process finished with exit code 0

可以看到结果如前文一致,这就实现了一个简单的CountDownLatch,当然具体实现还有更多的细节,如有需要,请翻阅源码。

总结

通过上面的简单实现,我们可以看到CountDownLatch基于计数器实现了多线程之间的门阀拦截,底层还是通过线程之间的通讯、锁和计数器控制。

CyclicBarrier

除了使用CountDownLatch来实现多线程之间的阻塞同步,也可以使用CyclicBarrier来实现,并且CyclicBarrier提供了比CountDownLatch更强大的功能。

CyclicBarrier的字面意思是可循环使用的屏障。它提供了一种同步机制,使一组线程能够在达到屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开启,所有被阻塞的线程才能继续执行。

网上找的一张示意图

示例

还是用之前的例子,模拟小明和小红去爬山,代码如下,结果就不赘述了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.execute(()->{
System.out.println("小明开始出发");
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("小明到达人民广场");
cyclicBarrier.await(); // 计数器 -1
} catch (Exception e) {
e.printStackTrace();
}

});
executorService.execute(()->{
System.out.println("小红开始出发");
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("小红到达人民广场");
cyclicBarrier.await(); // 计数器 -1
} catch (Exception e) {
e.printStackTrace();
}
});
cyclicBarrier.await();
System.out.println("小明和小红都到达了人民广场,开始一起出发去爬山");

executorService.shutdown();

}

不同的是,这里我设置了三个屏障点 cyclicBarrier.await();,而使用CountDownLatch只用了两个计数器减一操作 + 一个wait()方法,使用起来很相似,我们说cyclicBarrierCountDownLatch 功能更强大,那么强大在哪里呢?

重置计数器和获取状态

重置计数器

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {

final int threadNum = 3;
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
System.out.println("所有线程都到达屏障");
});

for (int i = 0; i < threadNum; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 到达屏障");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}

Thread.sleep(2000); // 等待一段时间,确保所有线程都到达屏障

cyclicBarrier.reset(); // 重置屏障

System.out.println("屏障已重置");

for (int i = 0; i < threadNum-1; i++) {
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 到达屏障");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}

System.out.println("第二次进入 循环屏障");
cyclicBarrier.await();
System.out.println("第二次循环 迈过屏障");


executorService.shutdown();


}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
pool-1-thread-2 到达屏障
pool-1-thread-3 到达屏障
pool-1-thread-1 到达屏障
所有线程都到达屏障
屏障已重置
第二次进入 循环屏障
pool-1-thread-2 到达屏障
pool-1-thread-1 到达屏障
所有线程都到达屏障
第二次循环 迈过屏障

Process finished with exit code 0

先说一下CyclicBarrier提供的另一个构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,也就是上方代码中用到的这几段

1
2
3
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
System.out.println("所有线程都到达屏障");
});

这里用于提示所有的线程到达屏障。紧接着是比较常规的代码,循环构造线程并在线程中执行了 cyclicBarrier.await();到达屏障。重点是 cyclicBarrier.reset(); 重置屏障后,我留下一个屏障给主线程测试使用,而在新构造的线程中停留1s, System.out.println("第二次循环 迈过屏障");打印在 System.out.println(Thread.currentThread().getName() + " 到达屏障");之后,说明屏障计数器已经重置并且生效了。

获取状态

除了上述的基本功能外,CyclicBarrier也提供了以下API用来查看状态,

  • getNumberWaiting() // 顾名思义,获取目前正在屏障处阻塞等待的线程数量。
  • getParties() // 获取屏障数量 也就是我们传入构造函数中的parties参数
  • isBroken() // 查询阻塞的线程是否被中断

参考资源

《Java并发编程的艺术》

《Java高并发编程详解》

jdk1.8 api文档