直接上代码,先看,然后可以想下:Java5.0后引入Concurrent并发包类,采用非阻塞算法来优化实现多线程操作中的并发问题,而之前采用syncronized锁算法模式。
package com.zbv.producerAndcustomer; import java.text.SimpleDateFormat; import java.util.Date; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * 生产者\消费者模式 */ public class App { public static Object object = new Object(); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); //Concurrent并发包 LinkedBlockingQueue<E> 就不需要syncronized锁处理了 // Queue<Integer> queue = new LinkedList<>(); LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1); final Producer producer = new Producer(queue); Customer customer = new Customer(queue); executorService.execute(producer); executorService.execute(customer); try { Thread.sleep(10 * 1000); producer.stop(); customer.stop(); System.out.println("关闭日期=" + timeStr()); executorService.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static String timeStr() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); return simpleDateFormat.format(new Date()); } /** * 生产者 */ public static class Producer implements Runnable { private boolean FLAG = true; private Queue<Integer> queue; private int count; public int getCount() { return count; } public Producer(Queue<Integer> queue) { this.queue = queue; } @Override public void run() { while (FLAG) { count++; boolean offerResult = false; try { synchronized (object) { offerResult = queue.offer(count); } if (offerResult) { System.out.println("日期:" + timeStr() + " " + Thread.currentThread().getName() + ":生产了第" + count + "产品成功"); } else { System.out.println("日期:" + timeStr() + " " + Thread.currentThread().getName() + ":生产了第" + count + "产品失败"); } Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop() { FLAG = false; } } /** * 消费者 */ public static class Customer implements Runnable { private boolean FLAG = true; private Queue<Integer> queue; public Customer(Queue<Integer> queue) { this.queue = queue; } @Override public void run() { while (FLAG) { Integer result = null; try { synchronized (object) { result = queue.poll(); } if (result == null) { System.out.println("日期:" + timeStr() + " " + Thread.currentThread().getName() + ":当前队列为空,无法消费,在等待中..."); } else { System.out.println("日期:" + timeStr() + " " + Thread.currentThread().getName() + ":消费了第" + result + "产品"); } Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop() { FLAG = false; } } }使用Object wait notify的生产者消费者模式:
package com.qzx.learningThread.LearningThread; import java.util.ArrayList; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; /** * 使用Object wait notify notifyAll写一个简单的消费者、生产者模式 */ public class ConsumerProducerOne { private volatile PriorityQueue<String> stringPriorityQueue; private static final int CapacitySize = 1; private Object lockObj = new Object(); private volatile boolean needStop = false; private final Runnable consumer; private final Runnable producer; /** * 结束实验 */ public void stopTry() { needStop = true; } public ConsumerProducerOne() { stringPriorityQueue = new PriorityQueue<>(CapacitySize); consumer = new ConsumerRunnable(); producer = new ProducerRunnable(); new Thread(consumer).start(); new Thread(producer).start(); } private class ConsumerRunnable implements Runnable { @Override public void run() { while (!needStop) { synchronized (lockObj) { if (stringPriorityQueue.size() <= 0) { System.out.println("仓库没有产品了..."); try { lockObj.wait(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("consumer happen interrupted exception" + e); } } String obj = stringPriorityQueue.poll(); System.out.println("消费 " + obj); lockObj.notify(); } } } } private class ProducerRunnable implements Runnable { @Override public void run() { while (!needStop) { synchronized (lockObj) { if (stringPriorityQueue.size() >= CapacitySize) { System.out.println("仓库已满,暂时不需要生产了"); try { lockObj.wait(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("producer happen interrupted exception" + e); } } //重点:睡眠放在锁代码块外面 // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } System.out.println("生产第" + (stringPriorityQueue.size() + 1) + "个产品"); stringPriorityQueue.offer("这是第" + (stringPriorityQueue.size() + 1) + "个产品"); lockObj.notify(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }使用Condition的await、signal的生产者消费者模式(这个比object 的wait和notify好用且相对安全)
package com.qzx.learningThread.LearningThread; import java.util.PriorityQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConsumerProducerTwo { private static final int CapacitySize = 10; private PriorityQueue<String> stringPriorityQueue = new PriorityQueue<>(CapacitySize); private final Lock lock = new ReentrantLock(); private Condition consumerCond = lock.newCondition(); private Condition producerCond = lock.newCondition(); private volatile boolean needStop = false; public void stopTry() { needStop = true; } public ConsumerProducerTwo() { new Consumer().start(); new Producer().start(); } private class Consumer extends Thread { public Consumer() { } @Override public void run() { super.run(); while (!needStop) { try { lock.lock(); if (stringPriorityQueue.size() <= 0) { try { consumerCond.await(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("consumer happen interrupt exception" + e); } } String obj = stringPriorityQueue.poll(); System.out.println("消费 " + obj); producerCond.signal(); } finally { lock.unlock(); } } } } private class Producer extends Thread { public Producer() { } @Override public void run() { super.run(); while (!needStop) { try { lock.lock(); if (stringPriorityQueue.size() >= CapacitySize) { try { producerCond.await(); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("producer happen interrupt exception" + e); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("生产第" + (stringPriorityQueue.size() + 1) + "个产品"); stringPriorityQueue.offer("这是第" + (stringPriorityQueue.size() + 1) + "个产品"); consumerCond.signal(); } finally { lock.unlock(); } } } } }