一、线程池
1: public static void main(String[] args) { 2: // 产生线程池,有3个线程,使用固定线程池创建 3: //ExecutorService threadPool = Executors.newFixedThreadPool(3); 4: //产生线程池,动态创建线程池的大小 5: ExecutorService threadPool = Executors.newCachedThreadPool(); 6: //向线程池添加10个任务 7: for (int i=1; i<10; i++) { 8: final int task = i; 9: threadPool.execute(new Runnable() { 10: @Override 11: public void run() { 12: for (int j=0; j<10; j++) { 13: try { 14: Thread.sleep(20); 15: } catch (InterruptedException e) { 16: // TODO Auto-generated catch block 17: e.printStackTrace(); 18: } 19: System.out.println( 20: Thread.currentThread().getName() + 21: " is looping of " + j + 22: " for task " + task); 23: } 24: 25: } 26: }); 27: } 28: 29: //线程调度的使用 30: //该功能为定时运行6秒以后运行,然后每隔2秒运行一次 31: Executors.newScheduledThreadPool(3).scheduleAtFixedRate( 32: new Runnable() { 33: @Override 34: public void run() { 35: // TODO Auto-generated method stub 36: System.out.println("bomb...."); 37: } 38: }, 39: 6, 40: 2, 41: TimeUnit.SECONDS); 42: }二、并发锁(lock)
Lock比传统线程模型中的synchronized方式更加面向对象。
1: Lock lock = new ReentrantLock(); 2: lock.lock(); 3: try { 4: ........ 5: } finally { 6: lock.unlock(); 7: } 读写锁:读锁不互斥,读锁与写锁互斥,写锁与写锁互斥 1: //从JAVA文档中找到的,如果数据没有读到,就解开读锁,加上写锁 2: class CachedData { 3: Object data; 4: volatile boolean cacheValid; 5: ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 6: 7: void processCachedData() { 8: rwl.readLock().lock(); 9: if (!cacheValid) { 10: // Must release read lock before acquiring write lock 11: rwl.readLock().unlock(); 12: rwl.writeLock().lock(); 13: // Recheck state because another thread might have acquired 14: // write lock and changed state before we did. 15: if (!cacheValid) { 16: data = ... 17: cacheValid = true; 18: } 19: // Downgrade by acquiring read lock before releasing write lock 20: //我的理解,写上读锁,下面的写锁降级成更新锁 21: rwl.readLock().lock(); 22: rwl.writeLock().unlock(); // Unlock write, still hold read 23: } 24: 25: use(data); 26: rwl.readLock().unlock(); 27: } 28: }
三、锁的条件(Condition)
Condition的功能就是在传统线程技术中的wait和notify的功能。
1: /** 2: * 每个方法各执行分10次和5次运行 3: * 4: */ 5: public class ConditionTest { 6: 7: public static void main(String[] args) { 8: ExecutorService service = Executors.newSingleThreadExecutor(); 9: final Business2 business = new Business2(); 10: service.execute(new Runnable(){ 11: 12: public void run() { 13: for(int i=0;i<50;i++){ 14: business.sub(); 15: } 16: } 17: 18: }); 19: 20: for(int i=0;i<50;i++){ 21: business.main(); 22: } 23: } 24: 25: } 26: 27: class Business2{ 28: Lock lock = new ReentrantLock(); 29: //为了达到线程间通信 30: Condition condition = lock.newCondition(); 31: boolean bShouldSub = true; 32: public void sub(){ 33: lock.lock(); 34: if(!bShouldSub) 35: try { 36: condition.await();//线程等待 37: } catch (InterruptedException e) { 38: // TODO Auto-generated catch block 39: e.printStackTrace(); 40: } 41: try 42: { 43: for(int i=0;i<10;i++){ 44: System.out.println(Thread.currentThread().getName() + " : " + i); 45: } 46: bShouldSub = false; 47: //通知其他的线程 48: condition.signal(); 49: }finally{ 50: lock.unlock(); 51: } 52: } 53: 54: public void main(){ 55: lock.lock(); 56: if(bShouldSub) 57: try { 58: condition.await(); 59: } catch (InterruptedException e) { 60: e.printStackTrace(); 61: } 62: try 63: { 64: for(int i=0;i<5;i++){ 65: System.out.println(Thread.currentThread().getName() + " : " + i); 66: } 67: bShouldSub = true; 68: condition.signal(); 69: }finally{ 70: lock.unlock(); 71: } 72: } 73: }另外,再来一个JDK中的例子,更加经典
1: //从JDK中找到的例子,还是JDK得例子经典 2: //本例子是一个可阻塞的队列 3: class BoundedBuffer { 4: final Lock lock = new ReentrantLock(); 5: //此处用到2个Condition,是为了区别取和放操作 6: final Condition notFull = lock.newCondition(); 7: final Condition notEmpty = lock.newCondition(); 8: //队列为100 9: final Object[] items = new Object[100]; 10: int putptr, takeptr, count; 11: 12: public void put(Object x) throws InterruptedException { 13: lock.lock(); 14: try { 15: while (count == items.length) 16: notFull.await();//队列满,等待 17: items[putptr] = x; 18: if (++putptr == items.length) 19: putptr = 0;//指针越界 20: ++count; 21: notEmpty.signal(); 22: } finally { 23: lock.unlock(); 24: } 25: } 26: 27: public Object take() throws InterruptedException { 28: lock.lock(); 29: try { 30: while (count == 0) 31: notEmpty.await();//队列空 32: Object x = items[takeptr]; 33: if (++takeptr == items.length) takeptr = 0; 34: --count; 35: notFull.signal(); 36: return x; 37: } finally { 38: lock.unlock(); 39: } 40: } 41: }
四、信号灯(Semaphore)
维护当前访问自身的线程个数,并提供同步机制。
1: public class SemaphoreTest { 2: public static void main(String[] args) { 3: ExecutorService service = Executors.newCachedThreadPool(); 4: final Semaphore sp = new Semaphore(3);//只允许3个线程的并发 5: for(int i=0;i<10;i++){ 6: Runnable runnable = new Runnable(){ 7: public void run(){ 8: try { 9: sp.acquire();//是否可以让当前线程执行 10: } catch (InterruptedException e1) { 11: e1.printStackTrace(); 12: } 13: System.out.println("线程" + Thread.currentThread().getName() + 14: "进入,当前已有" + (3-sp.availablePermits()) + "个并发"); 15: try { 16: Thread.sleep((long)(Math.random()*10000)); 17: } catch (InterruptedException e) { 18: e.printStackTrace(); 19: } 20: System.out.println("线程" + Thread.currentThread().getName() + 21: "即将离开"); 22: sp.release();//释放信号量 23: //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元 24: System.out.println("线程" + Thread.currentThread().getName() + 25: "已离开,当前已有" + (3-sp.availablePermits()) + "个并发"); 26: } 27: }; 28: service.execute(runnable); 29: } 30: } 31: 32: }五、其他同步工具 CyclicBarrier:需要有多个线程同时到达才向下执行
1: 2: public class CyclicBarrierTest { 3: //模拟旅游集合的情况 4: public static void main(String[] args) { 5: ExecutorService service = Executors.newCachedThreadPool(); 6: final CyclicBarrier cb = new CyclicBarrier(3); 7: for(int i=0;i<3;i++){ 8: Runnable runnable = new Runnable(){ 9: public void run(){ 10: try { 11: Thread.sleep((long)(Math.random()*10000)); 12: System.out.println("线程" + Thread.currentThread().getName() + 13: "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); 14: cb.await();//只有3个线程都到此处,程序才会往下走 15: 16: Thread.sleep((long)(Math.random()*10000)); 17: System.out.println("线程" + Thread.currentThread().getName() + 18: "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); 19: cb.await(); 20: Thread.sleep((long)(Math.random()*10000)); 21: System.out.println("线程" + Thread.currentThread().getName() + 22: "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候"); 23: cb.await(); 24: } catch (Exception e) { 25: e.printStackTrace(); 26: } 27: } 28: }; 29: service.execute(runnable); 30: 31: } 32: service.shutdown(); 33: } 34: 35: }CountDownLatch:犹如倒计数器,等计数器减少到0,程序向下执行
1: public class CountdownLatchTest { 2: //模拟赛跑的情况 3: public static void main(String[] args) { 4: ExecutorService service = Executors.newCachedThreadPool(); 5: //计数器 6: final CountDownLatch cdOrder = new CountDownLatch(1); 7: final CountDownLatch cdAnswer = new CountDownLatch(3); 8: for(int i=0;i<3;i++){ 9: Runnable runnable = new Runnable(){ 10: public void run(){ 11: try { 12: System.out.println("线程" + Thread.currentThread().getName() + 13: "正准备接受命令"); 14: cdOrder.await(); 15: System.out.println("线程" + Thread.currentThread().getName() + 16: "已接受命令"); 17: Thread.sleep((long)(Math.random()*10000)); 18: System.out.println("线程" + Thread.currentThread().getName() + 19: "回应命令处理结果"); 20: cdAnswer.countDown(); 21: } catch (Exception e) { 22: e.printStackTrace(); 23: } 24: } 25: }; 26: service.execute(runnable); 27: } 28: //主线程 29: try { 30: Thread.sleep((long)(Math.random()*10000)); 31: 32: System.out.println("线程" + Thread.currentThread().getName() + 33: "即将发布命令"); 34: //计数器减1 35: cdOrder.countDown(); 36: System.out.println("线程" + Thread.currentThread().getName() + 37: "已发送命令,正在等待结果"); 38: cdAnswer.await();//等待计数器为0,然后主线程往下走 39: System.out.println("线程" + Thread.currentThread().getName() + 40: "已收到所有响应结果"); 41: } catch (Exception e) { 42: e.printStackTrace(); 43: } 44: service.shutdown(); 45: 46: } 47: }
Exchanger:实现线程间的数据交换
1: public class ExchangerTest { 2: 3: public static void main(String[] args) { 4: ExecutorService service = Executors.newCachedThreadPool(); 5: final Exchanger exchanger = new Exchanger(); 6: service.execute(new Runnable(){ 7: public void run() { 8: try { 9: Thread.sleep((long)(Math.random()*10000)); 10: String data1 = "zxx"; 11: System.out.println("线程" + Thread.currentThread().getName() + 12: "正在把数据" + data1 +"换出去"); 13: String data2 = (String)exchanger.exchange(data1);//交换数据 14: System.out.println("线程" + Thread.currentThread().getName() + 15: "换回的数据为" + data2); 16: }catch(Exception e){ 17: 18: } 19: } 20: }); 21: service.execute(new Runnable(){ 22: public void run() { 23: try { 24: Thread.sleep((long)(Math.random()*10000)); 25: String data1 = "lhm"; 26: System.out.println("线程" + Thread.currentThread().getName() + 27: "正在把数据" + data1 +"换出去"); 28: String data2 = (String)exchanger.exchange(data1); 29: System.out.println("线程" + Thread.currentThread().getName() + 30: "换回的数据为" + data2); 31: }catch(Exception e){ 32: 33: } 34: } 35: }); 36: } 37: 38: }
六、可阻塞的队列
1: public class BlockingQueueCondition { 2: //现在2个线程的交替操作 3: public static void main(String[] args) { 4: ExecutorService service = Executors.newSingleThreadExecutor(); 5: final Business3 business = new Business3(); 6: service.execute(new Runnable(){ 7: 8: public void run() { 9: for(int i=0;i<50;i++){ 10: business.sub(); 11: } 12: } 13: 14: }); 15: 16: for(int i=0;i<50;i++){ 17: business.main(); 18: } 19: } 20: 21: } 22: 23: class Business3{ 24: BlockingQueue subQueue = new ArrayBlockingQueue(1); 25: BlockingQueue mainQueue = new ArrayBlockingQueue(1); 26: //匿名构造方法,相当于一个构造方法 27: { 28: try { 29: mainQueue.put(1);//让主队列满,不能put操作 30: } catch (InterruptedException e) { 31: e.printStackTrace(); 32: } 33: } 34: public void sub(){ 35: try 36: { 37: mainQueue.take(); 38: for(int i=0;i<10;i++){ 39: System.out.println(Thread.currentThread().getName() + " : " + i); 40: } 41: subQueue.put(1); 42: }catch(Exception e){ 43: 44: } 45: } 46: 47: public void main(){ 48: 49: try 50: { 51: subQueue.take(); 52: for(int i=0;i<5;i++){ 53: System.out.println(Thread.currentThread().getName() + " : " + i); 54: } 55: mainQueue.put(1); 56: }catch(Exception e){ 57: } 58: } 59: }
七、同步集合类
转载于:https://www.cnblogs.com/sodmecai/archive/2012/05/04/2483437.html