生产者消费者3.0

mac2022-06-30  21

阻塞队列,volatile,AtomicInteger

package Juc; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 生产者生产、消费者消费,可停止? */ class MyResource{ private volatile boolean FLAG = true;//默认开启,进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue =null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName());//具体用的那个阻塞队列,高度抽象,细节落地 } public void myProd()throws Exception{ String data=null; boolean retValue; while(FLAG){ data=atomicInteger.incrementAndGet()+""; //它可以保证原子性但是再加上添加队列是否保持原子性? retValue=blockingQueue.offer(data, 2L, TimeUnit.SECONDS);//它可以保证原子性但是再加上添加队列是否保持原子性? if(retValue){ System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"失败"); } //try{TimeUnit.SECONDS.sleep(1);}catch (Exception e){e.printStackTrace();} } System.out.println(Thread.currentThread().getName()+"\t 老板叫停,表示FLAG="+FLAG+"生产结束"); } public void myConsumer()throws Exception{ String result = null; while(FLAG){ result=blockingQueue.poll(2L,TimeUnit.SECONDS); if(null==result||result.equalsIgnoreCase("")) { FLAG=false; System.out.println(Thread.currentThread().getName()+"\t 超过2s没有取到数据"+result+"退出!"); System.out.println(); System.out.println(); System.out.println(); return; } System.out.println(Thread.currentThread().getName()+"\t 取到数据"+result+"成功!"); } } public void stop(){ FLAG=false; } } /** * Volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用、 */ public class ProdConsumer_BlockQueueDemo { public static void main(String[] args) { MyResource myResource=new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 生产者线程启动1!"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } },"prod01").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 生产者线程启动2!"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } },"prod02").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 消费者线程启动!"); System.out.println(); System.out.println(); try { myResource.myConsumer(); System.out.println(); System.out.println(); } catch (Exception e) { e.printStackTrace(); } },"cinsumer01").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 消费者线程启动!"); System.out.println(); System.out.println(); try { myResource.myConsumer(); System.out.println(); System.out.println(); } catch (Exception e) { e.printStackTrace(); } },"cinsumer02").start(); try{TimeUnit.SECONDS.sleep(5);}catch (Exception e){e.printStackTrace();} myResource.stop(); System.out.println(); System.out.println(); System.out.println(); System.out.println("1s时间到,大老板main线程叫停,活动结束"); } }

最新回复(0)