消费者生产者模式

mac2024-04-07  27

1.synchronized——wait——notifyAll组合

public class ConsumerDemo2 { public int number = 0; //public Lock lock = new ReentrantLock(); //public Condition condition = lock.newCondition(); public synchronized void increase() throws Exception{ while(number != 0){ this.wait(); } number++; System.out.println(Thread.currentThread().getName()+"\t"+number); this.notifyAll(); } public synchronized void decrease() throws Exception{ while(number == 0){ this.wait(); } number--; System.out.println(Thread.currentThread().getName()+"\t"+number); this.notifyAll(); } public static void main(String[] args) { ConsumerDemo2 consumerDemo = new ConsumerDemo2(); new Thread(()->{ for(int i=0;i<5;i++){ try{ consumerDemo.increase(); }catch (Exception e){ e.printStackTrace(); } } },"A").start(); new Thread(()->{ for(int i=0;i<5;i++){ try{ consumerDemo.decrease(); }catch (Exception e){ e.printStackTrace(); } } },"B").start(); } }

2.lock——await——signalAlll组合

public class ConsumerDemo2 { public int number = 0; //public Lock lock = new ReentrantLock(); //public Condition condition = lock.newCondition(); public synchronized void increase() throws Exception{ while(number != 0){ this.wait(); } number++; System.out.println(Thread.currentThread().getName()+"\t"+number); this.notifyAll(); } public synchronized void decrease() throws Exception{ while(number == 0){ this.wait(); } number--; System.out.println(Thread.currentThread().getName()+"\t"+number); this.notifyAll(); } public static void main(String[] args) { ConsumerDemo2 consumerDemo = new ConsumerDemo2(); new Thread(()->{ for(int i=0;i<5;i++){ try{ consumerDemo.increase(); }catch (Exception e){ e.printStackTrace(); } } },"A").start(); new Thread(()->{ for(int i=0;i<5;i++){ try{ consumerDemo.decrease(); }catch (Exception e){ e.printStackTrace(); } } },"B").start(); } }

3.按a-b-c顺序打印,先打印A 5次,然后打印B 10次,然后打印C 15次

package prodecerConsumer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConsumerDemo3 { public int number = 1; public Lock lock = new ReentrantLock(); public Condition c1 = lock.newCondition(); public Condition c2 = lock.newCondition(); public Condition c3 = lock.newCondition(); public void print5(){ lock.lock(); try{ while(number != 1){ c1.await(); } for(int i=1;i<=5;i++) { System.out.println(Thread.currentThread().getName() + "\t"+i); } number = 2; c2.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print10(){ lock.lock(); try{ while(number != 2){ c2.await(); } for(int i=1;i<=10;i++) { System.out.println(Thread.currentThread().getName() + "\t"+i); } number = 3; c3.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print15(){ lock.lock(); try{ while(number != 3){ c3.await(); } for(int i=1;i<=15;i++) { System.out.println(Thread.currentThread().getName() + "\t"+i); } number = 1; c1.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public static void main(String[] args) { ConsumerDemo3 consumerDemo = new ConsumerDemo3(); new Thread(()->{ for(int i=0;i<5;i++){ consumerDemo.print5(); } },"A").start(); new Thread(()->{ for(int i=0;i<5;i++){ consumerDemo.print10(); } },"B").start(); new Thread(()->{ for(int i=0;i<5;i++){ consumerDemo.print15(); } },"C").start(); } }

4.阻塞队列(重点)

package prodecerConsumer; import java.lang.annotation.Retention; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConsumerDemo5 { public volatile boolean flag = true;//可见性 public AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQeque = null;//接口扩展性 public ConsumerDemo5(BlockingQueue<String> blockingQeque) { this.blockingQeque = blockingQeque; System.out.println(blockingQeque.getClass().getName());//反射获取当前日志考虑 } public void producer() throws Exception { String data = null; boolean retValue; while (flag) { data = atomicInteger.incrementAndGet() + ""; retValue = blockingQeque.offer(data, 2L, TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName() + "插入队列 " + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + "插入队列 " + data + "失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println("producer 生产结束"); } public void consumer() throws Exception { String result = null; while (flag) { result = blockingQeque.poll(2L, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")) { flag = false; System.out.println(Thread.currentThread().getName() + " 超过2s没有取到数据,消费退出"); return; } System.out.println(Thread.currentThread().getName() + "消费队列 " + result + "成功"); } } public void stop() { flag = false; } public static void main(String[] args) { ConsumerDemo5 consumerDemo5 = new ConsumerDemo5(new ArrayBlockingQueue<>(10)); new Thread(() -> { try { consumerDemo5.producer(); } catch (Exception e) { e.printStackTrace(); } }, "producer").start(); new Thread(() -> { try { consumerDemo5.consumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer").start(); try { TimeUnit.SECONDS.sleep(5);//等待五秒钟结束 } catch (Exception e) { e.printStackTrace(); } System.out.println("活动结束"); consumerDemo5.stop(); } }

 

最新回复(0)