CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
代码如下:
public class CyclicBarrierTest { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } }因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出,第一种可能输出如下:
1 2第二种可能输出如下:
2 1如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:
public class CyclicBarrierTest2 { static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start(); try { c.await(); } catch (Exception e) { } System.out.println(2); } static class A implements Runnable { @Override public void run() { System.out.println(3); } } }因为CyclicBarrier设置了拦截线程的数量是2,所以必须等代码中的第一个线程和线程A都执行完之后,才会继续执行主线程,然后输出2,所以代码执行后的输出如下。
3 1 2CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。代码如下:
<span style="color:#333333;">pa<span style="color:#330033;">ckage cn.dachao.thread; import java.util.Map; import java.util.concurrent.*; /** * Created by dachao on 16-7-28. */ public class BankWaterService implements Runnable{ /* 创建4个屏障,处理完之后执行当前类的run方法 */ private CyclicBarrier c=new CyclicBarrier(4,this); /* 假设只有4个sheet,所以只启动4个线程 */ private Executor executor= Executors.newFixedThreadPool(4); /* 保存每个sheet计算出的银流结果 */ private ConcurrentHashMap<String,Integer> count=new ConcurrentHashMap<>(); private void count(){ for(int i=0;i<4;i++){ executor.execute(new Runnable() { @Override public void run() { //计算当前sheet的银行数据,代码省略 count.put(Thread.currentThread().getName(),1); try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); } } @Override public void run(){ int result=0; //汇总每个sheet计算出的结果 for(Map.Entry<String,Integer> sheet:count.entrySet()){ result+=sheet.getValue(); } //将结果输出 count.put("result",result); System.out.println(result); } public static void main(String[]args){ BankWaterService service=new BankWaterService(); service.count(); } }使用线程池创建4个线程,分别计算每个sheet里的数据,每个sheet计算结果是1,再由BankWaterService线程汇总4个sheet计算出的结果,输出结果如下。
4CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierTest3 { static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) throws InterruptedException, BrokenBarrierException { Thread thread = new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } } }); thread.start(); thread.interrupt(); try { c.await(); } catch (Exception e) { System.out.println(c.isBroken()); } } }输出如下:
true内容出自《Java并发编程的艺术》