障栅相当于程序中的一个集合点,当结果在中间步骤需要整合的时候会经常用到,当线程需要等待其他线程时,可以让该线程运行到障栅处,一旦所有线程都到达了这个障栅,障栅就撤销,线程可以继续向下运行。 障栅 Cyclicbarrier 类是一个同步辅助类,实现了一个称为障栅的集合点,在不是所有的线程到达集合点前,线程之间可以相互等待。Cyclic的含义是"循环的、周期的",代表该障栅在所有 等待的线程到达该集合点并释放后可以循环使用。 CyclicBarrier类比较适合于线程数量固定的情况。 CyclicBarrier类的构造方法如下: CyclicBarrier(int parties) //创建一个CyclicBarrier障栅对象,其中,parties为需要的等待线程个数,线程等待时启动。 CyclicBarrier(int parties,Runnable barrierAction) //创建一个CyclicBarrier障栅对象,parties为需要等待的线程个数,barrierAction 定义了最后一个进入障栅的线程要执行的动作。 例如: CyclicBarrier barrier = new CyclicBarrier(4); 表示创建了一个障栅,将有4个线程到达障栅后,才能继续向下运行。为了将需要等待的线程和该障栅联系起来,需要用到await()方法。 在障栅的对象上 可以调用await()方法,该方法需要放置到try....catch....语句块中, 并捕捉InterruptedException和BrokenBarrierException异常。线程在完成自己的工作后调用await()方法等待。当最后一个线程调用了await()方法后,将唤醒所有的线程,并继续该障栅点之后的工作。、 障栅的使用示例如下: public void run(){ ...//需要需要处理任务 try{ barrier.await(); }catch(InterruptedException|BrokenBarrierException e){ e.printStackTrace(); } } Demo示例: 数据排序的并行化实现 //工作线程类 public class Worker extends Thread{ int [] arr; CyclicBarrier barrier; public Worker( int[] arr,CyclicBarrier barrier){ this.arr = arr; this.barrier = barrier; } public void run(){ Arrays.sort(arr); try{ barrier.await(); }catch(InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } } public class Index{ public static void main(String [] args ){ int N = 5000000; int threads=2; int [] array = new int[N]; for(int i=0;i<N;i++){ array[i] = (int)(Math.random()*N); } System.out.println("数据库初始化完毕!"); int[] data = new int[threads+1]; int slice = N /threads; for(int i=0 ;i<=threads; i++){ data[i]=slice*i; if(data[i]>N){ data[i]=N; } } int[][] subAry =new int[threads][slice]; for(int i=0;i<threads;i++){ subAry[i] = Arrays.copyOfRange(array,data[i],data[i+1]); } System.out.println("数据划分完成>>>>>>"); Thread [] t = new Thread[threads]; CyclicBarrier barrier = new CyclicBarrier(threads+1); for(int i=0;i<threads;i++){ t[i] = new Worker( subAry[i],barrier); t[i].start(); } System.out.println(threads+"个线程已经启动>>>>"); try{ barrier.await(); }catch(InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } //合并已经排好的数据 array = converge(subAry[0],subAry[1]); if(check(array)){ System.out.println("排序成功!"); }else{ System.out.println("排序失败!"); } } //合并已经排好的数据 public static int[] converge(int[] arr1,int [] arr2 ){ int [] arr = new int[ arr1.length+arr2.length]; int i1 = 0,i2 =0,i=0; while(i1<arr1.length&&i2<arr2.length){ if(arr1[i1]<arr2[i2]){ arr[i] = arr1[i1]; i++; i1++; }else{ arr[i] = arr2[i2]; i++; i2++; } } while(i1<arr1.length){ arr[i] =arr1[i1]; i++; i1++; } while(i2<arr2.length){ arr[i] = arr2[i2]; i++; i2++; } return arr; } public static boolean check( int[] arr){ int length = arr.length; for(int i=0;i<length;i++){ if(arr[i]>arr[i+1]){ return false; } } return true; } }
结果:
数据库初始化完毕! 数据划分完成>>>>>> 2个线程已经启动>>>> 排序成功!
